differences between gcsfs and fsspec behavior when opening gs:// url paths #476

Open
rabernat opened this issue Nov 18, 2020 · 11 comments

@rabernat
Contributor

rabernat commented Nov 18, 2020

I would like to open a gcs path using fsspec's url resolver and then read it with xarray:

import xarray as xr
import fsspec

url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'  
openfile = fsspec.open(url, mode='rb') 
dsgcs = xr.open_dataset(openfile, chunks=3000)

This raises the following error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-26-9f11dae45dd1> in <module>
      1 url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
      2 openfile = fsspec.open(url, mode='rb')
----> 3 dsgcs = xr.open_dataset(openfile, chunks=3000)
      4 
      5 get_ipython().run_line_magic('time', 'dsgcs.surface.mean().compute()')

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    534                 "with engine='scipy' or 'h5netcdf'"
    535             )
--> 536         engine = _get_engine_from_magic_number(filename_or_obj)
    537         if engine == "scipy":
    538             store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
    113         magic_number = filename_or_obj[:8]
    114     else:
--> 115         if filename_or_obj.tell() != 0:
    116             raise ValueError(
    117                 "file-like object read/write pointer not at zero "

AttributeError: 'OpenFile' object has no attribute 'tell'

However, if I do the same thing with gcsfs, it works:

import gcsfs
gcs = gcsfs.GCSFileSystem()

openfile = gcs.open(url, mode='rb') 
dsgcs = xr.open_dataset(openfile, chunks=3000)

This feels like a bug. And it breaks my mental model of how fsspec works. I thought that fsspec was just dispatching to gcsfs based on url matching. Help me understand why that is not the case.

xref pydata/xarray#4591, which helped me discover this (but is about something different)

@martindurant
Member

Should be

openfile = fsspec.open(url, mode='rb') 
with openfile as f:
    dsgcs = xr.open_dataset(f, chunks=3000)

@rabernat
Contributor Author

If I use your recommended syntax, it doesn't work with distributed because the file is closed.

from dask.distributed import Client
client = Client()
dsgcs.surface.mean().compute()
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-25-136bf6160302> in <module>
      1 url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
      2 with  fsspec.open(url, mode='rb')  as openfile:
----> 3     dsgcs = xr.open_dataset(openfile, chunks=3000)
      4 
      5 get_ipython().run_line_magic('time', 'dsgcs.surface.mean().compute()')

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
    543 
    544     with close_on_error(store):
--> 545         ds = maybe_decode_store(store)
    546 
    547     # Ensure source filename always stored in dataset object (GH issue #2550)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in maybe_decode_store(store, lock)
    449 
    450     def maybe_decode_store(store, lock=False):
--> 451         ds = conventions.decode_cf(
    452             store,
    453             mask_and_scale=mask_and_scale,

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/conventions.py in decode_cf(obj, concat_characters, mask_and_scale, decode_times, decode_coords, drop_variables, use_cftime, decode_timedelta)
    598         decode_timedelta=decode_timedelta,
    599     )
--> 600     ds = Dataset(vars, attrs=attrs)
    601     ds = ds.set_coords(coord_names.union(extra_coords).intersection(vars))
    602     ds._file_obj = file_obj

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataset.py in __init__(self, data_vars, coords, attrs)
    541             coords = coords.variables
    542 
--> 543         variables, coord_names, dims, indexes, _ = merge_data_and_coords(
    544             data_vars, coords, compat="broadcast_equals"
    545         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in merge_data_and_coords(data, coords, compat, join)
    465     explicit_coords = coords.keys()
    466     indexes = dict(_extract_indexes_from_coords(coords))
--> 467     return merge_core(
    468         objects, compat, join, explicit_coords=explicit_coords, indexes=indexes
    469     )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in merge_core(objects, compat, join, combine_attrs, priority_arg, explicit_coords, indexes, fill_value)
    592         coerced, join=join, copy=False, indexes=indexes, fill_value=fill_value
    593     )
--> 594     collected = collect_variables_and_indexes(aligned)
    595 
    596     prioritized = _get_priority_vars_and_indexes(aligned, priority_arg, compat=compat)

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in collect_variables_and_indexes(list_of_mappings)
    276                 append_all(coords, indexes)
    277 
--> 278             variable = as_variable(variable, name=name)
    279             if variable.dims == (name,):
    280                 variable = variable.to_index_variable()

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in as_variable(obj, name)
    158                 "dimensions." % (name, obj.dims)
    159             )
--> 160         obj = obj.to_index_variable()
    161 
    162     return obj

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in to_index_variable(self)
    524     def to_index_variable(self):
    525         """Return this variable as an xarray.IndexVariable"""
--> 526         return IndexVariable(
    527             self.dims, self._data, self._attrs, encoding=self._encoding, fastpath=True
    528         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in __init__(self, dims, data, attrs, encoding, fastpath)
   2345         # Unlike in Variable, always eagerly load values into memory
   2346         if not isinstance(self._data, PandasIndexAdapter):
-> 2347             self._data = PandasIndexAdapter(self._data)
   2348 
   2349     def __dask_tokenize__(self):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __init__(self, array, dtype)
   1387 
   1388     def __init__(self, array: Any, dtype: DTypeLike = None):
-> 1389         self.array = utils.safe_cast_to_index(array)
   1390         if dtype is None:
   1391             if isinstance(array, pd.PeriodIndex):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/utils.py in safe_cast_to_index(array)
    102         if hasattr(array, "dtype") and array.dtype.kind == "O":
    103             kwargs["dtype"] = object
--> 104         index = pd.Index(np.asarray(array), **kwargs)
    105     return _maybe_cast_to_cftimeindex(index)
    106 

/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
     81 
     82     """
---> 83     return array(a, dtype, copy=False, order=order)
     84 
     85 

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    558     def __array__(self, dtype=None):
    559         array = as_indexable(self.array)
--> 560         return np.asarray(array[self.key], dtype=None)
    561 
    562     def transpose(self, order):

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     26 
     27     def __getitem__(self, key):
---> 28         return indexing.explicit_indexing_adapter(
     29             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     30         )

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    843     """
    844     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 845     result = raw_indexing_method(raw_key.tuple)
    846     if numpy_indices.tuple:
    847         # index the loaded np.ndarray

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
     36         with self.datastore.lock:
     37             array = self.get_array(needs_lock=False)
---> 38             return array[key]
     39 
     40 

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
    144 
    145     def __getitem__(self, key):
--> 146         return self._h5ds[key]
    147 
    148     def __setitem__(self, key, value):

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args)
    571         mspace = h5s.create_simple(mshape)
    572         fspace = selection.id
--> 573         self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
    574 
    575         # Patch up the output for NumPy

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5d.pyx in h5py.h5d.DatasetID.read()

h5py/_proxy.pyx in h5py._proxy.dset_rw()

h5py/_proxy.pyx in h5py._proxy.H5PY_H5Dread()

h5py/defs.pyx in h5py.defs.H5Dread()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
   1407         """
   1408         out = memoryview(b).cast("B")
-> 1409         data = self.read(out.nbytes)
   1410         out[: len(data)] = data
   1411         return len(data)

/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
   1392             length = self.size - self.loc
   1393         if self.closed:
-> 1394             raise ValueError("I/O operation on closed file.")
   1395         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
   1396         if length == 0:

ValueError: I/O operation on closed file.

@martindurant
Member

In that case you take responsibility for the file

openfile = fsspec.open(url, mode='rb').open()
dsgcs = xr.open_dataset(openfile, chunks=3000)
...
openfile.close()
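
For reference, a minimal sketch of how that pattern could look end to end with a distributed client (this assumes the same bucket and variable as above; whether the compute succeeds will still depend on how the open file serialises to the workers):

import xarray as xr
import fsspec
from dask.distributed import Client

url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'

# .open() materialises the underlying file-like object right away;
# it stays open until it is closed explicitly
openfile = fsspec.open(url, mode='rb').open()
dsgcs = xr.open_dataset(openfile, chunks=3000)

client = Client()
result = dsgcs.surface.mean().compute()

dsgcs.close()
openfile.close()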

@rabernat
Contributor Author

rabernat commented Nov 18, 2020

I don't understand what you mean.

Surely you understand what I want to do here: open the file and compute on it with a distributed cluster. If I am doing it wrong, please tell me the recommended way to do this.

@rabernat
Contributor Author

I thought that fsspec was just dispatching to gcsfs based on url matching. Help me understand why that is not the case.

I would also still be interested in getting an answer to this question.

@scottyhq

I just came across this while trying various opening options via intake-xarray (intake/intake-xarray#88).

In that case you take responsibility for the file

@martindurant. I'm struggling with how best to do this for the intake-xarray case. If I understand correctly, the issue is that you need to use a context manager to get access to the file methods behind an OpenFile, right? But how do we do that in the case of intake-xarray, given the current syntax? This issue is not unique to GCSFS; it also applies to S3 and HTTP. The following snippet leads to the same traceback as in the first comment:

import intake
uri = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
ds = intake.open_netcdf(uri, 
                        xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
                        storage_options=dict(anon=True)
                       ).to_dask()
print(ds.h_li.mean())

/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
    113         magic_number = filename_or_obj[:8]
    114     else:
--> 115         if filename_or_obj.tell() != 0:
    116             raise ValueError(
    117                 "file-like object read/write pointer not at zero "

AttributeError: 'OpenFile' object has no attribute 'tell'

@scottyhq

scottyhq commented Nov 22, 2020

A partial workaround is using simplecache::, but that only works for a LocalCluster, not a distributed cluster like dask gateway:

import intake
uri = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
ds = intake.open_netcdf(uri, 
                        chunks=dict(delta_time=20000),
                        xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
                        storage_options=dict(s3={'anon': True}, #default_cache_type='all',
                                             #simplecache=dict(cache_storage="/tmp/atl06", same_names=True),
                                            )
                       ).to_dask()

# GatewayCluster
from dask.distributed import Client

# `cluster` is a dask-gateway GatewayCluster created elsewhere
with Client(cluster) as client:
    result = ds['h_li'].mean().compute()

/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/files.py in make_fid()
    171         if swmr and swmr_support:
    172             flags |= h5f.ACC_SWMR_READ
--> 173         fid = h5f.open(name, flags, fapl=fapl)
    174     elif mode == 'r+':
    175         fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

OSError: Unable to open file (unable to open file: name = '/tmp/tmphulzch2c/96968a7bb03c66de5724914b4116a866819162a33560f08134284e08f670ad38', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

One possibility is using a globally available S3:// scratch location for the cache: simplecache=dict(cache_storage="s3://pangeo-scratch/tmp/atl06"). But it does seem like AttributeError: 'OpenFile' object has no attribute 'tell' could be addressed in some way to avoid needing caching...

@martindurant
Member

Was this situation working before? I expect that opening fsspec paths or file objects with xarray must have been working for some time, so I'm not sure why this is different. We should figure out why intake-xarray is apparently getting this wrong.

One possibility is using a globally available S3:// scratch location for the cache

I wouldn't mind making this happen - most of the code would stay the same, but we would need to be careful about how often we read/write cache metadata.

@scottyhq

scottyhq commented Nov 23, 2020

Was this situation working before? I expect that opening fsspec paths or file objects with xarray must have been working for some time, so I'm not sure why this is different. We should figure out why intake-xarray is apparently getting this wrong.

I'm not sure; I don't think reading remote HDF or NetCDF files without OpenDAP was really explored or tested before. I opened a PR in intake-xarray to explore possible fixes: intake/intake-xarray#93

I wouldn't mind making this happen - most of the code would stay the same, but we would need to be careful about how often we read/write cache metadata.

Seems like this could be particularly powerful, especially for rechunker or pangeo-forge conversion workflows for datasets on legacy servers.

To avoid getting too side-tracked from the original issue here with intake, I think this is the key question: what is the difference between using fsspec.open() (doesn't work) versus s3fs.open() or gcsfs.open() (works!) with xarray?

import xarray as xr
import fsspec
import s3fs

# Works
s3 = s3fs.S3FileSystem(anon=True)
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
openfile = s3.open(url) 
ds = xr.open_dataset(openfile, group='gt1l/land_ice_segments', engine='h5netcdf', chunks={})

# AttributeError: 'OpenFile' object has no attribute 'tell'
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
openfile = fsspec.open(url) 
ds = xr.open_dataset(openfile, group='gt1l/land_ice_segments', engine='h5netcdf', chunks={})

@martindurant
Member

martindurant commented Nov 23, 2020

what is the difference between using fsspec.open() (doesn't work) versus s3fs.open() or gcsfs.open() (works!) with xarray

fsspec.open('s3://...').open() == s3.open('s3://...')

fsspec.open('s3://...') -> OpenFile, which is a context manager that produces the underlying file-like object in a with clause. It can also wrap that file-like in a text or compression wrapper, or indeed in other file systems like simplecache; all are flushed and closed when exiting the context. It is designed for serialisation.

The file-like objects are also context managers: they are auto-closed when leaving the context.
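
To make that concrete, here is a minimal sketch (the bucket path is a placeholder; gcsfs and s3fs are assumed to be installed so the gs and s3 protocols are registered):

import fsspec

# protocol dispatch does happen: the URL prefix selects the backend class
print(fsspec.get_filesystem_class('gs'))   # gcsfs GCSFileSystem
print(fsspec.get_filesystem_class('s3'))   # s3fs S3FileSystem

# but fsspec.open() returns an OpenFile container, not a file-like object,
# so it has no read()/seek()/tell() of its own
of = fsspec.open('s3://some-bucket/some-file.nc', mode='rb')  # placeholder path

# either enter the context, which yields the real file-like and closes it on exit ...
with of as f:
    magic = f.read(8)

# ... or call .open() and manage the file's lifetime yourself
f = of.open()
magic = f.read(8)
f.close()

This is also why passing the OpenFile itself to xr.open_dataset fails with the tell() AttributeError, while passing the object returned by .open() (or by s3.open()/gcs.open()) works.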

@tdhopper

tdhopper commented Feb 3, 2021

I'm facing what I think is the same issue. The simplecache:: workaround also seems to work here.
