
Calculate valid partition divisions to avoid parser EOF error when CSV rows contain unescaped newlines #8045

@lmmx

Description


This follows discussion in #4415 on the parser error encountered (‘EOF inside string’) when partitioning a file that contains unescaped newlines within a row. The partitioner naively splits on newlines (as far as I understand), which is only a valid assumption for files without unescaped within-row EOL characters. The `read_csv` docstring notes as much:

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.

Would it be possible to ‘assist’ dask in identifying where to partition a CSV somehow?

From the above issue I surmise that this is not currently a feature (but if it is and nobody thought to suggest it there please let me know).

I can imagine a function that:

  • opens the file,
  • seeks to the would-be partition offset minus some chunk size (expected to cover one or more lines),
  • reads in a chunk,
  • identifies the latest row-ending position within that chunk,
  • passes that position to dask as the partition point,
  • [repeats until the file is completely partitioned]
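A minimal sketch of that backward scan (names are my own; this assumes the chunk is large enough to contain at least one newline, and naively treats every newline as a row terminator, which is the very assumption the issue is about):

```python
import io

def last_row_end(f, target_offset, chunk_size=2**16):
    """Return the offset just past the last newline at or before
    ``target_offset``, found by reading a single chunk backwards.

    Hypothetical helper: assumes ``chunk_size`` covers at least one
    line, and that every newline terminates a row.
    """
    start = max(target_offset - chunk_size, 0)
    f.seek(start)
    chunk = f.read(target_offset - start)
    # 'grep backwards': rfind scans the buffer right-to-left
    nl = chunk.rfind(b"\n")
    if nl == -1:
        raise ValueError("no newline within chunk; increase chunk_size")
    return start + nl + 1

f = io.BytesIO(b"a,1\nb,2\nc,3\n")
print(last_row_end(f, 6))  # 4: naive offset 6 snaps back to just after "a,1\n"
```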

(I recently wrote a function that ‘greps backwards’ within a buffer, here, which seems in the same spirit)

This could be done to identify the partition positions and then pass these into dask, or dask could provide a helper method to generate these positions.

Alternatively, if dask’s partition positions are accessible, these could be modified after calculation, but I presume manually doing so would be undesirable (primarily because if all the partition positions moved backwards, the final one would risk exceeding the maximum permitted partition size).

I tried to load the WIT dataset which has newlines within rows (i.e. unescaped free text, from Wikipedia image captions) and cannot get the advantage of cuDF speedup over pandas due to the problem of distributing the computation (in the way that dask achieves with partitions).

I think I’d be able to speed this up if dask could successfully partition with such a routine, but I don’t know where it would go (i.e. I don’t see a “select where to partition” step exposed in dask’s API; I presume there isn’t one).

Another alternative I can envision is to try/except reading one row at the start of each partition (skipping the first partition, since its start is not a split point); if reading fails with the EOF error, move that partition boundary to the previous newline and retry.


Minimal example:

  • First get the sample file (here I also decompressed it)
    • wget https://storage.googleapis.com/gresearch/wit/wit_v1.train.all-1percent_sample.tsv.gz
import dask.dataframe as dd
from pathlib import Path
import time

p = Path("wit_v1.train.all-1percent_sample.tsv")
print(f"Data file: {p.name}")

t0 = time.time()
dtype_dict = {"mime_type": "category"}
fields = [*dtype_dict]
df = dd.read_csv(p, sep="\t", dtype=dtype_dict, usecols=fields)
filetypes = list(df.mime_type)
t1 = time.time()
print(f"dask took: {t1-t0}s")

Data file: wit_v1.train.all-1percent_sample.tsv
Traceback (most recent call last):
  File "dask_test.py", line 12, in <module>
    filetypes = list(df.mime_type)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/dataframe/core.py", line 579, in __len__
    return self.reduction(
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/base.py", line 286, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/optimization.py", line 969, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/dataframe/io/csv.py", line 125, in __call__
    df = pandas_read_text(
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/dask/dataframe/io/csv.py", line 178, in pandas_read_text
    df = reader(bio, **kwargs)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 586, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 488, in _read
    return parser.read(nrows)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1047, in read
    index, columns, col_dict = self._engine.read(nrows)
  File "/home/louis/miniconda3/envs/dask_test/lib/python3.8/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 223, in read
    chunks = self._reader.read_low_memory(nrows)
  File "pandas/_libs/parsers.pyx", line 801, in pandas._libs.parsers.TextReader.read_low_memory
  File "pandas/_libs/parsers.pyx", line 857, in pandas._libs.parsers.TextReader._read_rows
  File "pandas/_libs/parsers.pyx", line 843, in pandas._libs.parsers.TextReader._tokenize_rows
  File "pandas/_libs/parsers.pyx", line 1925, in pandas._libs.parsers.raise_parser_error
pandas.errors.ParserError: Error tokenizing data. C error: EOF inside string starting at row 34111

(The exact row number given varies)

I can't see any way of inspecting the divisions that are computed here (as they're not coming from a pre-existing dataframe but created from the TSV, hence no index to display in the divisions)

>>> df.divisions
(None, None, None, None, None, None, None, None, None, None, None, None)
>>> df.known_divisions
False

but since it's byte buffers under the hood, you could check the divisions by seeking to the end of these buffers and performing the routine described above.

The blocks in question are the values passed as the block_lists arg to text_blocks_to_pandas after being constructed in read_pandas:

values = [[list(dsk.dask.values()) for dsk in block] for block in values]

so read_pandas is where the change I am suggesting would go.

The values in turn were unpacked from b_out, which comes from read_bytes (from dask.bytes.core):

b_out = read_bytes(
    urlpath,
    delimiter=b_lineterminator,
    blocksize=blocksize,
    sample=sample,
    compression=compression,
    include_path=include_path_column,
    **(storage_options or {}),
)
if include_path_column:
    b_sample, values, paths = b_out
    path = (include_path_column, path_converter)
else:
    b_sample, values = b_out
    path = None

The offsets are derived from these block sizes without any line delimiter detection:

dask/dask/bytes/core.py

Lines 115 to 116 in 5cb8961

off = list(range(0, size, blocksize))
length = [blocksize] * len(off)

and then very importantly the blocks are split according to the delimiter here (the delimiter was passed as the byte-encoded line terminator):

dask/dask/bytes/core.py

Lines 126 to 138 in 5cb8961

for path, offset, length in zip(paths, offsets, lengths):
    token = tokenize(fs_token, delimiter, path, fs.ukey(path), compression, offset)
    keys = ["read-block-%s-%s" % (o, token) for o in offset]
    values = [
        delayed_read(
            OpenFile(fs, path, compression=compression),
            o,
            l,
            delimiter,
            dask_key_name=key,
        )
        for o, key, l in zip(offset, keys, length)
    ]

After this they are passed on to delayed_read, whose outputs become values, a.k.a. block_lists.

It's that step of splitting the blocks according to the delimiter that I think could be modified to ensure rows are parsed correctly.

2 further aspects to this:

  • Within the delayed_read call, OpenFile is instantiated eagerly (i.e. executed and its result then passed into delayed_read)
  • delayed_read is itself an alias:
    delayed_read = delayed(read_block_from_file)
    • delayed is the lazy computation proxy (not relevant to what's done here)
    • read_block_from_file means that the computation being delayed is to read the block, passed with 4 arguments: lazy_file, off, bs, delimiter

      dask/dask/bytes/core.py

      Lines 168 to 172 in 5cb8961

      def read_block_from_file(lazy_file, off, bs, delimiter):
          with copy.copy(lazy_file) as f:
              if off == 0 and bs is None:
                  return f.read()
              return read_block(f, off, bs, delimiter)
    • The lazy_file is an OpenFile object via fsspec.core
    • The off offset is the unmodified offset (a multiple of the block size from list(range(0, size, blocksize)))
    • The bs is the l, an item in lengths, which will always be blocksize except for the last block, which may be shorter if the file size is not an exact multiple of blocksize
    • read_block is called when l in lengths is set to anything other than None [by virtue of blocksize being set, as it is here], and this function comes from fsspec.utils.
  • The signature and docstring show that its default behaviour is split_before=False, hence it starts/stops reading after the delimiter [newline] at offset and offset+length. This is how the newlines are detected.
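The boundary semantics can be checked directly against fsspec.utils.read_block on a toy buffer (not the WIT file): with the default split_before=False, the block runs from just after the first newline following offset to just after the first newline following offset + length:

```python
import io
from fsspec.utils import read_block

data = b"a,1\nb,2\nc,3\nd,4\n"
# offset 2 lands mid-row; both block edges snap forward to the next newline
block = read_block(io.BytesIO(data), 2, 6, delimiter=b"\n")
print(block)  # b'b,2\nc,3\n'
```

This forward-snapping is also what guarantees that consecutive blocks tile the file with no gaps or overlaps.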

So long story short:

  • Roughly speaking [assuming the file splits evenly into blocks of equal size], dd.read_csv reduces to
for o in range(0, size, dd.io.csv.AUTO_BLOCKSIZE):
  fsspec.utils.read_block(input_csv, offset=o, length=dd.io.csv.AUTO_BLOCKSIZE, delimiter=b"\n", split_before=False)
  • This respects line separators but not necessarily row-delimiting line separators
  • Extending this procedure to respect row-delimiting line separators would make dask usable with a larger subset of the CSV files pandas can read
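The failure mode is reproducible in miniature with just fsspec and pandas: a block boundary that lands inside a quoted field leaves an unterminated quote, and pandas raises the same EOF-inside-string error as in the traceback above:

```python
import io
import pandas as pd
from fsspec.utils import read_block

# toy CSV whose first data row has an unescaped newline inside a quoted field
data = b'id,text\n1,"hello\nworld"\n2,"ok"\n'

# a blocksize of 12 puts the boundary at the newline *inside* the quotes,
# so the first block ends with an unterminated string
block = read_block(io.BytesIO(data), 0, 12, delimiter=b"\n")
print(block)  # b'id,text\n1,"hello\n'

try:
    pd.read_csv(io.BytesIO(block))
except pd.errors.ParserError as e:
    print(e)  # Error tokenizing data. C error: EOF inside string ...
```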

The problem remains somewhat tricky to specify (how to verify whether the newline at a given offset into a CSV is or isn’t the start of a valid row, and if it isn’t, to partition at the next one instead).


I thought about this some more and the three approaches I can think of are:

  1. Let the user provide a custom checking function used to verify that the buffer bytes at a position are valid.
     • In the example dataset I am using, this would be very simple: the first column must equal a lower-case two-letter language code followed by the separator, and so on. This would make it very simple to check that the bytes after the line separator at a given offset were indeed the start of a row (or rather, to rule out invalid rows).
  2. Let the user provide a list of columns which may contain free text (including newlines), i.e. the "separable columns", those which may be separated across lines, so as to validate based on an attempted separation (reading as many bytes as necessary, including line separators, until fulfilling the expected number of columns). Alternatively, these may be discerned from the sample.
  3. Simply have pandas 'stream' a row, and catch any parsing error as a failure.
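For the 1st option, the check for this particular dataset could be a few lines (hypothetical helper; the pattern encodes an assumption about the WIT schema, namely that the first column is a lower-case two-letter language code followed by the tab separator):

```python
import re

# assumed schema: a valid row starts with a two-letter language code and a tab
ROW_START = re.compile(rb"^[a-z]{2}\t")

def looks_like_row_start(buf: bytes) -> bool:
    """Rule out byte positions that cannot begin a valid row."""
    return ROW_START.match(buf) is not None

print(looks_like_row_start(b"en\tsome caption"))      # True
print(looks_like_row_start(b"free text caption\t"))   # False
```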

The 1st option would potentially involve reading fewer bytes (and would thus be faster).

The 2nd option seems the simplest way to identify the correct "delimiter offset" for a given partition offset (that is, the number of newline delimiters past the partition offset at which the next row-delimiting newline is found). If the first newline delimiter after the partition offset were a valid start of row, the 'delimiter offset' would be 0; if it were the one after, it'd be 1; and so on.

The 3rd option would be most 'foolproof' (and perhaps simplest).

I need to think about this some more (I suspect it may need a mix of the 3 approaches). Any suggestions/comments welcome
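To make the 2nd/3rd options concrete, a rough sketch (all names hypothetical, and the read_csv kwargs would need to match the real call): starting at the naive partition offset, try each successive newline as a row start and accept the first one from which pandas can tokenize a row, which yields option 2's 'delimiter offset' as a by-product:

```python
import io
import pandas as pd

def next_valid_row_start(f, offset, read_ahead=2**20, **csv_kwargs):
    """Offset of the first newline-delimited position at or after
    ``offset`` from which pandas can parse one row without error.

    Sketch of option 3: each failed parse means the candidate newline
    was inside a row, so we advance to the next one (option 2's
    'delimiter offset' increments by 1 each time round the loop).
    """
    f.seek(offset)
    buf = f.read(read_ahead)
    pos = 0
    while True:
        nl = buf.find(b"\n", pos)
        if nl == -1:
            raise ValueError("no valid row start within read_ahead")
        candidate = nl + 1
        try:
            pd.read_csv(io.BytesIO(buf[candidate:]), nrows=1,
                        header=None, **csv_kwargs)
            return offset + candidate
        except pd.errors.ParserError:
            pos = candidate  # candidate newline was mid-row; try the next

f = io.BytesIO(b"a,1\nb,2\nc,3\n")
print(next_valid_row_start(f, 2))  # 4: first row boundary at or after offset 2
```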

Metadata

Assignees

No one assigned

    Labels

    dataframe, io, needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer.)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
