Save error when using lazy data and dask distributed scheduler #3157

Closed
jvegreg opened this issue Sep 4, 2018 · 3 comments

jvegreg commented Sep 4, 2018

Hi,

I am getting an error when using Iris with the dask distributed or processes scheduler (the threaded one works fine). Consider the following code:

import iris
import dask.multiprocessing

from dask.distributed import Client
print('Starting client ...')
# client = Client(scheduler_file='/home/Earth/jvegas/scheduler.json')
dask.config.set(scheduler='processes')
filepath = '/esnas/exp/ecmwf/system4_m1/6hourly/tas/tas_19810101.nc'
cube = iris.load_cube(filepath, '2 metre temperature')

print('Saving...')
iris.save(cube, '/home/Earth/jvegas/temp.nc')

Regardless of the NetCDF file used for testing, it fails with the following traceback:

Traceback (most recent call last):
  File "test.py", line 14, in <module>
    iris.save(cube, '/home/Earth/jvegas/temp.nc')
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/iris/io/__init__.py", line 407, in save
    saver(source, target, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2343, in save
    fill_value=fill_value)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 977, in write
    fill_value=fill_value)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2013, in _create_cf_data_variable
    fill_value_to_check)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 1990, in store
    da.store([data], [target])
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/array/core.py", line 1019, in store
    result.compute(**kwargs)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/base.py", line 395, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/multiprocessing.py", line 172, in get
    raise_exception=reraise, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/local.py", line 488, in get_async
    fire_task()
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/local.py", line 482, in fire_task
    args=(key, dumps((dsk[key], data)),
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/dask/multiprocessing.py", line 25, in _dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 917, in dumps
    cp.dump(obj)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 268, in dump
    return Pickler.dump(self, obj)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/home/Earth/jvegas/.conda/envs/iris2/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "netCDF4/_netCDF4.pyx", line 5129, in netCDF4._netCDF4.Variable.__reduce__
NotImplementedError: Variable is not picklable

Iris was installed with conda, and the versions are:

dask 0.19.0 py_0 conda-forge
dask-core 0.19.0 py_0 conda-forge
distributed 1.23.0 py36_0 conda-forge
iris 2.1.0 py36_3 conda-forge

I also tested with other versions of dask and got the same error.


DPeterK commented Oct 9, 2018

Hi @jvegasbsc, thanks for contacting us about this issue, and apologies that it's taken us a little while to get back to you. The problem you've hit is that the save is trying to run in parallel. This means that different parts of the NetCDF file to be saved are handled by different processes, and eventually the results from these processes need to be pulled together, because only a single process can write the NetCDF file.

It's at this point that the error occurs. For the save to happen in a single process, all the work done in the other processes needs to be pulled together, and this is done by communicating the intermediate results between workers. Dask does this by pickling the intermediate results in preparation for transmitting them from one worker to the next. The problem is that NetCDF objects can't be pickled, which is what causes the error above to be raised.
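
The pickling failure can be reproduced in isolation, independently of Iris. The following is a minimal sketch (the file name is hypothetical) showing that a netCDF4 Variable refuses to pickle, which is exactly what dask's processes scheduler needs to do when it ships the save target between workers:

import pickle
import netCDF4

# Create a throwaway file with a single scalar variable (hypothetical path).
dataset = netCDF4.Dataset('example.nc', 'w')
var = dataset.createVariable('tas', 'f4', ())

try:
    pickle.dumps(var)
except NotImplementedError as err:
    # netCDF4 raises "Variable is not picklable", the same error as in the traceback above.
    print(err)

dataset.close()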

The solution is not to parallelise the save step, but instead to run it serially. This can be done with a context manager:

print('Saving...')
with dask.config.set(scheduler='synchronous'):
    iris.save(cube, '/home/Earth/jvegas/temp.nc')

Note that you may need to change the scheduler name depending on the specific version of dask you're using; for some reason scheduler names are very ripe for change between dask versions!
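
For instance, if your dask release doesn't recognise 'synchronous', one of its aliases may work instead. This is only a sketch; check the documentation of your installed dask version for the exact names it accepts:

with dask.config.set(scheduler='single-threaded'):  # commonly an alias for 'synchronous'
    iris.save(cube, '/home/Earth/jvegas/temp.nc')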


jvegreg commented Oct 10, 2018

Thanks. It works now.


DPeterK commented Oct 10, 2018

Hi @jvegasbsc - great to hear it's working now and glad that we were able to help you out with getting to the bottom of this!

DPeterK closed this as completed Oct 10, 2018