
Performance of GRIB files read with dask and dask.distributed can be improved. #33

Open
alexamici opened this issue Nov 12, 2018 · 10 comments

@alexamici
Contributor

Support for dask and dask.distributed is currently quite good in terms of correctness: we are able to perform complex operations on 320 GB of data from 20 files on a 10 VM x 8 core dask.distributed cluster with no problem.

Performance is not stellar and more work is needed in this respect.

@alexamici alexamici added the enhancement New feature or request label Nov 12, 2018
@alexamici alexamici changed the title Improve performance of GRIB files read with dask and dask.distributed Performance of GRIB files read with dask and dask.distributed is not stellar Nov 12, 2018
@alexamici alexamici changed the title Performance of GRIB files read with dask and dask.distributed is not stellar Performance of GRIB files read with dask and dask.distributed can be improved. Nov 12, 2018
@guidocioni

I don't know whether this should be flagged as an issue, but I've noticed some weird behaviour trying to use cfgrib as an xarray engine.

I'm running some benchmarks on GRIB data containing output of global models at different resolutions (ranging from 80 km to 2.5 km), written with compression. The shape of the arrays I'm trying to process ranges within (~300 timesteps, ~80 levels, ~8e5-80e6 points).

My initial idea was to speed up the processing of the data by parallelising it, instead of using CDO, which still works relatively well on our machine. For the 80 km and 40 km resolutions everything works pretty well and the performance is actually not bad! Here is a comparison of the serial and parallel approaches.

Serial (not parallel) approach

In [1]:
import xarray as xr
In [15]:
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb', engine='cfgrib')
CPU times: user 1min 40s, sys: 982 ms, total: 1min 41s
Wall time: 1min 48s
In [16]:
%%time
dss.unknown.mean(axis=2).compute()
CPU times: user 3min 4s, sys: 22.5 s, total: 3min 26s
Wall time: 3min 13s
Out[16]:
<xarray.DataArray 'unknown' (time: 321, generalVerticalLayer: 77)>
array([[0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.542062e-06,
        1.236650e-06, 1.581882e-06],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.137736e-06,
        7.532420e-07, 4.611456e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.473110e-06,
        8.896610e-07, 5.851762e-07],
       ...,
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 9.012958e-07,
        5.529808e-07, 4.198118e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.179397e-06,
        7.647039e-07, 6.093065e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.571679e-06,
        1.017117e-06, 8.398871e-07]], dtype=float32)
Coordinates:
    step                  timedelta64[ns] 00:00:00
  * generalVerticalLayer  (generalVerticalLayer) int64 14 15 16 17 ... 88 89 90
  * time                  (time) datetime64[ns] 2016-08-01 ... 2016-09-10
    valid_time            (time) datetime64[ns] 2016-08-01 ... 2016-09-10

Parallel Approach

In [1]:
import json
data=[]
with open('/work/mh0731/m300382/dask/scheduler.json') as f:
    data = json.load(f)
scheduler_address=data['address']
from dask.distributed import Client, progress
client = Client(scheduler_address)
client
Out[1]:

Client

Cluster

  • Workers: 132
  • Cores: 9504
  • Memory: 8.86 TB
In [2]:
import xarray as xr 
In [4]:
%%time
dss = xr.open_mfdataset('/work/ka1081/DYAMOND/ICON-80km/nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_*.grb', engine='cfgrib', parallel=True)
CPU times: user 659 ms, sys: 218 ms, total: 877 ms
Wall time: 11.6 s
In [6]:
%%time
dss.unknown.mean(axis=2).compute()
CPU times: user 275 ms, sys: 23 ms, total: 298 ms
Wall time: 11.4 s
Out[6]:
<xarray.DataArray 'unknown' (time: 321, generalVerticalLayer: 77)>
array([[0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.542062e-06,
        1.236650e-06, 1.581882e-06],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.137736e-06,
        7.532420e-07, 4.611456e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.473110e-06,
        8.896610e-07, 5.851762e-07],
       ...,
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 9.012958e-07,
        5.529808e-07, 4.198118e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.179397e-06,
        7.647039e-07, 6.093065e-07],
       [0.000000e+00, 0.000000e+00, 0.000000e+00, ..., 1.571679e-06,
        1.017117e-06, 8.398871e-07]], dtype=float32)
Coordinates:
    step                  timedelta64[ns] 00:00:00
  * generalVerticalLayer  (generalVerticalLayer) int64 14 15 16 17 ... 88 89 90
  * time                  (time) datetime64[ns] 2016-08-01 ... 2016-09-10
    valid_time            (time) datetime64[ns] 2016-08-01 ... 2016-09-10

However, when trying to process in the same way data from the 20 km resolution model, which should still be quite lightweight, the computation hangs and some messages are printed in the logs of dask workers.

15: distributed.core - INFO - Event loop was unresponsive in Worker for 26.74s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
63: distributed.core - INFO - Event loop was unresponsive in Worker for 26.60s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

This also causes the scheduler to hang and crash. I'm wondering whether it has to do with the fact that the data is also compressed, and maybe decompressing and parallelizing at the same time is just too much, or whether the dask support may still not be completely functional.

I'm testing this on our cluster, where dask, ecCodes and cfgrib were all installed in a conda environment to minimise dependency issues. In the past I've successfully processed large NetCDF datasets (on the order of ~1000 timesteps, 20e6 points).

@alexamici
Contributor Author

@guidocioni I didn't know about the parallel keyword argument, but it looks like it only affects the multi-file preprocessing step.

In order to enable chunked (and thus parallel) processing in dask you need to supply the chunks keyword argument. Try opening the dataset with:

dss = xr.open_mfdataset('files*.grb', engine='cfgrib', chunks={'time': 1, 'generalVerticalLayer': 1}, parallel=True)
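For reference, a quick way to check the chunking that xarray actually ends up with (a sketch; dss and the unknown variable are the names from the example above):

print(dss.chunks)            # mapping of dimension name -> chunk sizes
print(dss['unknown'].data)   # the underlying dask array, showing its chunk layout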

@guidocioni

@guidocioni I didn't know about the parallel keyword argument, but it looks like it only affects the multi-file preprocessing step.

In order to enable chunked (and thus parallel) processing in dask you need to supply the chunks keyword argument. Try opening the dataset with:

dss = xr.open_mfdataset('files*.grb', engine='cfgrib', chunks={'time': 1, 'generalVerticalLayer': 1}, parallel=True)

Yes, the parallel keyword argument only affects the preprocessing step, which gets faster. However, the open_mfdataset function always returns a dataset composed of dask arrays (see http://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html). Chunks are determined automatically based on the dimensions, so when using open_mfdataset the computation is always parallelized, depending on the scheduler in use. I think using lazy arrays prevents xarray from loading big datasets directly into memory.

Furthermore, I've also used xr.open_dataset manually defining the chunks and I still see the same behaviour.
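For reference, this is roughly what manually defining the chunks looks like (a sketch; the file name and chunk sizes are illustrative, not the exact ones used here):

import xarray as xr

# Open a single GRIB file with explicit chunk sizes (illustrative values)
ds = xr.open_dataset(
    'nwp_R2B05_lkm1008_atm_3d_tot_qc_dia_ml_file.grb',  # hypothetical file name
    engine='cfgrib',
    chunks={'time': 1, 'generalVerticalLayer': 10},
)
print(ds.chunks)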

@alexamici
Contributor Author

My guess is that for the 20 km resolution you have 321 files with one time step per file and approximately 4 GB each, for a total of 1.3 TB. If this is the case, the automatic chunking would be equivalent to chunks={'time': 1}, and processing 4 GB of data per chunk may be too much for one core of your (otherwise massive!) dask cluster.
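A back-of-the-envelope check of this guess (the numbers are the assumptions stated above, not measurements):

n_files = 321        # assumed: one time step per file
file_size_gb = 4     # assumed: approximate size of each file
print(n_files * file_size_gb / 1024, 'TB total')   # ~1.25 TB, i.e. roughly 1.3 TB
# With chunks={'time': 1} every task has to load one whole file, i.e. ~4 GB per chunk.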

Is it a public dataset that I can see? How many files do you have? What size is each file? How are they structured?

@guidocioni

For the 20 km resolution model, 3D variables are written in GRIB2 AEC-compressed files containing 8 timesteps (1 day every 3 hours), 77 vertical levels and exactly 1310720 horizontal points (the grid is unstructured). They are about 1 GB in size and there are 41 of them.
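For what it's worth, a rough estimate of the in-memory size of one such file once the values are decoded to 32-bit floats (the decoded dtype is an assumption):

n_times, n_levels, n_points = 8, 77, 1310720
n_values = n_times * n_levels * n_points        # ~8.1e8 values per file
print(n_values * 4 / 1e9, 'GB as float32')      # ~3.2 GB in memory vs ~1 GB on disk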

It would be really nice to share these files to test the cfgrib and dask capabilities, but unfortunately they are not public and live on our cluster. Still, I could transfer 40 GB of data somewhere if you're interested in testing them. Moving data from the 10, 5 and 2.5 km resolution domains will be a little bit more complicated, for obvious reasons :)

@guidocioni

guidocioni commented Dec 6, 2018

It looks like it is a memory issue... By refining the chunks I was able to run the computation for a while before the job crashed due to limited memory.

Maybe I just have to refine the way I ask for resources on the cluster. So I don't think this is strictly cfgrib-related; it is mostly on the dask-infrastructure side.

@guidocioni

Small update: apparently using the same approach with netCDF and GRIB files produces different results. This is what I usually do after having launched and connected to a dask scheduler which controls 15 different workers (for a total of 16 nodes on the machine, with 72 cores each).

import xarray as xr
from glob import glob
import dask.array as da

chunk_space = 1114111
chunk_time = 50

main_folder = '/work/bm0834/k203095/ICON_LEM_DE/'
file_prefix = '2d_cloud_day'
domain = 'DOM01'
folders = ['20130620-default-ifs']
variable = 'rain_gsp_rate'

# Collect the netCDF files of the selected simulation(s), sorted by name
filenames = []
for folder in folders:
    filenames = filenames + sorted(glob(main_folder + folder + '/DATA/' + file_prefix + '_' + domain + '*'))

# Open every file lazily and extract the variable of interest
temps = [xr.open_dataset(fn).variables[variable] for fn in filenames]

# Wrap each variable in a chunked dask array and concatenate along time
arrays = [da.from_array(t, chunks=(chunk_time, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)

# Spatial mean, computed on the cluster and kept in distributed memory
mean = var_normal.mean(axis=1)
mean = mean.persist()

This completes in about 1 minute, since the dataset is not that big (1080 timesteps, 2323968 points, equal to about 2.5e9 individual values in total).

However, if I just try to use the same approach with cfgrib and the files from the other simulation, the workers start to crash and I cannot complete the computation. This is the second approach:

chunk_space = 1000000
chunk_levels = 77
chunk_time = 10

main_folder = '/work/ka1081/DYAMOND/ICON-80km/'

variables = ['u']
filenames = []
for variable in variables:
    file_prefix = 'nwp_R2B05_lkm1008_atm_3d_' + variable + '_ml_'
    filenames = filenames + sorted(glob(main_folder + file_prefix + '*.grb'))

# Open every GRIB file with the cfgrib engine and extract the variable of interest
temps = [xr.open_dataset(fn, engine='cfgrib').variables[variable] for fn in filenames[0:-2]]

# Wrap each variable in a chunked dask array and concatenate along time
arrays = [da.from_array(t, chunks=(chunk_time, chunk_levels, chunk_space)) for t in temps]
var_normal = da.concatenate(arrays, axis=0)

# Mean over the horizontal points, kept in distributed memory
mean = var_normal.mean(axis=2)
mean = mean.persist()

and I'm using exactly the same session, with the same dask scheduler and workers. Note that in this case the total size of the data should be even smaller, as I have dimensions (312 timesteps, 77 levels, 81920 points), equal to about 2e9 individual values.

Honestly, I don't know where the problem is... with this dataset the individual chunks of data should be small enough not to create any problems.

I'm attaching the LOG file from the dask scheduler and workers.

LOG_dask.14437424.txt

@alexamici
Contributor Author

The log file contains the following errors from ecCodes:

ECCODES ERROR   :  grib_handle_create: cannot create handle, no definitions found
ecCodes assertion failed: `h' in .../src/grib_query.c:458

which is probably what causes the problems, but also:

No handlers could be found for logger "cfgrib.messages"

so we don't see the messages from cfgrib itself.

I'm not sure how to enable cfgrib logging in dask workers, so I cannot debug more than this at the moment.
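One possibility (a sketch I haven't tested in this setup) would be to configure the standard logging module on every worker via Client.run, so that the cfgrib loggers get a handler:

import logging
from dask.distributed import Client

def setup_worker_logging():
    logging.basicConfig(level=logging.INFO)
    logging.getLogger('cfgrib').setLevel(logging.DEBUG)

client = Client('tcp://scheduler:8786')   # hypothetical scheduler address
client.run(setup_worker_logging)          # run the function once on every worker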

Anyway, I'll be working on performance and especially performance under dask.distributed early next year.

@MatthewLennie

Hi @alexamici
I would just like to add a data point regarding performance.

I am trying to open a 320 MB file on an HPC system where bandwidth is not an issue for this size of file. Forming the index takes a really long time considering the size of the file: the time scale is in minutes, not seconds, around 7 minutes to open a 320 MB file. It's much slower than Nio, which also has to create an index from nothing. In my use case, pre-generating index files is not really possible, and anyway it would still be computationally expensive to do it even once. It's a heterogeneous file with many variables and many index keys.

It seems like the specific __getitem__ calls are a large part of the indexing cost. Perhaps there are some gains to be made here before diving into the dask optimization. See the profile (incidentally from dask) below: the largest chunk of the indexing time is __getitem__.

I am currently using the cfgrib.open_datasets interface and doing the dask multi-file handling myself, because I have heterogeneous files. Before profiling I thought it was the merging and sorting that was costing the time, but it's really just indexing the files.

[Screenshot: dask profile of the indexing, dominated by __getitem__ calls]
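A rough sketch of the kind of multi-file workflow I mean (the file names and the merge strategy are illustrative, not my actual code):

import cfgrib
import xarray as xr

paths = ['forecast_000.grb', 'forecast_001.grb']   # hypothetical input files

# Each heterogeneous file may yield several hypercube-consistent datasets
per_file = [cfgrib.open_datasets(p) for p in paths]

# Merge the pieces of each file, then concatenate the files along time
merged = [xr.merge(pieces) for pieces in per_file]
combined = xr.concat(merged, dim='time')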

Regarding dask optimization: my experience is that creating clients whose workers use a single thread each yields the best performance. This is probably due to the classic Python multithreading issue of code not releasing the GIL. A solution to that would also make life a lot easier down the road, when you want to use the data and workers for a multithreaded process, like training a neural net. Perhaps this is helpful?
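For illustration, a minimal sketch of that setup on a local cluster (the worker count is arbitrary):

from dask.distributed import Client

# Many single-threaded workers, to avoid GIL contention inside each worker
client = Client(n_workers=8, threads_per_worker=1)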

Thanks for taking the time to read. :) I would love to hear any suggestions from outside to improve the performance, or alternatively I would be very excited to hear if you manage to find a way of getting some improvements.

@guidocioni

Chiming in again, as I gave dask another go these days :)

Just to test, I wanted to compute yearly averages from ERA5 monthly data. I have two GRIB files with different time periods, downloaded from the Copernicus hub.

import xarray as xr
from dask.distributed import Client

client = Client(n_workers=4)

ds = xr.open_mfdataset('Downloads/adaptor.mars.internal-*.grib',
                       chunks={"time": 10},
                       parallel=True)

[Screenshot: the resulting dataset]

results = ds.resample({'time':'1Y'}).mean().compute()

This takes about 40 seconds and the average CPU usage (I have a 6-core i7-8700) sits at about 60-70%.

The same with CDO:

(base) guidocioni@Guidos-iMac:~/Downloads$ cdo yearavg -mergetime adaptor.mars.internal-*.grib test.grib
cdo(1) mergetime: Process started
cgribexGetTsteptype: Time range indicator 133 unsupported, set to 0!
cdo(1) mergetime: Processed 5601830400 values from 2 variables over 864 timesteps.
cdo    yearavg: Processed 5601830400 values from 1 variable over 935 timesteps [36.85s 262MB].

takes about 36 seconds with a CPU usage of just 10%.

Is this huge difference in performance really related only to the fact that CDO is highly optimized and written directly in C?

To be fair, I think xarray, dask and cfgrib have made a lot of progress, as many years ago one could barely open and concatenate files without getting errors... :D But for me it is still far from optimal and I'm wondering whether I'm doing something wrong.
