Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap cache-file-management operations for CachingFileSystem in thread locks #1127

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 80 additions & 73 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
import time
from shutil import rmtree
from threading import RLock

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import _DEFAULT_CALLBACK
Expand Down Expand Up @@ -120,6 +121,7 @@ def __init__(
if isinstance(target_protocol, str)
else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
)
self.lock = RLock()
self.load_cache()
self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)

Expand All @@ -134,62 +136,64 @@ def _mkcache(self):

def load_cache(self):
"""Read set of stored blocks from file"""
cached_files = []
for storage in self.storage:
fn = os.path.join(storage, "cache")
if os.path.exists(fn):
with open(fn, "rb") as f:
# TODO: consolidate blocks here
loaded_cached_files = pickle.load(f)
for c in loaded_cached_files.values():
if isinstance(c["blocks"], list):
c["blocks"] = set(c["blocks"])
cached_files.append(loaded_cached_files)
else:
cached_files.append({})
self._mkcache()
self.cached_files = cached_files or [{}]
self.last_cache = time.time()
with self.lock:
cached_files = []
for storage in self.storage:
fn = os.path.join(storage, "cache")
if os.path.exists(fn):
with open(fn, "rb") as f:
# TODO: consolidate blocks here
loaded_cached_files = pickle.load(f)
for c in loaded_cached_files.values():
if isinstance(c["blocks"], list):
c["blocks"] = set(c["blocks"])
cached_files.append(loaded_cached_files)
else:
cached_files.append({})
self._mkcache()
self.cached_files = cached_files or [{}]
self.last_cache = time.time()

def save_cache(self):
"""Save set of stored blocks from file"""
fn = os.path.join(self.storage[-1], "cache")
# TODO: a file lock could be used to ensure file does not change
# between re-read and write; but occasional duplicated reads ok.
cache = self.cached_files[-1]
if os.path.exists(fn):
with open(fn, "rb") as f:
cached_files = pickle.load(f)
for k, c in cached_files.items():
if k in cache:
if c["blocks"] is True or cache[k]["blocks"] is True:
c["blocks"] = True
else:
# self.cached_files[*][*]["blocks"] must continue to
# point to the same set object so that updates
# performed by MMapCache are propagated back to
# self.cached_files.
blocks = cache[k]["blocks"]
blocks.update(c["blocks"])
c["blocks"] = blocks
c["time"] = max(c["time"], cache[k]["time"])
c["uid"] = cache[k]["uid"]

# Files can be added to cache after it was written once
for k, c in cache.items():
if k not in cached_files:
cached_files[k] = c
else:
cached_files = cache
cache = {k: v.copy() for k, v in cached_files.items()}
for c in cache.values():
if isinstance(c["blocks"], set):
c["blocks"] = list(c["blocks"])
self._mkcache()
with atomic_write(fn) as f:
pickle.dump(cache, f)
self.cached_files[-1] = cached_files
self.last_cache = time.time()
with self.lock:
if os.path.exists(fn):
with open(fn, "rb") as f:
cached_files = pickle.load(f)
for k, c in cached_files.items():
if k in cache:
if c["blocks"] is True or cache[k]["blocks"] is True:
c["blocks"] = True
else:
# self.cached_files[*][*]["blocks"] must continue to
# point to the same set object so that updates
# performed by MMapCache are propagated back to
# self.cached_files.
blocks = cache[k]["blocks"]
blocks.update(c["blocks"])
c["blocks"] = blocks
c["time"] = max(c["time"], cache[k]["time"])
c["uid"] = cache[k]["uid"]

# Files can be added to cache after it was written once
for k, c in cache.items():
if k not in cached_files:
cached_files[k] = c
else:
cached_files = cache
cache = {k: v.copy() for k, v in cached_files.items()}
for c in cache.values():
if isinstance(c["blocks"], set):
c["blocks"] = list(c["blocks"])
self._mkcache()
with atomic_write(fn) as f:
pickle.dump(cache, f)
self.cached_files[-1] = cached_files
self.last_cache = time.time()

def _check_cache(self):
"""Reload caches if time elapsed or any disappeared"""
Expand Down Expand Up @@ -228,8 +232,9 @@ def clear_cache(self):
In the case of multiple cache locations, this clears only the last one,
which is assumed to be the read/write one.
"""
rmtree(self.storage[-1])
self.load_cache()
with self.lock:
rmtree(self.storage[-1])
self.load_cache()

def clear_expired_cache(self, expiry_time=None):
"""Remove all expired files and metadata from the cache
Expand All @@ -248,26 +253,27 @@ def clear_expired_cache(self, expiry_time=None):
if not expiry_time:
expiry_time = self.expiry

self._check_cache()

for path, detail in self.cached_files[-1].copy().items():
if time.time() - detail["time"] > expiry_time:
if self.same_names:
basename = os.path.basename(detail["original"])
fn = os.path.join(self.storage[-1], basename)
else:
fn = os.path.join(self.storage[-1], detail["fn"])
if os.path.exists(fn):
os.remove(fn)
self.cached_files[-1].pop(path)
with self.lock:
self._check_cache()

if self.cached_files[-1]:
cache_path = os.path.join(self.storage[-1], "cache")
with atomic_write(cache_path) as fc:
pickle.dump(self.cached_files[-1], fc)
else:
rmtree(self.storage[-1])
self.load_cache()
for path, detail in self.cached_files[-1].copy().items():
if time.time() - detail["time"] > expiry_time:
if self.same_names:
basename = os.path.basename(detail["original"])
fn = os.path.join(self.storage[-1], basename)
else:
fn = os.path.join(self.storage[-1], detail["fn"])
if os.path.exists(fn):
os.remove(fn)
self.cached_files[-1].pop(path)

if self.cached_files[-1]:
cache_path = os.path.join(self.storage[-1], "cache")
with atomic_write(cache_path) as fc:
pickle.dump(self.cached_files[-1], fc)
else:
rmtree(self.storage[-1])
self.load_cache()

def pop_from_cache(self, path):
"""Remove cached version of given file
Expand All @@ -283,9 +289,10 @@ def pop_from_cache(self, path):
_, fn = details
if fn.startswith(self.storage[-1]):
# is in in writable cache
os.remove(fn)
self.cached_files[-1].pop(path)
self.save_cache()
with self.lock:
os.remove(fn)
self.cached_files[-1].pop(path)
self.save_cache()
else:
raise PermissionError(
"Can only delete cached file in last, writable cache location"
Expand Down