diff --git a/docs/source/api.rst b/docs/source/api.rst
index cb14fe7e1..78b9c3594 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -47,7 +47,8 @@ Base Classes
    fsspec.core.OpenFiles
    fsspec.core.get_fs_token_paths
    fsspec.core.url_to_fs
-   fsspec.dircache.DirCache
+   fsspec.dircache.MemoryDirCache
+   fsspec.dircache.FileDirCache
    fsspec.FSMap
    fsspec.generic.GenericFileSystem
    fsspec.registry.register_implementation
@@ -82,7 +83,10 @@ Base Classes
 
 .. autofunction:: fsspec.core.url_to_fs
 
-.. autoclass:: fsspec.dircache.DirCache
+.. autoclass:: fsspec.dircache.MemoryDirCache
+   :members: __init__
+
+.. autoclass:: fsspec.dircache.FileDirCache
    :members: __init__
 
 .. autoclass:: fsspec.FSMap
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index f5f30fdd9..abc383ff8 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,6 +1,13 @@
 Changelog
 =========
 
+Dev
+--------
+
+Enhancements
+
+- add file-based listing cache using diskcache
+
 2024.3.1
 --------
diff --git a/docs/source/features.rst b/docs/source/features.rst
index 907084e0d..5e9aaef5b 100644
--- a/docs/source/features.rst
+++ b/docs/source/features.rst
@@ -181,15 +181,18 @@ Listings Caching
 ----------------
 
 For some implementations, getting file listings (i.e., ``ls`` and anything that
-depends on it) is expensive. These implementations use dict-like instances of
-:class:`fsspec.dircache.DirCache` to manage the listings.
-
-The cache allows for time-based expiry of entries with the ``listings_expiry_time``
-parameter, or LRU expiry with the ``max_paths`` parameter. These can be
-set on any implementation instance that uses listings caching; or to skip the
-caching altogether, use ``use_listings_cache=False``. That would be appropriate
-when the target location is known to be volatile because it is being written
-to from other sources.
+depends on it) is expensive. These implementations manage the listings either with
+dict-like instances of :class:`fsspec.dircache.MemoryDirCache` or with file-based
+caching via instances of :class:`fsspec.dircache.FileDirCache`.
+
+The cache is configured with the ``listings_cache_options`` keyword: pass ``True`` for
+the default in-memory cache, or a dict whose ``cache_type`` entry is ``"memory"`` or
+``"file"`` and whose ``expiry_time`` entry gives, in seconds, how long a listing stays
+valid. These options can be set on any implementation instance that uses listings
+caching; to skip the caching altogether, do not pass ``listings_cache_options`` (or
+pass ``False``). That would be appropriate when the target location is known to be
+volatile because it is being written to from other sources. For the file-based cache,
+you can also provide the ``directory`` entry to determine where the cache files live.
 
 When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
 is called, so that subsequent listing of the given paths will force a refresh.
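A quick usage sketch of the options documented above (the protocol, expiry value and
cache directory are illustrative; the keyword and dict keys come from this patch)::

    import fsspec

    # Default in-memory listings cache
    fs = fsspec.filesystem("https", listings_cache_options=True)

    # File-based listings cache (needs the optional diskcache and platformdirs
    # packages); entries expire after one hour
    fs = fsspec.filesystem(
        "https",
        listings_cache_options={
            "cache_type": "file",
            "expiry_time": 3600,
            "directory": "/tmp/fsspec-listings",  # illustrative location
        },
    )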
diff --git a/fsspec/asyn.py b/fsspec/asyn.py
index a040efc4b..caf7b0897 100644
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -312,7 +312,15 @@ class AsyncFileSystem(AbstractFileSystem):
     mirror_sync_methods = True
     disable_throttling = False
 
-    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
+    def __init__(
+        self,
+        *args,
+        asynchronous=False,
+        loop=None,
+        batch_size=None,
+        listings_cache_options=None,
+        **kwargs,
+    ):
         self.asynchronous = asynchronous
         self._pid = os.getpid()
         if not asynchronous:
@@ -320,7 +328,7 @@ def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
         else:
             self._loop = None
         self.batch_size = batch_size
-        super().__init__(*args, **kwargs)
+        super().__init__(listings_cache_options, *args, **kwargs)
 
     @property
     def loop(self):
diff --git a/fsspec/dircache.py b/fsspec/dircache.py
index eca19566b..819adb8b1 100644
--- a/fsspec/dircache.py
+++ b/fsspec/dircache.py
@@ -1,9 +1,44 @@
+import logging
 import time
 from collections.abc import MutableMapping
+from enum import Enum
 from functools import lru_cache
+from pathlib import Path
+from typing import Optional, Union
 
+logger = logging.getLogger(__name__)
 
-class DirCache(MutableMapping):
+
+class DisableDirCache(MutableMapping):
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def __getitem__(self, item):
+        raise KeyError
+
+    def __setitem__(self, key, value):
+        pass
+
+    def __delitem__(self, key):
+        pass
+
+    def __iter__(self):
+        return iter(())
+
+    def __len__(self):
+        return 0
+
+    def clear(self):
+        pass
+
+    def __contains__(self, item):
+        return False
+
+    def __reduce__(self):
+        return (DisableDirCache, ())
+
+
+class MemoryDirCache(MutableMapping):
     """
     Caching of directory listings, in a structure like::
 
@@ -26,19 +61,14 @@ class DirCache(MutableMapping):
 
     def __init__(
         self,
-        use_listings_cache=True,
-        listings_expiry_time=None,
+        expiry_time=None,
         max_paths=None,
-        **kwargs,
     ):
         """
 
         Parameters
         ----------
-        use_listings_cache: bool
-            If False, this cache never returns items, but always reports KeyError,
-            and setting items has no effect
-        listings_expiry_time: int or float (optional)
+        expiry_time: int or float (optional)
             Time in seconds that a listing is considered valid. If None,
             listings do not expire.
         max_paths: int (optional)
@@ -49,15 +79,14 @@ def __init__(
         self._times = {}
         if max_paths:
             self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
-        self.use_listings_cache = use_listings_cache
-        self.listings_expiry_time = listings_expiry_time
-        self.max_paths = max_paths
+        self._expiry_time = expiry_time
+        self._max_paths = max_paths
 
     def __getitem__(self, item):
-        if self.listings_expiry_time is not None:
-            if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
+        if self._expiry_time is not None:
+            if self._times.get(item, 0) - time.time() < -self._expiry_time:
                 del self._cache[item]
-        if self.max_paths:
+        if self._max_paths:
             self._q(item)
         return self._cache[item]  # maybe raises KeyError
 
@@ -75,12 +104,10 @@ def __contains__(self, item):
         return False
 
     def __setitem__(self, key, value):
-        if not self.use_listings_cache:
-            return
-        if self.max_paths:
+        if self._max_paths:
             self._q(key)
         self._cache[key] = value
-        if self.listings_expiry_time is not None:
+        if self._expiry_time is not None:
             self._times[key] = time.time()
 
     def __delitem__(self, key):
@@ -93,6 +120,99 @@ def __iter__(self):
 
     def __reduce__(self):
         return (
-            DirCache,
-            (self.use_listings_cache, self.listings_expiry_time, self.max_paths),
+            MemoryDirCache,
+            (self._expiry_time, self._max_paths),
+        )
+
+
+class FileDirCache(MutableMapping):
+    def __init__(
+        self,
+        expiry_time: Optional[int],
+        directory: Optional[Path],
+    ):
+        """
+
+        Parameters
+        ----------
+        expiry_time: int or float (optional)
+            Time in seconds that a listing is considered valid. If None,
+            listings do not expire.
+        directory: str or Path (optional)
+            Directory path at which the listings cache file is stored. If None,
+            a default path inside the user cache folder is used.
+
+        """
+        try:
+            import platformdirs
+            from diskcache import Cache
+        except ImportError as e:
+            raise ImportError(
+                "The optional dependencies ``platformdirs`` and ``diskcache`` are required for file-based dircache."
+            ) from e
+
+        if not directory:
+            directory = platformdirs.user_cache_dir(appname="fsspec")
+        directory = Path(directory) / "dircache" / str(expiry_time)
+
+        try:
+            directory.mkdir(exist_ok=True, parents=True)
+        except OSError as e:
+            logger.error(f"Directory for dircache could not be created at {directory}.")
+            raise e
+        else:
+            logger.info(f"Dircache located at {directory}.")
+
+        self._expiry_time = expiry_time
+        self._directory = directory
+        self._cache = Cache(directory=str(directory))
+
+    def __getitem__(self, item):
+        """Draw item as fileobject from cache, retry if timeout occurs"""
+        return self._cache.get(key=item, read=True, retry=True)
+
+    def clear(self):
+        self._cache.clear()
+
+    def __len__(self):
+        return len(list(self._cache.iterkeys()))
+
+    def __contains__(self, item):
+        value = self._cache.get(item, retry=True)  # None, if expired
+        if value:
+            return True
+        return False
+
+    def __setitem__(self, key, value):
+        self._cache.set(key=key, value=value, expire=self._expiry_time, retry=True)
+
+    def __delitem__(self, key):
+        del self._cache[key]
+
+    def __iter__(self):
+        return (k for k in self._cache.iterkeys() if k in self)
+
+    def __reduce__(self):
+        return (
+            FileDirCache,
+            (self._expiry_time, self._directory),
         )
+
+
+class CacheType(Enum):
+    DISABLE = DisableDirCache
+    MEMORY = MemoryDirCache
+    FILE = FileDirCache
+
+
+def create_dircache(
+    cache_type: CacheType,
+    expiry_time: Optional[int],
+    **kwargs,
+) -> Union[DisableDirCache, MemoryDirCache, FileDirCache]:
+    cache_map = {
+        CacheType.DISABLE: DisableDirCache,
+        CacheType.MEMORY: MemoryDirCache,
+        CacheType.FILE: FileDirCache,
+    }
+    return cache_map[cache_type](expiry_time, **kwargs)
diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py
index 4580764ce..d50e11702 100644
--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -58,6 +58,7 @@ def __init__(
         client_kwargs=None,
         get_client=get_client,
         encoded=False,
+        listings_cache_options=None,
         **storage_options,
     ):
         """
@@ -83,11 +84,19 @@ def __init__(
             A callable which takes keyword arguments and constructs
             an aiohttp.ClientSession. It's state will be managed by the
             HTTPFileSystem class.
+        listings_cache_options: bool or dict (optional)
+            Options for the directory listings cache; see ``AbstractFileSystem``.
         storage_options: key-value
             Any other parameters passed on to requests
         cache_type, cache_options: defaults used in open
         """
-        super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
+        super().__init__(
+            self,
+            asynchronous=asynchronous,
+            loop=loop,
+            listings_cache_options=listings_cache_options,
+            **storage_options,
+        )
         self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
         self.simple_links = simple_links
         self.same_schema = same_scheme
@@ -104,10 +113,6 @@ def __init__(
         # TODO: Maybe rename `self.kwargs` to `self.request_options` to make
         # it clearer.
         request_options = copy(storage_options)
-        self.use_listings_cache = request_options.pop("use_listings_cache", False)
-        request_options.pop("listings_expiry_time", None)
-        request_options.pop("max_paths", None)
-        request_options.pop("skip_instance_cache", None)
         self.kwargs = request_options
 
     @property
@@ -201,7 +206,7 @@ async def _ls_real(self, url, detail=True, **kwargs):
             return sorted(out)
 
     async def _ls(self, url, detail=True, **kwargs):
-        if self.use_listings_cache and url in self.dircache:
+        if url in self.dircache:
             out = self.dircache[url]
         else:
             out = await self._ls_real(url, detail=detail, **kwargs)
diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py
index fdae51ff5..e01120f64 100644
--- a/fsspec/implementations/tests/test_http.py
+++ b/fsspec/implementations/tests/test_http.py
@@ -10,6 +10,7 @@
 
 import fsspec.asyn
 import fsspec.utils
+from fsspec.dircache import CacheType, FileDirCache, MemoryDirCache
 from fsspec.implementations.http import HTTPStreamFile
 from fsspec.tests.conftest import data, reset_files, server, win  # noqa: F401
 
@@ -27,87 +28,157 @@ def test_list_invalid_args(server):
 
 
 def test_list_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True)
+    h = fsspec.filesystem("http", listings_cache_options=True)
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": None,
+    }
+    assert issubclass(h.dircache.__class__, MemoryDirCache)
 
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
 
 
-def test_list_cache_with_expiry_time_cached(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30)
+@pytest.mark.parametrize("expiry_time", [None, 10])
+def test_list_cache_memory(server, expiry_time):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": "memory", "expiry_time": expiry_time},
+    )
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": expiry_time,
+    }
+    assert issubclass(h.dircache.__class__, MemoryDirCache)
+    start = time.time()
+    out = h.glob(server + "/index/*")
+    normal_duration = time.time() - start
+    assert out == [server + "/index/realfile"]
+    # Verify cache content.
+    assert len(h.dircache) == 1
+    start = time.time()
+    out = h.glob(server + "/index/*")
+    cached_duration = time.time() - start
+    assert out == [server + "/index/realfile"]
+    assert normal_duration / cached_duration > 1.5
 
-    # First, the directory cache is not initialized.
-    assert not h.dircache
 
-    # By querying the filesystem with "use_listings_cache=True",
-    # the cache will automatically get populated.
+@pytest.mark.parametrize("expiry_time", [None, 10])
+def test_list_cache_file(server, tmp_path, expiry_time):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={
+            "cache_type": "file",
+            "expiry_time": expiry_time,
+            "directory": tmp_path,
+        },
+    )
+    assert h.listings_cache_options == {
+        "cache_type": CacheType.FILE,
+        "expiry_time": expiry_time,
+        "directory": tmp_path,
+    }
+    assert issubclass(h.dircache.__class__, FileDirCache)
+    h.dircache.clear()  # Needed for filedircache
+    start = time.time()
     out = h.glob(server + "/index/*")
+    normal_duration = time.time() - start
     assert out == [server + "/index/realfile"]
-
-    # Verify cache content.
     assert len(h.dircache) == 1
-
+    start = time.time()
     out = h.glob(server + "/index/*")
+    cached_duration = time.time() - start
     assert out == [server + "/index/realfile"]
+    assert normal_duration / cached_duration > 1.5
+    h.dircache.clear()  # clean up
 
 
-def test_list_cache_with_expiry_time_purged(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3)
-
+@pytest.mark.parametrize("listings_cache_type", ["memory", "file"])
+def test_list_cache_with_expiry_time_purged(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={
+            "cache_type": listings_cache_type,
+            "expiry_time": 3,
+        },
+    )
+    expected_listings_cache_options = {
+        "cache_type": (
+            CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE
+        ),
+        "expiry_time": 3,
+    }
+    if listings_cache_type == "file":
+        expected_listings_cache_options["directory"] = None
+    assert h.listings_cache_options == expected_listings_cache_options
+    h.dircache.clear()  # Needed for filedircache
     # First, the directory cache is not initialized.
     assert not h.dircache
-
     # By querying the filesystem with "use_listings_cache=True",
     # the cache will automatically get populated.
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
     assert len(h.dircache) == 1
-
     # Verify cache content.
     assert server + "/index/" in h.dircache
     assert len(h.dircache.get(server + "/index/")) == 1
-
     # Wait beyond the TTL / cache expiry time.
-    time.sleep(0.31)
-
+    time.sleep(4)
     # Verify that the cache item should have been purged.
     cached_items = h.dircache.get(server + "/index/")
     assert cached_items is None
-
     # Verify that after clearing the item from the cache,
     # it can get populated again.
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
     cached_items = h.dircache.get(server + "/index/")
     assert len(cached_items) == 1
+    h.dircache.clear()  # clean up
 
 
-def test_list_cache_reuse(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
-
+@pytest.mark.parametrize("listings_cache_type", ["memory", "file"])
+def test_list_cache_reuse(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5},
+    )
+    expected_listings_cache_options = {
+        "cache_type": (
+            CacheType.MEMORY if listings_cache_type == "memory" else CacheType.FILE
+        ),
+        "expiry_time": 5,
+    }
+    if listings_cache_type == "file":
+        expected_listings_cache_options["directory"] = None
+    assert h.listings_cache_options == expected_listings_cache_options
+    # Needed for filedircache
+    h.dircache.clear()
     # First, the directory cache is not initialized.
     assert not h.dircache
-
     # By querying the filesystem with "use_listings_cache=True",
     # the cache will automatically get populated.
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
-
     # Verify cache content.
     assert len(h.dircache) == 1
-
     # Verify another instance without caching enabled does not have cache content.
     h = fsspec.filesystem("http", use_listings_cache=False)
     assert not h.dircache
-
     # Verify that yet another new instance, with caching enabled,
     # will see the same cache content again.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 5},
+    )
     assert len(h.dircache) == 1
-
     # However, yet another instance with a different expiry time will also not have
     # any valid cache content.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666)
+    h = fsspec.filesystem(
+        "http",
+        listings_cache_options={"cache_type": listings_cache_type, "expiry_time": 666},
+    )
     assert len(h.dircache) == 0
+    h.dircache.clear()  # clean up
 
 
 def test_ls_raises_filenotfound(server):
@@ -123,12 +194,6 @@ def test_list_cache_with_max_paths(server):
     assert out == [server + "/index/realfile"]
 
 
-def test_list_cache_with_skip_instance_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True)
-    out = h.glob(server + "/index/*")
-    assert out == [server + "/index/realfile"]
-
-
 def test_glob_return_subfolders(server):
     h = fsspec.filesystem("http")
     out = h.glob(server + "/simple/*")
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 9a7e4e8b9..640c8081e 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -9,16 +9,17 @@
 from errno import ESPIPE
 from glob import has_magic
 from hashlib import sha256
-from typing import ClassVar
+from typing import ClassVar, Optional, Union
 
 from .callbacks import DEFAULT_CALLBACK
 from .config import apply_config, conf
-from .dircache import DirCache
+from .dircache import create_dircache
 from .transaction import Transaction
 from .utils import (
     _unstrip_protocol,
     glob_translate,
     isfilelike,
+    normalize_dir_cache_options,
     other_paths,
     read_block,
     stringify_path,
@@ -115,7 +116,12 @@ class AbstractFileSystem(metaclass=_Cached):
     #: Extra *class attributes* that should be considered when hashing.
     _extra_tokenize_attributes = ()
 
-    def __init__(self, *args, **storage_options):
+    def __init__(
+        self,
+        listings_cache_options: Optional[Union[bool, dict]] = None,
+        *args,
+        **storage_options,
+    ):
         """Create and configure file-system instance
 
         Instances may be cachable, so if similar enough arguments are seen
@@ -128,10 +134,11 @@ def __init__(self, *args, **storage_options):
 
         Parameters
         ----------
-        use_listings_cache, listings_expiry_time, max_paths:
-            passed to ``DirCache``, if the implementation supports
-            directory listing caching. Pass use_listings_cache=False
-            to disable such caching.
+        listings_cache_options: bool or dict, optional
+            If True, a default ``MemoryDirCache`` is used. If a dict, a cache is
+            created from its ``cache_type`` ("memory" or "file") and ``expiry_time``
+            entries; the file cache additionally accepts a ``directory`` entry.
+            If None or False (default), directory listings are not cached.
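+            For example, ``{"cache_type": "file", "expiry_time": 3600}`` selects
+            the on-disk cache with entries that stay valid for one hour.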
         skip_instance_cache: bool
             If this is a cachable implementation, pass True here to force
             creating a new instance even if a matching instance exists, and prevent
@@ -146,7 +153,9 @@ def __init__(self, *args, **storage_options):
         self._intrans = False
         self._transaction = None
         self._invalidated_caches_in_transaction = []
-        self.dircache = DirCache(**storage_options)
+        listings_cache_options = normalize_dir_cache_options(listings_cache_options)
+        self.listings_cache_options = listings_cache_options
+        self.dircache = create_dircache(**listings_cache_options)
 
         if storage_options.pop("add_docs", None):
             warnings.warn("add_docs is no longer supported.", FutureWarning)
diff --git a/fsspec/tests/test_utils.py b/fsspec/tests/test_utils.py
index 0efd5d91d..6f88a23f6 100644
--- a/fsspec/tests/test_utils.py
+++ b/fsspec/tests/test_utils.py
@@ -6,6 +6,7 @@
 
 import fsspec.utils
+from fsspec.dircache import CacheType
 from fsspec.utils import (
     can_be_local,
     common_prefix,
@@ -13,6 +14,7 @@
     infer_storage_options,
     merge_offset_ranges,
     mirror_from,
+    normalize_dir_cache_options,
     other_paths,
     read_block,
     seek_delimiter,
@@ -440,3 +442,49 @@ def test_size():
     f = io.BytesIO(b"hello")
     assert fsspec.utils.file_size(f) == 5
     assert f.tell() == 0
+
+
+@pytest.mark.parametrize("dir_cache_options", [None, False, {}])
+def test_normalize_dir_cache_options_disable(dir_cache_options):
+    assert normalize_dir_cache_options(dir_cache_options) == {
+        "cache_type": CacheType.DISABLE,
+        "expiry_time": None,
+    }
+
+
+def test_normalize_dir_cache_options_enable():
+    assert normalize_dir_cache_options(True) == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": None,
+    }
+
+
+def test_normalize_dir_cache_options_with_cache_type():
+    assert normalize_dir_cache_options({"cache_type": CacheType.FILE}) == {
+        "cache_type": CacheType.FILE,
+        "directory": None,
+        "expiry_time": None,
+    }
+
+
+def test_normalize_dir_cache_options_with_expiry_time():
+    assert normalize_dir_cache_options({"expiry_time": 10}) == {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": 10,
+    }
+
+
+def test_normalize_dir_cache_options_file_cache_with_directory():
+    assert normalize_dir_cache_options(
+        {"cache_type": CacheType.FILE, "directory": "foo"}
+    ) == {"cache_type": CacheType.FILE, "directory": "foo", "expiry_time": None}
+
+
+def test_normalize_dir_cache_options_invalid_cache_type():
+    with pytest.raises(ValueError):
+        normalize_dir_cache_options({"cache_type": "invalid"})
+
+
+def test_normalize_dir_cache_options_invalid_expiry_time():
+    with pytest.raises(ValueError):
+        normalize_dir_cache_options({"expiry_time": -1})
diff --git a/fsspec/utils.py b/fsspec/utils.py
index ba3f80be3..2dac9eadc 100644
--- a/fsspec/utils.py
+++ b/fsspec/utils.py
@@ -18,17 +18,20 @@
     Callable,
     Iterable,
     Iterator,
+    Optional,
     Sequence,
     TypeVar,
+    Union,
 )
 from urllib.parse import urlsplit
 
+from fsspec.dircache import CacheType
+
 if TYPE_CHECKING:
     from typing_extensions import TypeGuard
 
     from fsspec.spec import AbstractFileSystem
 
-
 DEFAULT_BLOCK_SIZE = 5 * 2**20
 
 T = TypeVar("T")
@@ -740,3 +743,40 @@ def glob_translate(pat):
         results.append(any_sep)
     res = "".join(results)
     return rf"(?s:{res})\Z"
+
+
+def normalize_dir_cache_options(dir_cache_options: Optional[Union[bool, dict]]) -> dict:
+    default_dir_cache_options = {
+        "cache_type": CacheType.MEMORY,
+        "expiry_time": None,
+    }
+    if not dir_cache_options:
+        return {"cache_type": CacheType.DISABLE, "expiry_time": None}
+    elif dir_cache_options is True:
+        return default_dir_cache_options
+    else:
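+        # user-supplied keys override the defaults (dict union)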
+        dir_cache_options = default_dir_cache_options | dir_cache_options
+    # disassemble and reassemble dir_cache_options
+    cache_type = dir_cache_options["cache_type"]
+    expiry_time = dir_cache_options["expiry_time"]
+    directory = dir_cache_options.get("directory")
+    try:
+        cache_type = CacheType(cache_type)
+    except ValueError:
+        try:
+            cache_type = CacheType[cache_type.upper()]
+        except KeyError as e:
+            raise ValueError(
+                f"Cache type must be one of {', '.join(ct.name.lower() for ct in CacheType)}"
+            ) from e
+    if expiry_time:
+        if expiry_time < 0:
+            raise ValueError("Expiry time must be positive")
+    expiry_time = int(expiry_time) if expiry_time else None
+    dir_cache_options = {
+        "cache_type": cache_type,
+        "expiry_time": expiry_time,
+    }
+    if cache_type == CacheType.FILE:
+        dir_cache_options["directory"] = directory
+    return dir_cache_options
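For reference, a minimal sketch of how the helpers added above fit together (the cached
path and listing entry are made up; all other names come from this patch)::

    from fsspec.dircache import CacheType, create_dircache
    from fsspec.utils import normalize_dir_cache_options

    # Normalize user input into the canonical options dict ...
    opts = normalize_dir_cache_options({"expiry_time": 60})
    assert opts == {"cache_type": CacheType.MEMORY, "expiry_time": 60}

    # ... and build the matching cache, as AbstractFileSystem.__init__ now does.
    cache = create_dircache(**opts)
    cache["/data"] = [{"name": "/data/file.csv", "size": 0, "type": "file"}]
    assert "/data" in cache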