Reading from Parquet fails with PyArrow 0.13 when using the distributed client. Downgrading to PyArrow 0.12.1 seems to fix the problem. I have only encountered this with the distributed client; using a Dask dataframe by itself (without a `Client`) does not appear to be affected. Two reproductions follow, each with the traceback it produces.
from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.set_index('index')
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
for key, value in data.items()
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
if type(value) is Serialize}
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
raise TypeError(msg, str(x)[:10000])
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
on_error='raise')
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
'recipient': self._peer_addr})
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
res = yield offload(_to_frames)
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/opt/conda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
context=context))
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
for key, value in data.items()
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
if type(value) is Serialize}
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
raise TypeError(msg, str(x)[:10000])
from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.compute()
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
37 try:
---> 38 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
39 if len(result) < 1000:
AttributeError: Can't pickle local object 'ParquetDataset._get_open_file_func.<locals>.open_file'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-3-a90291ebde1e> in <module>
1 ft_inp_ddf = dd.read_parquet('DATA/ft_14_15_inputs_parquet/', engine = 'pyarrow')
----> 2 ft_inp_ddf.compute()
/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2318 retries=retries,
2319 user_priority=priority,
-> 2320 actors=actors,
2321 )
2322 packed = pack_data(keys, futures)
/opt/conda/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2259
2260 self._send_to_scheduler({'op': 'update-graph',
-> 2261 'tasks': valmap(dumps_task, dsk3),
2262 'dependencies': dependencies,
2263 'keys': list(flatkeys),
/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/opt/conda/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
2769 elif not any(map(_maybe_complex, task[1:])):
2770 return {'function': dumps_function(task[0]),
-> 2771 'args': warn_dumps(task[1:])}
2772 return to_serialize(task)
2773
/opt/conda/lib/python3.6/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
2778 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
2779 """ Dump an object to bytes, warn if those bytes are large """
-> 2780 b = dumps(obj)
2781 if not _warn_dumps_warned[0] and len(b) > limit:
2782 _warn_dumps_warned[0] = True
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
959 try:
960 cp = CloudPickler(file, protocol=protocol)
--> 961 cp.dump(obj)
962 return file.getvalue()
963 finally:
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
265 self.inject_addons()
266 try:
--> 267 return Pickler.dump(self, obj)
268 except RuntimeError as e:
269 if 'recursion' in e.args[0]:
/opt/conda/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
398 # func is nested
399 if lookedup_by_name is None or lookedup_by_name is not obj:
--> 400 self.save_function_tuple(obj)
401 return
402
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_list(self, obj)
779
780 self.memoize(obj)
--> 781 self._batch_appends(obj)
782
783 dispatch[list] = save_list
/opt/conda/lib/python3.6/pickle.py in _batch_appends(self, items)
806 write(APPENDS)
807 elif n:
--> 808 save(tmp[0])
809 write(APPEND)
810 # else tmp is empty, and we're done
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)
/opt/conda/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-x86_64-linux-gnu.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()
TypeError: no default __reduce__ due to non-trivial __cinit__
Reading from Parquet is failing with PyArrow 0.13. Downgrading to PyArrow 0.12.1 seems to fix the problem. I've only encountered this when using the distributed client. Using a Dask dataframe by itself does not appear to be affected.
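For example, calling `ddf.set_index('index')` on a dataframe read with `dd.read_parquet('DATA/parquet/', engine = 'pyarrow')` while a distributed `Client` is active
gives the `distributed.protocol.core - CRITICAL - Failed to Serialize` traceback shown above.
Similarly, calling `ddf.compute()` on the same dataframe
causes the `TypeError: no default __reduce__ due to non-trivial __cinit__` error shown above, raised while pickling `pyarrow._parquet.ParquetSchema`.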