Skip to content

Pickling error when reading multiple parquet files from Azure storage account with distributed client #6060

@mitchellwood

Description

@mitchellwood

An exception is thrown when loading parquet files from an Azure storage account into a Dask DataFrame. This only happens when using a distributed client; it does not happen when using a local filesystem. The stack trace indicates it may be a problem pickling the data. I have not tested other cloud filesystems, so I'm not sure if this is only a problem with ADLS.

I have provided an MRE with a sample dataset: test_parquet_dataset.zip. The dataset was created using the pyarrow library and employs pyarrow's native partitioning; however, I have also tested this with unpartitioned parquet files, reading them using glob wildcards in the path, and the same issue occurs. The Python version is pinned to 3.7.

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from azure.identity.aio import DefaultAzureCredential

# Start a local distributed scheduler. Per the report above, the error only
# occurs with a distributed client; the default local scheduler is fine.
cluster = LocalCluster()
client = Client(cluster)

def get_storage_options() -> dict:
    """Build the storage_options dict passed to dd.read_parquet for ADLS access."""
    account_name = '<account_name>'
    # NOTE(review): this is the *async* credential (imported from
    # azure.identity.aio above). Such credential objects hold internal
    # locks, and the traceback below ends with
    # "TypeError: can't pickle _thread.RLock objects" — presumably this
    # credential is the unpicklable object being dragged into the task
    # graph. TODO confirm whether the synchronous
    # azure.identity.DefaultAzureCredential avoids the error.
    credential = DefaultAzureCredential()
    storage_options = dict(
        account_name=account_name,
        credential=credential,
    )
    return storage_options

# Path to the pyarrow-partitioned dataset in the Azure storage container.
path = 'az://<storage_container>/test_parquet_dataset'

# Graph construction succeeds; the storage_options (including the
# credential) are embedded in the read tasks.
df = dd.read_parquet(path, storage_options=get_storage_options())

# Fails here: packing the graph for the scheduler tries to pickle the
# tasks and raises "TypeError: can't pickle _thread.RLock objects"
# (full traceback below).
df.compute()

The last line throws the exception with the following stacktrace:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   4441         with _cache_lock:
-> 4442             result = cache_dumps[func]
   4443     except KeyError:

/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/utils.py in __getitem__(self, key)
   1359     def __getitem__(self, key):
-> 1360         value = super().__getitem__(key)
   1361         self.data.move_to_end(key)

/opt/conda/envs/quantdev/lib/python3.7/collections/__init__.py in __getitem__(self, key)
   1026             return self.__class__.__missing__(self, key)
-> 1027         raise KeyError(key)
   1028     def __setitem__(self, key, item): self.data[key] = item

KeyError: subgraph_callable-345fdcd0-fc5e-4e79-b6a9-f39c26562d7b

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_2922/1537055205.py in <module>
----> 1 df.compute()

/opt/conda/envs/quantdev/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    288         dask.base.compute
    289         """
--> 290         (result,) = compute(self, traverse=False, **kwargs)
    291         return result
    292 

/opt/conda/envs/quantdev/lib/python3.7/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    571         postcomputes.append(x.__dask_postcompute__())
    572 
--> 573     results = schedule(dsk, keys, **kwargs)
    574     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    575 

/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2981             retries=retries,
   2982             user_priority=priority,
-> 2983             actors=actors,
   2984         )
   2985         packed = pack_data(keys, futures)

/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2880             # Pack the high level graph before sending it to the scheduler
   2881             keyset = set(keys)
-> 2882             dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2883 
   2884             # Create futures before sending graph (helps avoid contention)

/opt/conda/envs/quantdev/lib/python3.7/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys, annotations)
   1056                         self.key_dependencies,
   1057                         client,
-> 1058                         client_keys,
   1059                     ),
   1060                     "annotations": layer.__dask_distributed_annotations_pack__(

/opt/conda/envs/quantdev/lib/python3.7/site-packages/dask/blockwise.py in __dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    576             to_serialize(dsk[0])
    577             if (self.concatenate or inline_tasks)
--> 578             else dumps_function(dsk[0])
    579         )
    580         func_future_args = dsk[1:]

/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/worker.py in dumps_function(func)
   4442             result = cache_dumps[func]
   4443     except KeyError:
-> 4444         result = pickle.dumps(func, protocol=4)
   4445         if len(result) < 100000:
   4446             with _cache_lock:

/opt/conda/envs/quantdev/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

/opt/conda/envs/quantdev/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol)
    100         with io.BytesIO() as file:
    101             cp = CloudPickler(file, protocol=protocol)
--> 102             cp.dump(obj)
    103             return file.getvalue()
    104 

/opt/conda/envs/quantdev/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    600     def dump(self, obj):
    601         try:
--> 602             return Pickler.dump(self, obj)
    603         except RuntimeError as e:
    604             if "recursion" in e.args[0]:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_tuple(self, obj)
    787         write(MARK)
    788         for element in obj:
--> 789             save(element)
    790 
    791         if id(obj) in memo:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    888                 k, v = tmp[0]
    889                 save(k)
--> 890                 save(v)
    891                 write(SETITEM)
    892             # else tmp is empty, and we're done

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_tuple(self, obj)
    772         if n <= 3 and self.proto >= 2:
    773             for element in obj:
--> 774                 save(element)
    775             # Subtle.  Same as in the big comment below.
    776             if id(obj) in memo:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_tuple(self, obj)
    772         if n <= 3 and self.proto >= 2:
    773             for element in obj:
--> 774                 save(element)
    775             # Subtle.  Same as in the big comment below.
    776             if id(obj) in memo:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    888                 k, v = tmp[0]
    889                 save(k)
--> 890                 save(v)
    891                 write(SETITEM)
    892             # else tmp is empty, and we're done

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save_dict(self, obj)
    857 
    858         self.memoize(obj)
--> 859         self._batch_setitems(obj.items())
    860 
    861     dispatch[dict] = save_dict

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in _batch_setitems(self, items)
    883                 for k, v in tmp:
    884                     save(k)
--> 885                     save(v)
    886                 write(SETITEMS)
    887             elif n:

/opt/conda/envs/quantdev/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread.RLock objects

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions