
Can I choose parallel=True like in xr.open_mfdataset? #70

Open
kthyng opened this issue Jan 27, 2022 · 12 comments

@kthyng

kthyng commented Jan 27, 2022

Hi! I stumbled across this intake driver and it looks like what I need, so thank you so much for your work!

I have already made an example for myself where I have a bunch of file locations that are on a thredds server, and then I read them in with xr.open_mfdataset(), like so:

ds = xr.open_mfdataset(filelocs, drop_variables=['siglay','siglev','Itime2'], parallel=True, compat='override', 
                       combine='by_coords', data_vars='minimal', coords='minimal')

This ended up being about 2 minutes for 127 files.

But, I need to get this combined dataset represented in an intake catalog, which is where intake-thredds comes in. I think I have properly mapped the keywords I used in xr.open_mfdataset to the API in intake-thredds like this:

import intake

# date: a datetime for the desired model run (defined earlier in my script)
cat_url = 'https://opendap.co-ops.nos.noaa.gov/thredds/catalog/NOAA/LEOFS/MODELS/catalog.xml'
source = intake.open_thredds_merged(
    cat_url,
    path=[
        date.strftime('%Y'),
        date.strftime('%m'),
        date.strftime('%d'),
        date.strftime('nos.leofs.fields.????.%Y%m%d.t12z.nc'),
    ],
    concat_kwargs={
        'dim': 'time',
        'data_vars': 'minimal',
        'coords': 'minimal',
        'compat': 'override',
        'combine_attrs': 'override',
    },
    xarray_kwargs=dict(
        drop_variables=['siglay', 'siglev', 'Itime2'],
    ),
)

But when I then try to look at the resulting combined, lazily loaded Dataset with source.to_dask(), it takes a very long time and fails with a "Bad Gateway" error before it can finish. The only difference I can see is that there is no way to pass parallel=True to intake.open_thredds_merged the way I did when calling xr.open_mfdataset.

Is there a way to use parallel=True? Thank you for your help!

@aaronspring
Collaborator

aaronspring commented Jan 27, 2022

intake-thredds doesn't use open_mfdataset and therefore won't use parallel.

self._ds = xr.concat(data, **self.concat_kwargs)

Have you tried a smaller example, e.g. just 2 instead of 127 files? Try with the glob f11? instead.

@andersy005
Member

Thank you for a thorough, reproducible example, @kthyng!

intake-thredds doesn't use open_mfdataset and therefore won't use parallel.

It's my understanding that the bottleneck is in these lines:

data = [
    ds(xarray_kwargs=self.xarray_kwargs).to_dask()
    for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)
]

As far as I can tell, there is no reason why we can't parallelize the for-loop (in some way), since there are no dependencies within this loop. We could use Python's built-in concurrent.futures or Dask delayed.
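
A minimal sketch of what that could look like with dask.delayed, reusing the names from the loop above (untested against the actual driver; _open_one is an illustrative helper, not part of the codebase):

import dask

def _open_one(entry):
    # Open one matched catalog entry lazily, as the loop body does now.
    return entry(xarray_kwargs=self.xarray_kwargs).to_dask()

# One delayed task per dataset; the entries are independent of each other,
# so dask can open them concurrently.
tasks = [dask.delayed(_open_one)(entry) for entry in _match(cat, path)]
data = list(dask.compute(*tasks))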

@aaronspring
Collaborator

@andersy005 could intake-thredds be refactored to use open_mfdataset (which would also require changes in intake-xarray) so that parallel works, or should we try to implement dask delayed here in intake-thredds? And would that actually speed up the call, or do you think there is a different bottleneck?

@andersy005
Member

could intake-thredds be refactored to use open_mfdataset (which would also require changes in intake-xarray) so that parallel works

We could refactor intake-thredds and implement our own drivers, but I'm wondering if it's worth the effort? :) I like the current approach of outsourcing the data loading work to intake-xarray. Apart from the parallelism, are there any other limitations that would justify the cost of implementing our own drivers? Do you think we should wait for this to be implemented in intake-xarray?

@martindurant
Member

I think intake-xarray should pass xarray_kwargs to open_mfdataset if the urlpath apparently refers to multiple datasets (a list of URLs or a glob string).
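
Roughly, that dispatch might look like this inside the intake-xarray driver (an illustrative sketch only, not the actual intake-xarray code; attribute names like self.xarray_kwargs are assumptions):

import xarray as xr

def _open_dataset(self):
    # A list of URLs or a glob pattern means multiple datasets: let
    # open_mfdataset handle the combining (including parallel=True if
    # the user passed it through).
    if isinstance(self.urlpath, list) or any(c in self.urlpath for c in '*?['):
        self._ds = xr.open_mfdataset(self.urlpath, **self.xarray_kwargs)
    else:
        self._ds = xr.open_dataset(self.urlpath, **self.xarray_kwargs)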

@kthyng
Author

kthyng commented Jan 27, 2022

Yes, I saw that open_mfdataset wasn't being used, and figured the idea of having concat_kwargs as an option was to recreate that behavior in another way.

@martindurant's idea seems relatively straightforward:

From the intake-xarray side it seems like the logic from netcdf.py:

https://github.com/intake/intake-xarray/blob/71360fe20b15a0707c37a3ed5c641f1bd070182f/intake_xarray/netcdf.py#L71-L84

could be used in opendap.py:

https://github.com/intake/intake-xarray/blob/71360fe20b15a0707c37a3ed5c641f1bd070182f/intake_xarray/opendap.py#L91-L94

But, I'm not as sure what should change in intake-thredds source.py:

def _open_dataset(self):
    import xarray as xr

    if self._ds is None:
        cat = ThreddsCatalog(self.urlpath, driver=self.driver)
        for i in range(len(self.path)):
            part = self.path[i]
            if '*' not in part and '?' not in part:
                cat = cat[part](driver=self.driver)
            else:
                break
        path = self.path[i:]
        data = [
            ds(xarray_kwargs=self.xarray_kwargs).to_dask()
            for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)
        ]
        if self.concat_kwargs:
            self._ds = xr.concat(data, **self.concat_kwargs)
        else:
            self._ds = xr.combine_by_coords(data, combine_attrs='override')
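
One possible direction (an untested sketch; it assumes each matched entry exposes its OPeNDAP URL as .urlpath, which may not match the real intake-xarray attribute) would be to collect the URLs and hand the whole list to open_mfdataset, so that parallel=True applies across all files:

# Replace the per-entry to_dask() loop with a single open_mfdataset call.
urls = [entry.urlpath for entry in _match(cat, path)]
self._ds = xr.open_mfdataset(
    urls,
    parallel=True,
    combine='by_coords',
    **self.xarray_kwargs,
)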

@aaronspring
Collaborator

Have you tried a smaller example, e.g. just 2 instead of 127 files? Try with the glob f11? instead.

works for me:

import intake
import xarray as xr

dates = xr.cftime_range(start='2021-12-29', periods=3, freq='1D')
date = dates[0]
cat_url = 'https://opendap.co-ops.nos.noaa.gov/thredds/catalog/NOAA/LEOFS/MODELS/catalog.xml'
source = intake.open_thredds_merged(
    cat_url,
    path=[
        date.strftime('%Y'),
        date.strftime('%m'),
        date.strftime('%d'),
        date.strftime('nos.leofs.fields.f11?.%Y%m%d.t12z.nc'),
    ],
    concat_kwargs={
        'dim': 'time',
        'data_vars': 'minimal',
        'coords': 'minimal',
        'compat': 'override',
        'combine_attrs': 'override',
    },
    xarray_kwargs=dict(
        drop_variables=['siglay', 'siglev', 'Itime2'],
    ),
)
source.to_dask()
Dataset(s): 100%|██████████████████████████████| 10/10 [01:25<00:00,  8.51s/it]

xarray.Dataset
Dimensions:  nele: 11509, node: 6106, three: 3, time: 10, maxnode: 11,
             maxelem: 9, four: 4, siglay: 20, siglev: 21

This only fetches 10 datasets and took ~30sec.

@kthyng
Author

kthyng commented Jan 28, 2022

@aaronspring Thanks for demonstrating that. It makes sense that it works for fewer files, but I really do need to access more files at once to construct enough of a model time series to work with. Plus, I would expect access to the files to be comparable whether using xarray directly or through intake/intake-thredds/intake-xarray.

@kthyng
Author

kthyng commented Jan 28, 2022

As I have been trying to work on this, I see that a difficulty with the change I'm looking for is that a catalog entry would need to be created that has a list of urlpaths, which could then be passed to the intake-xarray opendap driver wrapping open_mfdataset. So far I haven't had any success figuring out how to make such a catalog entry in intake-thredds. Do any of you have an idea of how to approach that?

@kthyng
Author

kthyng commented Jan 29, 2022

I had a realization since I posted this! I can get the behavior I want with intake-xarray by making the simpler change of adding open_mfdataset to the opendap driver (PR). This works because I already have to figure out the names of the files I will use from the thredds server to create the correct time series of model output, so I have their full URLs (and therefore don't need the clever thredds machinery to dig into the catalog). It would be handy to be able to use intake-thredds in similar situations, but this is a reasonable compromise.

With the small code change in that PR, I can do this (just showing 2 files to save space):

import intake

filelocs = ['https://opendap.co-ops.nos.noaa.gov/thredds/dodsC/NOAA/LEOFS/MODELS/2022/01/28/nos.leofs.fields.n000.20220128.t18z.nc',
            'https://opendap.co-ops.nos.noaa.gov/thredds/dodsC/NOAA/LEOFS/MODELS/2022/01/28/nos.leofs.fields.n001.20220128.t18z.nc']
ds = intake.open_opendap(filelocs, engine='netcdf4', chunks={'time': 1},
                         drop_variables=['siglay', 'siglev', 'Itime2'], parallel=True,
                         compat='override', combine='by_coords',
                         data_vars='minimal', coords='minimal')
ds.to_dask()

With the full 127 file locations this took about 2.5 minutes, so it seems comparable to using xarray directly.

I'll close this next week if people don't have anything else they want to say at that point.

@aaronspring
Collaborator

I would like to keep this issue open to track parallelisation of intake-thredds.

@aaronspring aaronspring reopened this Feb 1, 2022
@kthyng
Author

kthyng commented Feb 1, 2022

The intake-xarray part is finished now with PR#113. From what I learned digging in, the main remaining question for this issue is how to allow the intake-thredds catalog to have a list of paths.
