
Unexpected ParserError when loading data with dask.dataframe.read_csv() #7680

Closed
paigem opened this issue May 20, 2021 · 5 comments · Fixed by #7825

Comments


paigem commented May 20, 2021

What happened:

I get an unexpected ParserError when loading a CSV file via dask.dataframe.read_csv(). However, loading the file directly with pandas.read_csv() and then converting to a Dask DataFrame via dask.dataframe.from_pandas() runs successfully.

What you expected to happen:

I expect dask.dataframe.read_csv() to successfully load the data if pandas.read_csv() is able to.

Minimal Complete Verifiable Example:

import dask.dataframe as dd
url = 'https://webservices.volcano.si.edu/geoserver/GVP-VOTW/ows?service=WFS&version=2.0.0&request=GetFeature&typeName=GVP-VOTW:Smithsonian_VOTW_Holocene_Volcanoes&outputFormat=csv'
df = dd.read_csv(url, blocksize=None)
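
For comparison, here is a minimal sketch of the workaround mentioned above (reading with pandas and converting with from_pandas), assuming the same URL; it succeeds because pandas parses the quoted, multi-line fields itself:

import dask.dataframe as dd
import pandas as pd

url = 'https://webservices.volcano.si.edu/geoserver/GVP-VOTW/ows?service=WFS&version=2.0.0&request=GetFeature&typeName=GVP-VOTW:Smithsonian_VOTW_Holocene_Volcanoes&outputFormat=csv'

# pandas handles the quoting and embedded newlines on its own
pdf = pd.read_csv(url)

# convert to a single-partition Dask DataFrame
df = dd.from_pandas(pdf, npartitions=1)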

Anything else we need to know?:

This bug was found while running a dask notebook tutorial in Pangeo Tutorial Gallery, which runs on Pangeo Binder. This issue was originally reported here.

The error can be found below:

---------------------------------------------------------------------------
ParserError                               Traceback (most recent call last)
<ipython-input> in <module>
      6
      7 # blocksize=None means use a single partition
----> 8 df = dd.read_csv(server+query, blocksize=None)

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read(urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
578 storage_options=storage_options,
579 include_path_column=include_path_column,
--> 580 **kwargs,
581 )
582

/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/dataframe/io/csv.py in read_pandas(reader, urlpath, blocksize, collection, lineterminator, compression, sample, enforce, assume_missing, storage_options, include_path_column, **kwargs)
444
445 # Use sample to infer dtypes and check for presence of include_path_column
--> 446 head = reader(BytesIO(b_sample), **kwargs)
447 if include_path_column and (include_path_column in head.columns):
448 raise ValueError(

/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/io/parsers.py in parser_f(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, delim_whitespace, low_memory, memory_map, float_precision)
674 )
675
--> 676 return _read(filepath_or_buffer, kwds)
677
678 parser_f.__name__ = name

/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/io/parsers.py in _read(filepath_or_buffer, kwds)
452
453 try:
--> 454 data = parser.read(nrows)
455 finally:
456 parser.close()

/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/io/parsers.py in read(self, nrows)
1131 def read(self, nrows=None):
1132 nrows = _validate_integer("nrows", nrows)
-> 1133 ret = self._engine.read(nrows)
1134
1135 # May alter columns / col_dict

/srv/conda/envs/notebook/lib/python3.7/site-packages/pandas/io/parsers.py in read(self, nrows)
2035 def read(self, nrows=None):
2036 try:
-> 2037 data = self._reader.read(nrows)
2038 except StopIteration:
2039 if self._first_chunk:

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader.read()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_low_memory()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_rows()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._tokenize_rows()

pandas/_libs/parsers.pyx in pandas._libs.parsers.raise_parser_error()

ParserError: Error tokenizing data. C error: EOF inside string starting at row 172

Environment:

  • Dask version: 2021.05.0 (and also 2.17.2)
  • Python version: 3.7
  • Operating System: Pangeo Binder (JupyterHub on the Cloud)
  • Install method (conda, pip, source): conda, conda-forge
@jrbourbeau
Member

Thanks for raising an issue @paigem! It looks like the issue is arising when we pass a sample of the data to pandas so we can infer dtypes. cc @martindurant if you get a moment

@martindurant
Member

Ah, after some prodding, I get it: the sample terminates on a newline, but that newline falls inside a quoted string. This should be fixable by setting sample=False, but I believe that the server is also incorrectly reporting the size of the file, because it is being included as an HTTP attachment:

response.headers['Content-Disposition'] == 'attachment; filename=Smithsonian_VOTW_Holocene_Volcanoes.csv'

and the size of the preamble is being included in the apparent size.

So there are two issues:

  • sample is doing the wrong thing in dd.read_csv when blocksize=None. We indicate that splitting the file isn't OK, but try to anyway. Furthermore, fsspec's cat method might be more appropriate than open/read.
  • fsspec's HTTPFileSystem apparently doesn't handle this "attachment" case (i.e., multi-part HTTP), which I haven't seen before in this context.
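
To make the failure mode concrete, here is a small self-contained sketch (toy data of my own, not the Smithsonian file) showing how cutting the byte stream at a newline that falls inside a quoted field reproduces the same C-parser error:

import io
import pandas as pd

# A tiny CSV where the second column contains an embedded newline
# inside a quoted field.
full = b'name,notes\nEtna,"active\nstratovolcano"\nFuji,"dormant"\n'

pd.read_csv(io.BytesIO(full))   # parses fine

# Truncating on the newline *inside* the quoted field is roughly what a
# fixed-size byte sample can do to the real file.
truncated = full[: full.index(b"stratovolcano")]
pd.read_csv(io.BytesIO(truncated))
# ParserError: Error tokenizing data. C error: EOF inside string ...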

@vmussa

vmussa commented May 28, 2021

Hello everyone. I'm getting the same error as @paigem while trying to export a DataFrame with to_parquet() after applying dask.dataframe.read_csv() to a 210 MB CSV file that has many embedded newlines inside quoted strings. I've already tested reading it with pandas and then exporting it to a Parquet file, and everything works fine, but I need to do it with Dask because of my low-memory production environment.

I can't provide the sample file as it has sensitive data, but I can generate an example later if needed (a sketch of generating such a file appears after this comment). I already tried @martindurant's tip of setting sample=False, but it gives me the same error. Here is my code:

import dask.dataframe as dd
df = dd.read_csv(filename)
df.to_parquet('test.parquet')
And here is the ParserError:

---------------------------------------------------------------------------
ParserError                               Traceback (most recent call last)
<ipython-input> in <module>
      1 df = dd.read_csv(filename)
----> 2 df.to_parquet('test.parquet')

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
4339 from .io import to_parquet
4340
-> 4341 return to_parquet(self, path, *args, **kwargs)
4342
4343 @derived_from(pd.DataFrame)

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
646 if compute_kwargs is None:
647 compute_kwargs = dict()
--> 648 out = out.compute(**compute_kwargs)
649 return out
650

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
283 dask.base.compute
284 """
--> 285 (result,) = compute(self, traverse=False, **kwargs)
286 return result
287

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
565 postcomputes.append(x.dask_postcompute())
566
--> 567 results = schedule(dsk, keys, **kwargs)
568 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
569

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
77 pool = MultiprocessingPoolExecutor(pool)
78
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
512 _execute_task(task, data) # Re-execute locally
513 else:
--> 514 raise_exception(exc, tb)
515 res, worker_id = loads(res_info)
516 state["cache"][key] = res

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/local.py in reraise(exc, tb)
323 if exc.traceback is not tb:
324 raise exc.with_traceback(tb)
--> 325 raise exc
326
327

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
221 try:
222 task, data = loads(task_info)
--> 223 result = _execute_task(task, data)
224 id = get_id()
225 result = dumps((result, id))

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/optimization.py in __call__(self, *args)
964 if not len(args) == len(self.inkeys):
965 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 966 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
967
968 def __reduce__(self):

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/core.py in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/dataframe/io/csv.py in __call__(self, part)
114
115 # Call pandas_read_text
--> 116 df = pandas_read_text(
117 self.reader,
118 block,

~/.virtualenvs/bigdata/lib/python3.8/site-packages/dask/dataframe/io/csv.py in pandas_read_text(reader, b, header, kwargs, dtypes, columns, write_header, enforce, path)
167 bio.write(b)
168 bio.seek(0)
--> 169 df = reader(bio, **kwargs)
170 if dtypes:
171 coerce_dtypes(df, dtypes)

~/.virtualenvs/bigdata/lib/python3.8/site-packages/pandas/io/parsers.py in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
608 kwds.update(kwds_defaults)
609
--> 610 return _read(filepath_or_buffer, kwds)
611
612

~/.virtualenvs/bigdata/lib/python3.8/site-packages/pandas/io/parsers.py in _read(filepath_or_buffer, kwds)
466
467 with parser:
--> 468 return parser.read(nrows)
469
470

~/.virtualenvs/bigdata/lib/python3.8/site-packages/pandas/io/parsers.py in read(self, nrows)
1055 def read(self, nrows=None):
1056 nrows = validate_integer("nrows", nrows)
-> 1057 index, columns, col_dict = self._engine.read(nrows)
1058
1059 if index is None:

~/.virtualenvs/bigdata/lib/python3.8/site-packages/pandas/io/parsers.py in read(self, nrows)
2059 def read(self, nrows=None):
2060 try:
-> 2061 data = self._reader.read(nrows)
2062 except StopIteration:
2063 if self._first_chunk:

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader.read()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_low_memory()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_rows()

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._tokenize_rows()

pandas/_libs/parsers.pyx in pandas._libs.parsers.raise_parser_error()

ParserError: Error tokenizing data. C error: EOF inside string starting at row 193429

Environment info

  • Python 3.8.5
  • Dask 2021.5.0
  • Operating System: Ubuntu 20.04 (Droplet@DigitalOcean, 1GB RAM)
  • Install method: pip/PyPI
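
Since the real file can't be shared, here is a hedged sketch of how a synthetic reproducer could be generated: a CSV whose quoted fields contain embedded newlines, read with a small explicit blocksize so that a block boundary is likely to land inside a quoted string. The file name, row count, and blocksize are arbitrary choices for illustration, not from the original report:

import csv
import dask.dataframe as dd

# Write ~6 MB of rows whose second field contains an embedded newline.
with open("reproducer.csv", "w", newline="") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_ALL)
    writer.writerow(["id", "notes"])
    for i in range(200_000):
        writer.writerow([i, "line one\nline two"])

# A small blocksize forces Dask to split the file; if a split (or the
# dtype-inference sample) ends inside a quoted field, this should raise
# the same ParserError, either at read time or when computing to_parquet().
df = dd.read_csv("reproducer.csv", blocksize=1_000_000)
df.to_parquet("test.parquet")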

@martindurant
Member

I wonder if, instead of getting a sample of some block of bytes, we should open the file here and call pd.read_csv(nrows=small_number) explicitly and let pandas sort it out. @jrbourbeau , does that sound reasonable?
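
A rough sketch of what that could look like, under my own assumptions (a hypothetical helper, not the current implementation in dask/dataframe/io/csv.py): open the file with fsspec and let pandas parse only the first rows, so quoting and embedded newlines are handled by the parser rather than by slicing raw bytes.

import fsspec
import pandas as pd

def sample_head(urlpath, reader=pd.read_csv, nrows=10, **kwargs):
    # Hypothetical helper: infer dtypes from the first parsed rows,
    # letting pandas deal with quoting and embedded newlines instead of
    # cutting a fixed-size byte sample at an arbitrary newline.
    with fsspec.open(urlpath, mode="rb") as f:
        return reader(f, nrows=nrows, **kwargs)

# e.g. head = sample_head(url) with the URL from the original report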

@jrbourbeau
Copy link
Member

Hmm, that sounds sensible. Would that require a more recent version of pandas that uses fsspec internally? If so, do you know how long that release has been out?
