Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientResponseError:400 message='Bad Request' when opening OpenDAP urls with xarray #525

Closed
pbranson opened this issue Feb 2, 2021 · 5 comments

Comments

@pbranson
Copy link

pbranson commented Feb 2, 2021

Not sure if this is an fsspec/async issue or xarray/intake-xarray. Since the change to async operation with aiohttp it is no longer possible to use an OpenDAP server endpoint with xarray when a file-like object is passed. Minimum reproducible example:

import intake
import xarray as xr
from intake.catalog import Catalog
from intake.catalog.local import LocalCatalogEntry
mycat = Catalog.from_dict({
    'eta': LocalCatalogEntry('test', 'test single file', 'netcdf', args={'urlpath':'https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'}),
    })
print(mycat.eta.yaml())
mycat.eta.to_dask()

Results in:

sources:
  test:
    args:
      urlpath: https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc
    description: test single file
    driver: intake_xarray.netcdf.NetCDFSource
    metadata:
      catalog_dir: ''

---------------------------------------------------------------------------
ClientResponseError                       Traceback (most recent call last)
<ipython-input-28-667a22c3f2d5> in <module>
      5     })
      6 print(mycat.spectra.yaml())
----> 7 mycat.spectra.to_dask()

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
     67     def to_dask(self):
     68         """Return xarray object where variables are dask arrays"""
---> 69         return self.read_chunked()
     70 
     71     def close(self):

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
     42     def read_chunked(self):
     43         """Return xarray object (which will have chunks)"""
---> 44         self._load_metadata()
     45         return self._ds
     46 

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    124         """load metadata only if needed"""
    125         if self._schema is None:
--> 126             self._schema = self._get_schema()
    127             self.datashape = self._schema.datashape
    128             self.dtype = self._schema.dtype

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
     16 
     17         if self._ds is None:
---> 18             self._open_dataset()
     19 
     20             metadata = {

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self)
     90             url = fsspec.open(self.urlpath, **self.storage_options).open()
     91 
---> 92         self._ds = _open_dataset(url, chunks=self.chunks, **kwargs)
     93 
     94     def _add_path_to_ds(self, ds):

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    555     else:
    556         if engine is None:
--> 557             engine = _autodetect_engine(filename_or_obj)
    558 
    559         extra_kwargs = {}

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in _autodetect_engine(filename_or_obj)
    161         engine = _get_default_engine(filename_or_obj, allow_remote=True)
    162     else:
--> 163         engine = _get_engine_from_magic_number(filename_or_obj)
    164     return engine
    165 

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
    130                 "please close and reopen, or use a context manager"
    131             )
--> 132         magic_number = filename_or_obj.read(8)
    133         filename_or_obj.seek(0)
    134 

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in read(self, length)
    434             )  # all fits in one block anyway
    435         ):
--> 436             self._fetch_all()
    437         if self.size is None:
    438             if length < 0:

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
    119     def wrapper(*args, **kwargs):
    120         self = obj or args[0]
--> 121         return maybe_sync(func, self, *args, **kwargs)
    122 
    123     return wrapper

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     98         if inspect.iscoroutinefunction(func):
     99             # run the awaitable on the loop
--> 100             return sync(loop, func, *args, **kwargs)
    101         else:
    102             # just call the blocking function

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
     69     if error[0]:
     70         typ, exc, tb = error[0]
---> 71         raise exc.with_traceback(tb)
     72     else:
     73         return result[0]

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in f()
     53             if callback_timeout is not None:
     54                 future = asyncio.wait_for(future, callback_timeout)
---> 55             result[0] = await future
     56         except Exception:
     57             error[0] = sys.exc_info()

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in async_fetch_all(self)
    451             r = await self.session.get(self.url, **self.kwargs)
    452             async with r:
--> 453                 r.raise_for_status()
    454                 out = await r.read()
    455                 self.cache = AllBytes(

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/aiohttp/client_reqrep.py in raise_for_status(self)
   1003                 status=self.status,
   1004                 message=self.reason,
-> 1005                 headers=self.headers,
   1006             )
   1007 

ClientResponseError: 400, message='Bad Request', url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc')

Similar to intake/intake-xarray#98 this previously worked.

Testing that the URL is correct - this works:

ds = xr.open_dataset(mycat.spectra.urlpath)
ds

However using the h5netcdf engine with a file-like object causes the same error:

import fsspec

with fsspec.open(mycat.eta.urlpath) as f:
    ds = xr.open_dataset(f,engine='h5netcdf')

---------------------------------------------------------------------------
ClientResponseError                       Traceback (most recent call last)
<ipython-input-41-ad0d50259212> in <module>
      2 
      3 with fsspec.open(mycat.eta.urlpath) as f:
----> 4     ds = xr.open_dataset(f,engine='h5netcdf')
      5 ds

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    570 
    571         opener = _get_backend_cls(engine)
--> 572         store = opener(filename_or_obj, **extra_kwargs, **backend_kwargs)
    573 
    574     with close_on_error(store):

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/h5netcdf_.py in open(cls, filename, mode, format, group, lock, autoclose, invalid_netcdf, phony_dims)
    136                 )
    137             else:
