It does not work in a dask cluster #104

Closed

james-tn opened this issue Sep 21, 2020 · 3 comments

@james-tn
What happened:
The library works when Dask runs on a single machine, but when I run it in a Dask cluster an error occurs.

KilledWorker                              Traceback (most recent call last)
<ipython-input> in <module>
----> 1 ddf.head()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
1004 Whether to compute the result, default is True.
1005 """
-> 1006 return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
1007
1008 def _head(self, n, npartitions, compute, safe):

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
1037
1038 if compute:
-> 1039 result = result.compute()
1040 return result
1041

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
165 dask.base.compute
166 """
--> 167 (result,) = compute(self, traverse=False, **kwargs)
168 return result
169

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.dask_postcompute())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2712 should_rejoin = False
2713 try:
-> 2714 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2715 finally:
2716 for f in futures.values():

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1991 direct=direct,
1992 local_worker=local_worker,
-> 1993 asynchronous=asynchronous,
1994 )
1995

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
832 else:
833 return sync(
--> 834 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
835 )
836

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
337 if error[0]:
338 typ, exc, tb = error[0]
--> 339 raise exc.with_traceback(tb)
340 else:
341 return result[0]

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/utils.py in f()
321 if callback_timeout is not None:
322 future = asyncio.wait_for(future, callback_timeout)
--> 323 result[0] = yield future
324 except Exception as exc:
325 error[0] = sys.exc_info()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()

/anaconda/envs/azureml_py36/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1850 exc = CancelledError(key)
1851 else:
-> 1852 raise exception.with_traceback(traceback)
1853 raise exc
1854 if errors == "skip":

KilledWorker: ("('read-parquet-head-1-5-read-parquet-ccef788ad675d1ca53552831cb61f745', 0)", <Worker 'tcp://10.0.0.5:37793', name: tcp://10.0.0.5:37793, memory: 0, processing: 1>)

What you expected to happen:
It should run in a multi-node cluster.
Minimal Complete Verifiable Example:

import dask.dataframe as dd

STORAGE_OPTIONS = {'account_name': 'someaccount', 'account_key': 'some_key'}
ddf = dd.read_parquet('abfs://mltraining/ISDWeather/**/*.parquet',
                      storage_options=STORAGE_OPTIONS)

# OK up to this point.

# Then, when calling head(), the error happens:
ddf.head()
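
A KilledWorker error like the one above generally means the scheduler gave up on a task after the workers running it died repeatedly, so the real cause lives on the worker side. A minimal first check, assuming a client connected to the same cluster (the scheduler address below is hypothetical, not from this report), is to compare package versions across the cluster:

from dask.distributed import Client

# Hypothetical scheduler address, for illustration only
client = Client('tcp://10.0.0.4:8786')

# Compare dask/distributed/pandas/etc. versions on the client,
# scheduler, and every worker; check=True raises on a mismatch
client.get_versions(check=True)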

Anything else we need to know?:

Environment:

  • Dask version: 2.27
  • Python version: 3.6
  • Operating System: Ubuntu
  • Install method (conda, pip, source): pip
@james-tn (Author)

No, it actually is the issue: it can only run in standalone mode.

james-tn reopened this Sep 22, 2020
@andersbogsnes (Contributor)

Hi @james-tn, just chiming in as I've had similar issues - did you check the logs from the workers? My issues turned out to be because the workers didn't have the correct packages installed, which I only noticed by looking directly at the worker logs.
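
A minimal sketch of that kind of check, assuming a client connected to the cluster (the scheduler address is hypothetical, and adlfs is only an example package name; the thread doesn't say which package was missing):

from dask.distributed import Client

client = Client('tcp://10.0.0.4:8786')  # hypothetical address

# Pull recent log lines from every worker to spot import errors
for worker, logs in client.get_worker_logs().items():
    print(worker)
    for level, message in logs:
        print(level, message)

# Check whether a given package imports on every worker
def try_import():
    try:
        import adlfs  # example: the filesystem driver for abfs:// URLs
        return adlfs.__version__
    except ImportError as exc:
        return repr(exc)

print(client.run(try_import))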

@james-tn (Author)

Yes, it actually was because of a missing package on the workers. I corrected it and it worked.
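
For anyone hitting the same thing: besides baking the dependency into the worker image or conda environment, newer releases of distributed ship a PipInstall worker plugin that can push a package out at runtime. A sketch, again assuming adlfs was the missing package:

from dask.distributed import Client, PipInstall

client = Client('tcp://10.0.0.4:8786')  # hypothetical address

# Install the (assumed) missing package on every worker;
# restart=True restarts the workers so the install takes effect
client.register_worker_plugin(PipInstall(packages=['adlfs'], restart=True))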
