
Bug writing to parquet #3660

Closed
henriqueribeiro opened this issue Jun 22, 2018 · 7 comments · Fixed by #3833

Comments

@henriqueribeiro
Contributor

henriqueribeiro commented Jun 22, 2018

Hello,

import dask.dataframe as dd
df = dd.read_parquet('test_dataframe')
df = df.resample('10s').mean()
df.to_parquet('new_dataframe')

Running the code above on dask 0.17.5, everything works fine, but with dask 0.18.0 it fails with the error:

ValueError: The columns in the computed data do not match the columns in the provided metadata

Edit: See 3rd comment for a minimal reproducible example.

@mrocklin
Member

Thanks for the bug report @henriqueribeiro

I suspect that it will be easier for people to resolve if you are able to provide a minimal reproducible example. http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

@henriqueribeiro
Contributor Author

Hey @mrocklin,

Here is a minimal reproducible example:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'time': pd.date_range('1980-01-01', periods=20, freq='1min'),
                   'data1': np.random.normal(size=20),
                   'data2': np.random.normal(size=20),
                   })
df = df.set_index('time')
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet('dummy_df')

ddf2 = dd.read_parquet('dummy_df')
ddf2 = ddf2.resample('1min').mean()
ddf2.to_parquet('new_df')

I get a ValueError when writing ddf2 to parquet:

ddf2.to_parquet('new_df')

Traceback

ValueError                                Traceback (most recent call last)
<ipython-input-15-beb91111f173> in <module>()
      1 ddf2 = dd.read_parquet('dummy_df')
      2 ddf2 = ddf2.resample('1min').mean()
----> 3 ddf2.to_parquet('new_df')

/opt/conda/lib/python3.6/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   1084         """ See dd.to_parquet docstring for more information """
   1085         from .io import to_parquet
-> 1086         return to_parquet(self, path, *args, **kwargs)
   1087 
   1088     def to_csv(self, filename, **kwargs):

/opt/conda/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, compute, **kwargs)
   1076 
   1077     if compute:
-> 1078         out.compute()
   1079         return None
   1080     return out

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    400     keys = [x.__dask_keys__() for x in collections]
    401     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 402     results = schedule(dsk, keys, **kwargs)
    403     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    404 

/opt/conda/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

/opt/conda/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

/opt/conda/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     67         if exc.__traceback__ is not tb:
     68             raise exc.with_traceback(tb)
---> 69         raise exc
     70 
     71 else:

/opt/conda/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

/opt/conda/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

/opt/conda/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(func, args, kwargs, meta)
   3561             if not np.array_equal(np.nan_to_num(meta.columns),
   3562                                   np.nan_to_num(df.columns)):
-> 3563                 raise ValueError("The columns in the computed data do not match"
   3564                                  " the columns in the provided metadata")
   3565             else:

ValueError: The columns in the computed data do not match the columns in the provided metadata

Versions

I used dask 0.18.0 and, as said before, this doesn't happen with dask 0.17.5.

@martindurant
Member

This does not appear to have anything to do with parquet.

> ddf2.compute().resample('1min').mean()
                        data1     data2
time
1980-01-01 00:00:00 -0.276964  0.511200
1980-01-01 00:01:00  0.541851  0.245502
..
> ddf2.resample('1min').mean().compute()
Out[70]:
                        data1     data2
1980-01-01 00:00:00 -0.276964  0.511200
1980-01-01 00:01:00  0.541851  0.245502
..

Somehow, in the dask version, the index loses its name, while the _meta keeps the correct index name from pandas, resulting in the mismatch. This is a bug.
A workaround would be to set the _meta's index name to match the partitions (ddf2._meta.index.name = None), or to fix the name on the partitions themselves:

def set_index_name(df):
    df.index.name = 'time'
    return df

ddf2.map_partitions(set_index_name, meta=ddf2._meta).to_parquet('new_df')

@martindurant
Member

@TomAugspurger , perhaps?

@TomAugspurger
Member

TomAugspurger commented Jul 13, 2018

Maybe a pandas bug with Index.reindex dropping the name:

In [12]: idx = pd.date_range('1980-01-01T00:00:00', '1980-01-01T00:10:00', freq='2T', name='name')

In [13]: idx
Out[13]:
DatetimeIndex(['1980-01-01 00:00:00', '1980-01-01 00:02:00',
               '1980-01-01 00:04:00', '1980-01-01 00:06:00',
               '1980-01-01 00:08:00', '1980-01-01 00:10:00'],
              dtype='datetime64[ns]', name='name', freq='2T')

In [14]: idx.reindex(pd.date_range('1980-01-01T00:00:00', '1980-01-01T00:10:00', freq='T'))[0].name
# None
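The same name-dropping can be shown outside an interactive session. A minimal script (using the 'min' frequency alias in place of the now-deprecated 'T'):

```python
import pandas as pd

# A named DatetimeIndex loses its name when reindexed against a plain Index
idx = pd.date_range('1980-01-01T00:00:00', '1980-01-01T00:10:00',
                    freq='2min', name='name')
new_idx, _ = idx.reindex(pd.date_range('1980-01-01T00:00:00',
                                       '1980-01-01T00:10:00', freq='min'))
print(idx.name)      # 'name'
print(new_idx.name)  # None: the target's (empty) name wins
```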

@TomAugspurger
Member

TomAugspurger commented Jul 13, 2018

pandas issue: pandas-dev/pandas#9885

In short, it's not clear whether index.reindex(Index) should retain the name, so dask should manually ensure that it's passed along.

@henriqueribeiro you're hitting _resample_series, if you want to work on a pull request:

def _resample_series(series, start, end, reindex_closed, rule,
                     resample_kwargs, how, fill_value, how_args, how_kwargs):
    out = getattr(series.resample(rule, **resample_kwargs), how)(*how_args, **how_kwargs)
    return out.reindex(pd.date_range(start, end, freq=rule,
                                     closed=reindex_closed),
                       fill_value=fill_value)
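A sketch of the kind of fix suggested above; this is not dask's actual patch, just a hypothetical pandas-only helper (resample_keep_name is an invented name) showing how the dropped index name could be restored after the reindex:

```python
import pandas as pd

def resample_keep_name(series, start, end, rule, how='mean', fill_value=None):
    # Hypothetical helper: aggregate, reindex onto the full date range,
    # then reattach the index name that the reindex dropped.
    out = getattr(series.resample(rule), how)()
    result = out.reindex(pd.date_range(start, end, freq=rule),
                         fill_value=fill_value)
    result.index.name = series.index.name  # restore the original name
    return result

s = pd.Series(range(20),
              index=pd.date_range('1980-01-01', periods=20,
                                  freq='min', name='time'))
resampled = resample_keep_name(s, '1980-01-01', '1980-01-01 00:19:00', '1min')
print(resampled.index.name)  # 'time'
```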

@henriqueribeiro
Contributor Author

Sure! I will work on it. Give me a few days.
