Skip to content

Commit 3f837f8

Browse files
authoredMar 25, 2025··
Merge pull request #460 from euroargodev/lazy-ftp-open_dataset
Implement laziness with FTP
2 parents 3e692f5 + 0140ba2 commit 3f837f8

File tree

8 files changed

+93
-39
lines changed

8 files changed

+93
-39
lines changed
 

Diff for: ‎argopy/stores/filesystems.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
log = logging.getLogger("argopy.stores")
2121

22+
2223
try:
2324
from tqdm import tqdm
2425
except ModuleNotFoundError:
@@ -145,4 +146,4 @@ def new_fs(
145146
# log.warning(log_msg)
146147
log.debug(log_msg)
147148
# log_argopy_callerstack()
148-
return fs, cache_registry
149+
return fs, cache_registry, fsspec_kwargs

Diff for: ‎argopy/stores/implementations/ftp.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ class ftpstore(httpstore):
2929

3030
protocol = "ftp"
3131

32+
@property
33+
def host(self):
34+
return self.fs.fs.host if self.fs.protocol == 'dir' else self.fs.host
35+
36+
@property
37+
def port(self):
38+
return self.fs.fs.port if self.fs.protocol == 'dir' else self.fs.port
39+
3240
def open_dataset(self, url: str,
3341
errors: Literal["raise", "ignore", "silent"] = "raise",
3442
lazy: bool = False,
@@ -140,9 +148,7 @@ def load_lazily(
140148

141149
if "ak" not in kwargs:
142150
self.ak = ArgoKerchunker()
143-
storage_options = {"host": self.fs.fs.host if self.fs.protocol == 'dir' else self.fs.host,
144-
"port": self.fs.fs.port if self.fs.protocol == 'dir' else self.fs.port}
145-
self.ak.storage_options = storage_options
151+
self.ak.storage_options = {"host": self.host, "port": self.port}
146152
else:
147153
self.ak = kwargs["ak"]
148154

@@ -163,7 +169,7 @@ def load_lazily(
163169
return "reference://", xr_opts
164170
else:
165171
warnings.warn(
166-
"This url does not support byte range requests so we cannot load it lazily, falling back on loading in memory."
172+
"This url does not support byte range requests so we cannot load it lazily, falling back on loading in memory.\n(url='%s')" % url
167173
)
168174
log.debug("This url does not support byte range requests: %s" % self.full_path(url))
169175
return load_in_memory(
@@ -187,7 +193,7 @@ def load_lazily(
187193

188194
if "source" not in ds.encoding:
189195
if isinstance(url, str):
190-
ds.encoding["source"] = url
196+
ds.encoding["source"] = self.full_path(url)
191197

192198
self.register(url)
193199
return ds

Diff for: ‎argopy/stores/implementations/gdac.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class gdacfs:
6969

7070
@staticmethod
7171
def path2protocol(path: Union[str, Path]) -> str:
72-
"""Narrow down any path to a supported protocols, raise GdacPathError if protocol not supported"""
72+
"""Narrow down any path to a supported protocol, raise GdacPathError if protocol not supported"""
7373
if isinstance(path, Path):
7474
return "file"
7575
else:

Diff for: ‎argopy/stores/implementations/http.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ def load_lazily(
300300
self.ak = ArgoKerchunker()
301301
if self.protocol == 's3':
302302
storage_options = {'anon': not has_aws_credentials()}
303-
# log.debug(f"AWS credentials: {has_aws_credentials()}")
304303
else:
305304
storage_options = {}
306305
self.ak.storage_options = storage_options
@@ -349,7 +348,7 @@ def load_lazily(
349348

350349
if "source" not in ds.encoding:
351350
if isinstance(url, str):
352-
ds.encoding["source"] = url
351+
ds.encoding["source"] = self.full_path(url)
353352

354353
self.register(url)
355354
return ds

Diff for: ‎argopy/stores/kerchunker.py

+42-23
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pathlib import Path
44
import json
55
import logging
6+
from packaging import version
67

78
from ..utils import to_list
89
from . import memorystore, filestore
@@ -199,19 +200,31 @@ def __repr__(self):
199200

200201
@property
201202
def store_path(self):
202-
"""Path to the reference store, including protocol"""
203+
"""Absolute path to the reference store, including protocol"""
203204
p = getattr(self.fs, "path", str(Path(".").absolute()))
204205
# Ensure the protocol is included for non-local files:
205-
if self.fs.fs.protocol[0] == "ftp":
206-
p = "ftp://" + self.fs.fs.host + fsspec.core.split_protocol(p)[-1]
207206
if self.fs.fs.protocol[0] == "s3":
208207
p = "s3://" + fsspec.core.split_protocol(p)[-1]
209208
return p
210209

211210
def _ncfile2jsfile(self, ncfile):
211+
"""Convert a netcdf file path to a data store file path for kerchunk zarr reference (json data)"""
212212
return Path(ncfile).name.replace(".nc", "_kerchunk.json")
213213

214+
def _ncfile2ncref(self, ncfile: Union[str, Path], fs=None):
215+
"""Convert a netcdf file path to a key used in internal kerchunk_references"""
216+
# return fs.full_path(fs.info(str(ncfile))['name'], protocol=True)
217+
return fs.full_path(str(ncfile), protocol=True)
218+
214219
def _magic2chunker(self, ncfile, fs):
220+
"""Get a netcdf file path chunker alias: 'cdf3' or 'hdf5'
221+
222+
This is based on the file binary magic value.
223+
224+
Raises
225+
------
226+
:class:`ValueError` if file not recognized
227+
"""
215228
magic = fs.open(ncfile).read(3)
216229
if magic == b"CDF":
217230
return "cdf3"
@@ -228,6 +241,11 @@ def nc2reference(
228241
):
229242
"""Compute reference data for a netcdf file (kerchunk json data)
230243
244+
This method is intended to be used internally, since it's not using the kerchunk reference store.
245+
246+
Users should rather use the :meth:`to_reference` method to avoid to recompute reference data
247+
when available on the :class:`ArgoKerchunker` instance.
248+
231249
Parameters
232250
----------
233251
ncfile : Union[str, Path]
@@ -245,24 +263,27 @@ def nc2reference(
245263
-------
246264
dict
247265
"""
248-
ncfile_raw = str(ncfile)
249266
chunker = self._magic2chunker(ncfile, fs) if chunker == "auto" else chunker
267+
ncfile_full = self._ncfile2ncref(ncfile, fs=fs)
268+
269+
storage_options = self.storage_options.copy()
270+
if fs.protocol == 'ftp' and version.parse(fsspec.__version__) < version.parse("2024.10.0"):
271+
# We need https://github.com/fsspec/filesystem_spec/pull/1673
272+
storage_options.pop('host', None)
273+
storage_options.pop('port', None)
250274

251-
ncfile_full = fs.full_path(ncfile_raw, protocol=True)
252-
# log.debug(f"Computing kerchunk json zarr references for: {ncfile_full}")
253-
# log.debug(f"Kerchunker storage options: {self.storage_options}")
254275
if chunker == "cdf3":
255276
chunks = NetCDF3ToZarr(
256277
ncfile_full,
257278
inline_threshold=self.inline_threshold,
258279
max_chunk_size=self.max_chunk_size,
259-
storage_options=self.storage_options,
280+
storage_options=storage_options,
260281
)
261282
elif chunker == "hdf5":
262283
chunks = SingleHdf5ToZarr(
263284
ncfile_full,
264285
inline_threshold=self.inline_threshold,
265-
storage_options=self.storage_options,
286+
storage_options=storage_options,
266287
)
267288

268289
kerchunk_data = chunks.translate()
@@ -272,7 +293,7 @@ def nc2reference(
272293
with self.fs.open(kerchunk_jsfile, "wb") as f:
273294
f.write(json.dumps(kerchunk_data).encode())
274295

275-
return ncfile, kerchunk_jsfile, kerchunk_data
296+
return ncfile_full, kerchunk_jsfile, kerchunk_data
276297

277298
def update_kerchunk_references_from_store(self):
278299
"""Load kerchunk data already on store"""
@@ -362,14 +383,12 @@ def translate(
362383
return results
363384

364385
def to_reference(self, ncfile: Union[str, Path], fs=None, overwrite: bool = False):
365-
"""Return zarr reference data for a given netcdf file
366-
367-
If data are found on the instance file store, load them, otherwise triggers :meth:`ArgoKerchunker.translate` and
368-
save data on instance file store.
386+
"""Return zarr reference data for a given netcdf file path
369387
370-
This is basically similar to :meth:`ArgoKerchunker.translate` but for a single file and **possibly using pre-computed references**.
388+
Return data from the instance store if available, otherwise trigger :meth:`ArgoKerchunker.translate` (which save
389+
data on the instance data store).
371390
372-
This is the method to use in **argopy** file store method :meth:`ArgoStoreProto.open_dataset` to implement laziness.
391+
This is the method to use in **argopy** file store methods :meth:`ArgoStoreProto.open_dataset` to implement laziness.
373392
374393
Parameters
375394
----------
@@ -388,16 +407,16 @@ def to_reference(self, ncfile: Union[str, Path], fs=None, overwrite: bool = Fals
388407
"""
389408
if overwrite:
390409
self.translate(ncfile, fs=fs)
391-
elif str(ncfile) not in self.kerchunk_references:
410+
elif self._ncfile2ncref(ncfile, fs=fs) not in self.kerchunk_references:
392411
if self.fs.exists(self._ncfile2jsfile(ncfile)):
393412
self.kerchunk_references.update(
394-
{str(ncfile): self._ncfile2jsfile(ncfile)}
413+
{self._ncfile2ncref(ncfile, fs=fs): self._ncfile2jsfile(ncfile)}
395414
)
396415
else:
397416
self.translate(ncfile, fs=fs)
398417

399418
# Read and load the kerchunk JSON file:
400-
kerchunk_jsfile = self.kerchunk_references[str(ncfile)]
419+
kerchunk_jsfile = self.kerchunk_references[self._ncfile2ncref(ncfile, fs=fs)]
401420
with self.fs.open(kerchunk_jsfile, "r") as file:
402421
kerchunk_data = json.load(file)
403422

@@ -406,18 +425,18 @@ def to_reference(self, ncfile: Union[str, Path], fs=None, overwrite: bool = Fals
406425
target_ok = False
407426
for key, value in kerchunk_data["refs"].items():
408427
if key not in [".zgroup", ".zattrs"] and "0." in key:
409-
if value[0] == ncfile:
428+
if value[0] == self._ncfile2ncref(ncfile, fs=fs):
410429
target_ok = True
411430
break
412431
if not target_ok:
413432
kerchunk_data = self.to_reference(ncfile, overwrite=True, fs=fs)
414433

415434
return kerchunk_data
416435

417-
def pprint(self, ncfile: Union[str, Path], params: List[str] = None):
436+
def pprint(self, ncfile: Union[str, Path], params: List[str] = None, fs=None):
418437
"""Pretty print kerchunk json data for a netcdf file"""
419438
params = to_list(params) if params is not None else []
420-
kerchunk_data = self.to_reference(ncfile)
439+
kerchunk_data = self.to_reference(ncfile, fs=fs)
421440

422441
# Pretty print JSON data
423442
keys_to_select = [".zgroup", ".zattrs", ".zmetadata"]
@@ -463,4 +482,4 @@ def supported(self, ncfile: Union[str, Path], fs=None) -> bool:
463482
return fs.first(ncfile) is not None
464483
except Exception:
465484
log.debug(f"Could not read {ncfile} with {fs}")
466-
return False
485+
return False

Diff for: ‎argopy/stores/spec.py

+20-4
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import pickle # nosec B403 only used with internal files/assets
88
import json
99
import tempfile
10+
import aiohttp
1011
from typing import Union
1112
from pathlib import Path
12-
import aiohttp
13+
import logging
1314

1415

1516
from ..options import OPTIONS
@@ -20,6 +21,9 @@
2021
from .filesystems import new_fs
2122

2223

24+
log = logging.getLogger("argopy.stores.spec")
25+
26+
2327
class ArgoStoreProto(ABC):
2428
"""Argo File System Prototype
2529
@@ -45,7 +49,7 @@ def __init__(self, cache: bool = False, cachedir: str = "", **kwargs):
4549
self.cache = cache
4650
self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
4751
self._fsspec_kwargs = {**kwargs}
48-
self.fs, self.cache_registry = new_fs(
52+
self.fs, self.cache_registry, self._fsspec_kwargs = new_fs(
4953
self.protocol, self.cache, self.cachedir, **self._fsspec_kwargs
5054
)
5155

@@ -115,9 +119,19 @@ def is_read(uri):
115119
else:
116120
return None
117121

118-
def expand_path(self, path):
122+
def expand_path(self, path, **kwargs):
123+
"""Turn one or more globs or directories into a list of all matching paths to files or directories.
124+
125+
For http store, return path unchanged (not implemented).
126+
127+
kwargs are passed to fsspec expand_path which call ``glob`` or ``find``, which may in turn call ``ls``.
128+
129+
Returns
130+
-------
131+
list
132+
"""
119133
if self.protocol != "http" and self.protocol != "https":
120-
return self.fs.expand_path(path)
134+
return self.fs.expand_path(path, **kwargs)
121135
else:
122136
return [path]
123137

@@ -139,6 +153,8 @@ def full_path(self, path, protocol: bool = False):
139153
140154
"""
141155
fp = getattr(self.fs, '_join', lambda x: x)(path)
156+
if self.protocol == 'ftp':
157+
fp = f"{self.host}:{self.port}{self.fs._strip_protocol(fp)}"
142158
if not protocol:
143159
return fp
144160
else:

Diff for: ‎argopy/tests/test_stores_fsspec.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@
5555
class Test_new_fs:
5656

5757
def test_default(self):
58-
fs, cache_registry = new_fs()
58+
fs, cache_registry, fsspec_kwargs = new_fs()
5959
assert id_implementation(fs) is not None
6060
assert is_initialised(cache_registry)
6161

6262
def test_cache_type(self):
63-
fs, cache_registry = new_fs(cache=True)
63+
fs, cache_registry, fsspec_kwargs = new_fs(cache=True)
6464
assert id_implementation(fs) == ['filecache']
6565

6666

Diff for: ‎docs/whats-new.rst

+14-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,19 @@ What's New
77

88
|pypi dwn| |conda dwn|
99

10+
Coming up next
11+
--------------
12+
13+
Internals
14+
^^^^^^^^^
15+
16+
- **Open netcdf files lazily from ftp server**. Adding to s3 and http, we now support laziness with ftp using `kerchunk <https://fsspec.github.io/kerchunk/>`_. Checkout the dedicated :ref:`lazy` section of the documentation. (:pr:`460`) by |gmaze|.
17+
18+
Energy
19+
^^^^^^
20+
21+
|eqco2_since_last_release|
22+
1023

1124
v1.1.0 (18 March 2025)
1225
----------------------
@@ -103,7 +116,7 @@ Internals
103116
Energy
104117
^^^^^^
105118
106-
|eqco2_since_last_release|
119+
.. image:: https://img.shields.io/badge/Total%20carbon%20emitted%20by%20release%20v1.1.0%20%5BgCO2eq%5D-1328.99-black?style=plastic&labelColor=grey
107120
108121
109122
v1.0.0 (16 Oct. 2024)

0 commit comments

Comments
 (0)
Please sign in to comment.