I am running into a strange problem while trying to implement support for a custom storage backend and then run it on a dask cluster.
client.get_versions(check=True)
succeeds.
I use the following (simplified) code to read a CSV file:
df = dd.read_csv('adl://XYZ/ExhibitionData.csv')
adl is a new backend that I am adding. It works perfectly on a local machine using df.persist().
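For context, the backend is hooked into dask on the client roughly like this (a minimal sketch; I am assuming the protocol-to-class registry is the _filesystems dict in dask.bytes.core, which is how the built-in backends appear to register themselves):

from dask.bytes import core
from dask.bytes.adl import AdlFileSystem  # my backend module

# resolve 'adl://...' URLs to the custom filesystem class
core._filesystems['adl'] = AdlFileSystem

After this, dd.read_csv('adl://...') picks up the backend on the client.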
But the following fails:
from dask.distributed import Client
client = Client('dscheduler:8786')
client.restart()
x = client.persist(df)
Here is what I see on the worker:
2017-07-21T07:13:41.001574000Z distributed.worker - WARNING - Could not deserialize task
2017-07-21T07:13:41.001701000Z Traceback (most recent call last):
2017-07-21T07:13:41.001857000Z File "/work/miniconda/lib/python3.5/site-packages/distributed/worker.py", line 1135, in add_task
2017-07-21T07:13:41.001984000Z self.tasks[key] = _deserialize(function, args, kwargs, task)
2017-07-21T07:13:41.002119000Z File "/work/miniconda/lib/python3.5/site-packages/distributed/worker.py", line 590, in _deserialize
2017-07-21T07:13:41.002246000Z args = pickle.loads(args)
2017-07-21T07:13:41.002372000Z File "/work/miniconda/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
2017-07-21T07:13:41.002497000Z return pickle.loads(x)
2017-07-21T07:13:41.002622000Z File "/work/miniconda/lib/python3.5/site-packages/azure/datalake/store/core.py", line 88, in __setstate__
2017-07-21T07:13:41.002763000Z self.connect()
2017-07-21T07:13:41.002884000Z File "/work/miniconda/lib/python3.5/site-packages/azure/datalake/store/core.py", line 83, in connect
2017-07-21T07:13:41.003167000Z self.azure = DatalakeRESTInterface(token=self.token, **self.kwargs)
2017-07-21T07:13:41.003293000Z File "/work/miniconda/lib/python3.5/site-packages/azure/datalake/store/lib.py", line 249, in __init__
2017-07-21T07:13:41.003546000Z self.head = {'Authorization': token.signed_session().headers['Authorization']}
2017-07-21T07:13:41.007412000Z AttributeError: 'dict' object has no attribute 'signed_session'
2017-07-21T07:13:41.007556000Z distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95\xa3\x10\x00\x00\x00\x00\x00\x00(\x8c\x0fdask.bytes.core\x94\x8c\x08OpenFile\x94\x93\x94)\x81\x94}\x94(\x8c\x08encoding\x94N\x8c\x0bcompression\x94N\x8c\x04mode\x94\x8c\x02rb\x94\x8c\x04text\x94\x89\x8c\x06myopen\x94\x8c\x08builtins\x94\x8c\x07getattr\x94\x93\x94\x8c\x0edask.bytes.adl\x94\x8c\rAdlFileSystem\x94\x93\x94)\x81\x94}\x94(\x8c\x05azure\x94\x8c\x18azure.datalake.store.lib\x94\x8c\x15DatalakeRESTInterface\x94\x93\x94)\x81\x94}\x94(\x8c\x04head\x94}\x94\x8c\rAuthorization\x94X\x81\x04\x00\x00Bearer...
The interesting line is this one:
2017-07-21T07:13:41.007412000Z AttributeError: 'dict' object has no attribute 'signed_session'
which is triggered by the following code:
self.head = {'Authorization': token.signed_session().headers['Authorization']}
Issue 1: the token object somehow got deserialized as a dictionary rather than as a Python object.
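To illustrate the symptom (this is not the actual azure code, just a minimal self-contained sketch of the failure mode): if __getstate__ reduces a custom object to plain builtins, unpickling hands __setstate__ a dict and the original type never comes back:

import pickle

class Token:
    def signed_session(self):
        pass

class Interface:
    def __init__(self, token):
        self.token = token

    def __getstate__(self):
        # state reduced to plain builtins: the Token type is lost here
        return {'token': dict(vars(self.token))}

    def __setstate__(self, state):
        self.__dict__.update(state)

obj = pickle.loads(pickle.dumps(Interface(Token())))
obj.token.signed_session()  # AttributeError: 'dict' object has no attribute 'signed_session'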
The token object is the result of this call in my backend:
https://github.com/Azure/azure-data-lake-store-python/blob/master/azure/datalake/store/lib.py#L67
from azure.datalake.store import lib
from azure.datalake.store.core import AzureDLFileSystem
from dask.bytes import core

class AdlFileSystem(AzureDLFileSystem, core.FileSystem):
    """Implementation of the dask filesystem API spec for Azure Data Lake.

    A filesystem must provide these methods if it is to be registered as
    a backend for dask.
    """
    sep = '/'

    def __init__(self, tenant_id=None, client_id=None, client_secret=None,
                 store_name=None, **kwargs):
        token = lib.auth(tenant_id=tenant_id, client_id=client_id,
                         client_secret=client_secret)
        kwargs['store_name'] = store_name
        kwargs['token'] = token
        AzureDLFileSystem.__init__(self, **kwargs)
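One direction I have considered for Issue 1 (an untested sketch, assuming AzureDLFileSystem keeps the token on self.token and rebuilds self.azure via self.connect(), as the traceback suggests): keep the raw credentials on the instance and re-authenticate on unpickle instead of shipping the token. This would mean adding methods like these to AdlFileSystem:

# Untested sketch. Assumes __init__ also stores the raw credentials, e.g.
#     self._credentials = (tenant_id, client_id, client_secret)

def __getstate__(self):
    state = self.__dict__.copy()
    state.pop('token', None)  # do not pickle the live token
    state.pop('azure', None)  # drop the REST interface as well
    return state

def __setstate__(self, state):
    self.__dict__.update(state)
    tenant_id, client_id, client_secret = self._credentials
    self.token = lib.auth(tenant_id=tenant_id, client_id=client_id,
                          client_secret=client_secret)
    self.connect()  # rebuilds self.azure from the fresh token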
Issue 2: why should this code be serialized at all if it is invoked on each worker?
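From the pickled payload above it looks like the OpenFile objects built on the client embed the AdlFileSystem instance (token and all), so the object travels with the graph rather than being re-created on the workers. If so, the failure should be reproducible locally by round-tripping the filesystem through cloudpickle, which is essentially what the scheduler/worker path does (credential values are placeholders):

import cloudpickle

fs = AdlFileSystem(tenant_id='...', client_id='...',
                   client_secret='...', store_name='XYZ')
# loads() should raise the same AttributeError from __setstate__ / connect()
fs2 = cloudpickle.loads(cloudpickle.dumps(fs))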