Error 'Could not serialize object of type _FillValueMaskCheckAndStore' saving computed cube with Dask distributed #4509

Closed
dmcg opened this issue Jan 18, 2022 · 22 comments

dmcg commented Jan 18, 2022

🐛 Bug Report

I don't seem to be able to save a cube that has been computed on a dask cluster.

To be honest, I don't know if I should be able to, but if I could it would be really useful.

How To Reproduce

from dask.distributed import Client
client = Client(n_workers=4)

import iris
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
assert cube.shape == (18, 33, 960, 1280)
averages = cube.collapsed('realization', iris.analysis.MEAN)
assert type(averages) == iris.cube.Cube
iris.io.save(averages, "delme.nc")

Expected behaviour

File is saved without error. This is the behaviour if I don't start a dask.distributed.Client before invoking Iris.

Environment

  • OS & Version: Amazon Linux 2.3 (CentOS)
  • Iris Version: 3.1.0

Some more relevant versions

# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name                    Version                   Build  Channel
cloudpickle               2.0.0              pyhd8ed1ab_0    conda-forge
dask                      2021.12.0          pyhd8ed1ab_0    conda-forge
dask-core                 2021.12.0          pyhd8ed1ab_0    conda-forge
hdf4                      4.2.15               h10796ff_3    conda-forge
hdf5                      1.12.1          nompi_h2750804_103    conda-forge
ipykernel                 6.6.1           py310hfdc917e_0    conda-forge
ipython                   7.31.0          py310hff52083_0    conda-forge
iris                      3.1.0              pyhd8ed1ab_3    conda-forge
numpy                     1.22.0          py310h454958d_0    conda-forge
pandas                    1.3.5           py310hb5077e9_0    conda-forge
pickleshare               0.7.5                   py_1003    conda-forge
python                    3.10.1          h62f1059_2_cpython    conda-forge
scipy                     1.7.3           py310hea5193d_0    conda-forge

Additional context

Stack trace

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.comm.utils - ERROR - ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/tcp.py", line 250, in write
    frames = await to_frames(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/io/__init__.py in save(source, target, saver, **kwargs)
    426     # Single cube?
    427     if isinstance(source, Cube):
--> 428         saver(source, target, **kwargs)
    429 
    430     # CubeList or sequence of cubes?

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in save(cube, filename, netcdf_format, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
   2770         # Iterate through the cubelist.
   2771         for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
-> 2772             sman.write(
   2773                 cube,
   2774                 local_keys,

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in write(self, cube, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
   1150 
   1151         # Create the associated cube CF-netCDF data variable.
-> 1152         cf_var_cube = self._create_cf_data_variable(
   1153             cube,
   1154             dimension_names,

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in _create_cf_data_variable(self, cube, dimension_names, local_keys, packing, fill_value, **kwargs)
   2417 
   2418         # Store the data and check if it is masked and contains the fill value
-> 2419         is_masked, contains_fill_value = store(
   2420             data, cf_var, fill_value_to_check
   2421         )

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in store(data, cf_var, fill_value)
   2395                 # the fill value
   2396                 target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-> 2397                 da.store([data], [target])
   2398                 return target.is_masked, target.contains_value
   2399 

~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1116     elif compute:
   1117         store_dsk = HighLevelGraph(layers, dependencies)
-> 1118         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1119         return None
   1120 

~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    313     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    314     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315     return schedule(dsk2, keys, **kwargs)
    316 
    317 

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2689                     should_rejoin = False
   2690             try:
-> 2691                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2692             finally:
   2693                 for f in futures.values():

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1944             else:
   1945                 local_worker = None
-> 1946             return self.sync(
   1947                 self._gather,
   1948                 futures,

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    308             return future
    309         else:
--> 310             return sync(
    311                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    312             )

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    362     if error[0]:
    363         typ, exc, tb = error[0]
--> 364         raise exc.with_traceback(tb)
    365     else:
    366         return result[0]

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in f()
    347             if callback_timeout is not None:
    348                 future = asyncio.wait_for(future, callback_timeout)
--> 349             result[0] = yield future
    350         except Exception:
    351             error[0] = sys.exc_info()

~/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1810                         else:
   1811                             raise exception.with_traceback(traceback)
-> 1812                         raise exc
   1813                     if errors == "skip":
   1814                         bad_keys.add(key)

CancelledError: ('store-map-ff2955f4724c217f42a2a75fc58e80e8', 1, 0, 0)

dmcg commented Jan 18, 2022

Here's a notebook with the trail I followed

Archive.zip

dmcg commented Jan 19, 2022

For what it's worth, I can perform the same operations on NetCDF files with Xarray, so it doesn't seem a ridiculous thing to do?

from dask.distributed import Client
client = Client(n_workers=4)

import xarray
dataset = xarray.open_dataset('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc').chunk({"latitude": 10})
assert {'realization': 18, 'height': 33, 'latitude': 960, 'longitude': 1280, 'bnds': 2} == dataset.dims.mapping
averages = dataset.mean('realization', keep_attrs=True)
averages.to_netcdf('delme.nc')

@wjbenfold wjbenfold added the Peloton 🚴‍♂️ Target a breakaway issue to be caught and closed by the peloton label Jan 19, 2022
wjbenfold (Contributor) commented:

Hi @dmcg, thanks for getting in touch. It looks like you're running in an AWS EC2 instance, is that the case?

We discussed this a bit this morning and what you're doing does seem reasonable, and we've got a couple of ideas as to what might be going wrong. We're a bit surprised we don't see this issue more often, so it might be down to the way things are pickled in the cloud. Can you reproduce the issue on the ground?

dmcg commented Jan 20, 2022

Hi, thanks for the response.

This is running in EC2 (Cloud9 fwiw), but my understanding of that Client(n_workers=4) line is that the Dask ‘cluster’ will just be on the Jupyter machine, maybe even in the same process, so the pickled objects won’t really touch the cloud? Or, put another way, my ground is in the cloud!
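
As a sketch of my understanding of what that line does (assuming the usual dask.distributed defaults), it spins up a local scheduler and four worker processes on the same machine, so nothing leaves the box, but objects still have to be serialized to move between those processes:

from dask.distributed import Client, LocalCluster

# Scheduler plus four worker processes, all on this machine.
cluster = LocalCluster(n_workers=4)
client = Client(cluster)  # roughly what Client(n_workers=4) does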

I’m out for a couple of days, but happy to help next week, maybe at least check I can reproduce with other NetCDF files. I’ll also speak with my clients about sharing the repo with the data and code.

dmcg commented Jan 20, 2022

I’ve given @wjbenfold access to the repo, it seems we’re organisationally proximate!

dmcg commented Jan 26, 2022

I've tried a selection of files from https://github.com/SciTools/iris-test-data/tree/master/test_data/NetCDF, but can't find one that will open with iris.load_cube. If you can point me at a suitable candidate to run against the sample code, or amended code, I can try that in my environment.

wjbenfold (Contributor) commented:

Whilst acknowledging that this doesn't diagnose the actual source of the issue: based on your traceback you're running with Python 3.10, which no Iris version (not even the upcoming Iris 3.2) has been tested against. When running the Iris 3.1 tests in a Python 3.10 environment I get a lot of test failures, which could well include this issue.

Is running an older version of Python an option for you? Iris 3.1 was tested against Python 3.7, so that's probably the best choice to give a go.

@wjbenfold wjbenfold self-assigned this Jan 26, 2022

dmcg commented Jan 26, 2022

I'll have a chat with miniconda and see what I can do

dmcg commented Jan 27, 2022

I've tried again in a fresh and minimal 3.7 environment.

conda create --name iris python=3.7 -y
conda activate iris
conda install distributed iris pytest -c conda-forge -y

Given test_iris_issue.py

import iris

import pytest
from dask.distributed import Client

# Uncomment to see the failure
# client = Client(n_workers=4)

def test_load_and_save():
    cube = iris.load_cube('metoffice-data/000490262cdd067721a34112963bcaa2b44860ab.nc')
    averages = cube.collapsed('realization', iris.analysis.MEAN)
    iris.io.save(averages, "delme.nc")

pytest test_iris_issue.py succeeds as is, and fails with the _FillValueMaskCheckAndStoreTarget error when the Client line is uncommented.

The environment is still EC2, and this is what is running:

# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                        main  
_openmp_mutex             4.5                       1_gnu  
antlr-python-runtime      4.7.2           py37h89c1867_1003    conda-forge
attrs                     21.4.0             pyhd8ed1ab_0    conda-forge
c-ares                    1.18.1               h7f8727e_0  
ca-certificates           2021.10.8            ha878542_0    conda-forge
cartopy                   0.18.0           py37h0d9ca2b_1  
certifi                   2021.10.8        py37h89c1867_1    conda-forge
cf-units                  3.0.1            py37h6f94858_0    conda-forge
cftime                    1.5.1.1          py37hce1f21e_0  
click                     8.0.3            py37h89c1867_1    conda-forge
cloudpickle               2.0.0              pyhd8ed1ab_0    conda-forge
curl                      7.80.0               h7f8727e_0  
cycler                    0.11.0             pyhd8ed1ab_0    conda-forge
cytoolz                   0.11.0           py37h7b6447c_0  
dask-core                 2022.1.0           pyhd8ed1ab_0    conda-forge
distributed               2022.1.0         py37h89c1867_0    conda-forge
expat                     2.2.10               h9c3ff4c_0    conda-forge
freetype                  2.10.4               h0708190_1    conda-forge
fsspec                    2022.1.0           pyhd8ed1ab_0    conda-forge
geos                      3.8.0                he6710b0_0  
hdf4                      4.2.13               h3ca952b_2  
hdf5                      1.10.6          nompi_h6a2412b_1114    conda-forge
heapdict                  1.0.1                      py_0    conda-forge
icu                       67.1                 he1b5a44_0    conda-forge
importlib-metadata        4.10.1           py37h89c1867_0    conda-forge
importlib_metadata        4.10.1               hd8ed1ab_0    conda-forge
iniconfig                 1.1.1              pyh9f0ad1d_0    conda-forge
iris                      3.1.0              pyhd8ed1ab_3    conda-forge
jinja2                    3.0.3              pyhd8ed1ab_0    conda-forge
jpeg                      9d                   h7f8727e_0  
kiwisolver                1.3.1            py37h2531618_0  
krb5                      1.19.2               hcc1bbae_0    conda-forge
ld_impl_linux-64          2.35.1               h7274673_9  
libblas                   3.9.0           11_linux64_openblas    conda-forge
libcblas                  3.9.0           11_linux64_openblas    conda-forge
libcurl                   7.80.0               h0b77cf5_0  
libedit                   3.1.20191231         he28a2e2_2    conda-forge
libev                     4.33                 h516909a_1    conda-forge
libffi                    3.3                  he6710b0_2  
libgcc-ng                 9.3.0               h5101ec6_17  
libgfortran-ng            11.2.0              h69a702a_12    conda-forge
libgfortran5              11.2.0              h5c6108e_12    conda-forge
libgomp                   9.3.0               h5101ec6_17  
liblapack                 3.9.0           11_linux64_openblas    conda-forge
libnetcdf                 4.6.1                h2053bdc_4  
libnghttp2                1.46.0               hce63b2e_0  
libopenblas               0.3.17          pthreads_h8fe5266_1    conda-forge
libpng                    1.6.37               h21135ba_2    conda-forge
libssh2                   1.9.0                h1ba5d50_1  
libstdcxx-ng              9.3.0               hd4cf53a_17  
locket                    0.2.0                      py_2    conda-forge
markupsafe                2.0.1            py37h5e8e339_0    conda-forge
matplotlib-base           3.2.2            py37h1d35a4c_1    conda-forge
msgpack-python            1.0.2            py37hff7bd54_1  
ncurses                   6.3                  h7f8727e_2  
netcdf4                   1.5.7            py37h0a24e14_0  
numpy                     1.20.3           py37h038b26d_1    conda-forge
openssl                   1.1.1m               h7f8727e_0  
packaging                 21.3               pyhd8ed1ab_0    conda-forge
partd                     1.2.0              pyhd8ed1ab_0    conda-forge
pip                       21.2.2           py37h06a4308_0  
pluggy                    1.0.0            py37h89c1867_2    conda-forge
proj                      6.2.1                hc80f0dc_0    conda-forge
psutil                    5.8.0            py37h27cfd23_1  
py                        1.11.0             pyh6c4a22f_0    conda-forge
pyparsing                 3.0.7              pyhd8ed1ab_0    conda-forge
pyshp                     2.1.3              pyh44b312d_0    conda-forge
pytest                    6.2.5            py37h89c1867_2    conda-forge
python                    3.7.11               h12debd9_0  
python-dateutil           2.8.2              pyhd8ed1ab_0    conda-forge
python-xxhash             2.0.2            py37h5e8e339_0    conda-forge
python_abi                3.7                     2_cp37m    conda-forge
pyyaml                    5.4.1            py37h5e8e339_0    conda-forge
readline                  8.1.2                h7f8727e_1  
scipy                     1.5.3            py37h14a347d_0    conda-forge
setuptools                58.0.4           py37h06a4308_0  
shapely                   1.7.1            py37h1728cc4_0  
six                       1.16.0             pyh6c4a22f_0    conda-forge
sortedcontainers          2.4.0              pyhd8ed1ab_0    conda-forge
sqlite                    3.37.0               hc218d9a_0  
tblib                     1.7.0              pyhd8ed1ab_0    conda-forge
tk                        8.6.11               h1ccaba5_0  
toml                      0.10.2             pyhd8ed1ab_0    conda-forge
toolz                     0.11.2             pyhd8ed1ab_0    conda-forge
tornado                   6.1              py37h5e8e339_1    conda-forge
typing_extensions         4.0.1              pyha770c72_0    conda-forge
udunits2                  2.2.27.27            h360fe7b_0    conda-forge
wheel                     0.37.1             pyhd3eb1b0_0  
xxhash                    0.8.0                h7f98852_3    conda-forge
xz                        5.2.5                h7b6447c_0  
yaml                      0.2.5                h516909a_0    conda-forge
zict                      2.0.0                      py_0    conda-forge
zipp                      3.7.0              pyhd8ed1ab_0    conda-forge
zlib                      1.2.11               h7f8727e_4  

I'll try to reproduce outside EC2

dmcg commented Jan 27, 2022

We get the same issue on macOS, with a different MOGREPS-G NetCDF file.

wjbenfold (Contributor) commented:

I have local reproduction! For future reference, I had to protect the Client() call with an if __name__ == "__main__": guard:

from dask.distributed import Client

import iris

def main():
    client = Client()

    filename = iris.sample_data_path("A1B_north_america.nc")
    # cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
    cube = iris.load_cube(filename)
    # assert cube.shape == (18, 33, 960, 1280)
    averages = cube.collapsed('latitude', iris.analysis.MEAN)
    # assert type(averages) == iris.cube.Cube
    iris.save(averages, "delme.nc")

if __name__ == "__main__":
    main()

wjbenfold (Contributor) commented:

More things I've worked out:

  • If I remove the collapse stage, I still see a serialization error (albeit a different one)
  • Both failures seem to be due to the use of msgpack to serialize Iris objects. This seems to be a widespread problem in Iris (a straight import of msgpack and then trying to serialize a Cube hits an error; see the sketch below this list).
  • Dask workers seem to use msgpack even when the client is started with an explicit list of serializers that doesn't include it.
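
A minimal sketch of that msgpack behaviour, assuming iris-sample-data is installed (the sample file is just an example): msgpack only handles basic built-in types, so handing it a Cube directly raises a TypeError.

# Sketch only: shows msgpack rejecting an arbitrary Python object such as a Cube.
import msgpack

import iris

cube = iris.load_cube(iris.sample_data_path("A1B_north_america.nc"))
try:
    msgpack.packb(cube)
except TypeError as err:
    print("msgpack could not serialize the cube:", err)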

This doesn't seem quick to solve, though I'm no expert on serialization / dask so there might be approaches I've not spotted.

DPeterK commented Feb 3, 2022

Hi @dmcg, thanks for raising this issue with Iris! Like @wjbenfold, I've also been able to reproduce this error locally, again with the A1B_north_america.nc file. Hopefully I'll also be able to add some more context to the error, a workaround you can use immediately, and a suggestion for how we can fix Iris to stop this happening in general.

I'll start with the workaround, as it has the most immediate value. If you don't use distributed (i.e. don't create a Client), you won't get the error (the reason for this will hopefully become clear from the context below). If you just use plain Iris you will automatically get dask's local parallel processing, so you should see the same parallel performance for your collapse operation with or without distributed. As you're using a single EC2 instance (I think), you won't lose anything by not distributing your processing over multiple machines - although obviously this workaround won't scale if you do move to a cluster of machines.

So, the following code should run without error:

import iris

cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)
iris.save(averages, "delme.nc")

Your other option is to realise the data before you save it - that is, load the data into memory after you collapse but before you save. Note this will only work if there's enough memory on the machine to store the collapsed cube's data. For example:

import iris
from distributed import Client

client = Client(n_workers=4)

cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)

averages.data  # <-- This line will load the data and force processing of the collapse operation before you save.

iris.save(averages, "delme.nc")

The reason for the difference between using distributed and not is that distributed communications always run over network links - even when the client, scheduler and workers are all on the same machine. There are certainly some advantages to using a local cluster over dask multiprocessing (and it's the preferred solution in the dask docs), but it can be less reliable.

One example of this is that the network communications cause extra constraints on how data is moved between workers. By default a distributed network runs over TCP, which transmits frames of bytes. Python objects in memory or on disk must be translated to bytes before being transmitted, and translated back from bytes on receipt. These processes are serialization and deserialization respectively, and it's this step that's failing here. The way that Python objects are serialized for TCP transmission is by first being pickled, and apparently the problem class here is one that cannot be pickled. Taking a look through the Iris source code it looks like the problem class is only used for NetCDF save, so the scope of the problem is small - but still annoying if you can't save your cube!
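
As a minimal illustration of that serialize/deserialize round trip (not Iris-specific; just the general idea, using plain pickle): objects become bytes for the TCP frames and are rebuilt from bytes on receipt, and anything that pickle rejects cannot make the trip.

import pickle

# Python object -> bytes (serialization), roughly what happens before TCP transmission.
frame = pickle.dumps({"op": "compute", "data": [1, 2, 3]})
# bytes -> Python object (deserialization), roughly what happens on receipt.
rebuilt = pickle.loads(frame)
assert rebuilt == {"op": "compute", "data": [1, 2, 3]}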

I think the longer-term solution to this will be to make the problem class serializable. This should be achievable by overriding the class's __getstate__ and __setstate__ methods - the trick will be ensuring that the correct state is captured by these methods. I'll have a go...
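
For illustration only, here is the general shape of such an override - a hypothetical stand-in class, not the actual Iris fix, and how the unpicklable netCDF variable would be re-attached on the worker side is deliberately left open:

class _PicklableStoreTarget:
    """Hypothetical stand-in for the problem class, for illustration only."""

    def __init__(self, target, fill_value=None):
        self.target = target  # e.g. a netCDF variable (not picklable)
        self.fill_value = fill_value
        self.is_masked = False
        self.contains_value = False

    def __getstate__(self):
        # Capture only the picklable parts of the instance state.
        state = self.__dict__.copy()
        state.pop("target", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.target = None  # would need re-attaching after unpickling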

dmcg commented Feb 3, 2022

Thanks for this. Loading into local memory should be possible for me, so I'm happy to have this as just an irritating edge case.

cpelley commented Jun 13, 2022

Also experiencing this issue when using SPICE, which involves saving a cube within the process 👍

@wjbenfold wjbenfold removed their assignment Jul 7, 2022
@wjbenfold wjbenfold removed the Peloton 🚴‍♂️ Target a breakaway issue to be caught and closed by the peloton label Jul 7, 2022
bouweandela (Member) commented:

#5031 may be relevant here.

pp-mo commented Oct 25, 2022

> #5031 may be relevant here.

Indeed.
Can you please say whether this is still an issue for you, @dmcg? All this discussion was a while ago now.
I think my experiences with #5031 may cast some light on how to do this.
Also, when that is available, it would probably be a better way of doing this anyway -- but work on that is still incomplete.

So I think it would be useful to have a re-statement of whether you still have a need and, if so, what needs to be achieved.

stephenworsley (Contributor) commented:

Has this been fixed by #5191?

@scitools-ci scitools-ci bot removed this from 🚴 Peloton Dec 15, 2023

github-actions bot commented Sep 8, 2024

In order to maintain a backlog of relevant issues, we automatically label them as stale after 500 days of inactivity.

If this issue is still important to you, then please comment on this issue and the stale label will be removed.

Otherwise this issue will be automatically closed in 28 days time.

@github-actions github-actions bot added the Stale A stale issue/pull-request label Sep 8, 2024

pp-mo commented Sep 10, 2024

I think this is probably still relevant.

However, when I now try this I get other errors, to do with parallelism problems.

Sample

import iris
from iris.tests import get_data_path as gdp
pth = gdp(['NetCDF', 'stereographic', 'toa_brightness_temperature.nc'])
cube = iris.load_cube(pth)

from dask.distributed import Client
client = Client(n_workers=4)

averages = cube.collapsed('projection_x_coordinate', iris.analysis.MEAN)
assert type(averages) == iris.cube.Cube
iris.io.save(averages, "delme.nc")

RUN ==>

Traceback (most recent call last):
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/site-packages/distributed/nanny.py", line 455, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/site-packages/distributed/nanny.py", line 755, in start
    await self.process.start()
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/spawn.py", line 164, in get_preparation_data
    _check_not_importing_main()
  File "/net/project/ukmo/scitools/opt_scitools/conda/deployments/default-2024_02_20/lib/python3.11/multiprocessing/spawn.py", line 140, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in https://docs.python.org/3/library/multiprocessing.html

Though ... not clear if that is a general problem, or caused by local issues

@github-actions github-actions bot removed the Stale A stale issue/pull-request label Sep 11, 2024

bouweandela commented Sep 11, 2024

@pp-mo If you follow the advice in the error message and make your script safely importable (guarding the top-level code with if __name__ == '__main__':), things seem to work fine, e.g.:

import iris
from iris.tests import get_data_path as gdp
from dask.distributed import Client

if __name__ == '__main__':
    pth = gdp(['NetCDF', 'stereographic', 'toa_brightness_temperature.nc'])
    cube = iris.load_cube(pth)

    client = Client(n_workers=4)

    averages = cube.collapsed('projection_x_coordinate', iris.analysis.MEAN)
    assert type(averages) == iris.cube.Cube
    iris.io.save(averages, "delme.nc")

runs without issues for me.

ESadek-MO (Contributor) commented:

From @SciTools/peloton, this was fixed in #5191.