--> 138                 magic_number = filename.read(8)
    139                 filename.seek(0)
    140                 if not magic_number.startswith(b"\211HDF\r\n\032\n"):

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in read(self, length)
    434             )  # all fits in one block anyway
    435         ):
--> 436             self._fetch_all()
    437         if self.size is None:
    438             if length < 0:

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
    119     def wrapper(*args, **kwargs):
    120         self = obj or args[0]
--> 121         return maybe_sync(func, self, *args, **kwargs)
    122 
    123     return wrapper

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     98         if inspect.iscoroutinefunction(func):
     99             # run the awaitable on the loop
--> 100             return sync(loop, func, *args, **kwargs)
    101         else:
    102             # just call the blocking function

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
     69     if error[0]:
     70         typ, exc, tb = error[0]
---> 71         raise exc.with_traceback(tb)
     72     else:
     73         return result[0]

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in f()
     53             if callback_timeout is not None:
     54                 future = asyncio.wait_for(future, callback_timeout)
---> 55             result[0] = await future
     56         except Exception:
     57             error[0] = sys.exc_info()

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in async_fetch_all(self)
    451             r = await self.session.get(self.url, **self.kwargs)
    452             async with r:
--> 453                 r.raise_for_status()
    454                 out = await r.read()
    455                 self.cache = AllBytes(

~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/aiohttp/client_reqrep.py in raise_for_status(self)
   1003                 status=self.status,
   1004                 message=self.reason,
-> 1005                 headers=self.headers,
   1006             )
   1007 

ClientResponseError: 400, message='Bad Request', url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc')

This seems to be something unique to the opendap endpoint, as the standard http server works fine, e.g:

import fsspec

with fsspec.open('https://dapds00.nci.org.au/thredds/fileServer/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc') as f:
    ds = xr.open_dataset(f,engine='h5netcdf')
ds

I have confirmed this with a few different opendap servers.

@martindurant
Copy link
Member

Can you please go into debug and find out what the headers of the request were? Perhaps this is trying to grab a specific range, and dap doesn't handle that.

@pbranson
Copy link
Author

pbranson commented Feb 5, 2021

The RequestInfo object associated with the exception is:

RequestInfo(url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'), method='GET', headers=<CIMultiDictProxy('Host': 'dapds00.nci.org.au', 'Accept': '/', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Python/3.7 aiohttp/3.7.3')>, real_url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'))

I read through the xarray PR pydata/xarray#4823 and seems that this is all related possibly to your reasons of limiting that PR to Zarr only.

I have followed the call stack through, the read is prompted by the 8 byte read to inform Xarray._get_engine_from_magic_number, however at HTTPFile.Read (line 436) HTTPFile.size is (65) < blocksize so self._fetch_all() is requested.

The size of 65 is is set based on the number of characters in the URL (http.py:231) - the actual file is ~10MB. Setting the storage option 'block_size' to 40 to force a range request results in the same error and this RequestInfo:

RequestInfo(url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'), method='GET', headers=<CIMultiDictProxy('Host': 'dapds00.nci.org.au', 'Range': 'bytes=0-47', 'Accept': '/', 'Accept-Encoding': 'gzip, deflate', 'User-Agent': 'Python/3.7 aiohttp/3.7.3')>, real_url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'))

@martindurant
Copy link
Member

If the server didn't accept range requests, that would not be too much of a surprise - but I don't see that in the original exception-causing request. _fetch_all should get the whole file, which is actually what we want, what could be going wrong?

to inform Xarray._get_engine_from_magic_number

Se we could specify the engine and avoid this?

The size of 65 is is set based on the number of characters in the URL (http.py:231)

That's not what self.size(path) does - it tries HEAD or streaming GET. Actually, that's the intent, but I don't now see where this might happen. Actually, the URL is longer than that, and I think it's the length of

Error {
    code = 400;
    message = "Unrecognized request";
};

which is what I get in requests.get(url).text - so we are getting a correct length of the payload, but the payload is not the file we want!

I conclude that DAP shouldn't just be fetched like this. I don't know what it does internally, but presumably it passes additional parameters/queries which expose the real URL for reading.

EDIT

A little digging in pydap reveals:

url + ".dds" : dataset description (size, coords)
url + ".das" : attributes
To get the data variables, you need to format a query +"&..." in a way I haven't yet figured out. Best not to recreate DAP!

By the way, this is also a thredds server, maybe intake_thredds takes care of it?

@pbranson
Copy link
Author

pbranson commented Feb 6, 2021

I was hoping to have an intake-xarray Catalog entry that provided a parameter and aggregated across several files on a thredds server. The intake-thredds syntax is a bit clunky still to descend the catalog hierarchy.

I only stumbled across this due to the issues introduced by including fsspec into the NetCDFSource intake/intake-xarray#98

Agree that this isn't the correct approach for accessing DAP, but the correct backend clients exposed through the XArray 'engine' argument don't accept file-like objects.

This issue can probably be closed.

@martindurant
Copy link
Member

Yes, I think this issue should be closed, perhaps we can iron out the details in intake-xarray and/or intake-thredds can talk with xarray a bit clearer to concat/merge datasets.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants