Per-directory metadata cache. #57
Conversation
Looks like a good start to me. There is quite a lot of code here, so the proof will have to be in the testing - but I generally agree with the approach.
I feel like there are more private functions and dict creation/extracting than necessary, but this is probably a style question, and maybe your way makes testing easier.
gcsfs/core.py
Outdated
@@ -64,19 +65,33 @@ def quote_plus(s):
        return s


    def norm_path(path):
        """Canonicalize path by split and rejoining."""
        # TODO Should canonical path include protocol?
Generally speaking, we should strip the protocol as early as possible within this library.
@@ -159,6 +174,9 @@ class GCSFileSystem(object):
            (see description of authentication methods, above)
        consistency: 'none', 'size', 'md5'
            Check method when writing files. Can be overridden in open().
        cache_timeout: float, seconds
I like this idea.
            items.extend(page.get("items", []))
            next_page_token = page.get('nextPageToken', None)

        result = {
Why the dict? As far as I can see, the only place that this is used, we immediately pick out the 'items' key.
This is repacking the result in the form of a de-paginated view of the standard GCS object listing. The prefixes list is used later to generate pseudo-directory listings for `ls` and `info` calls.
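For reference, a minimal sketch (with a hypothetical `do_list_request` helper standing in for the paged listing request; not the PR's actual code) of collecting the paginated responses into that single de-paginated dict:

```python
def depaginate_listing(do_list_request, bucket, prefix, delimiter="/"):
    """Collect all pages of a GCS object listing into one dict."""
    items, prefixes = [], []
    page_token = None
    while True:
        page = do_list_request(bucket, prefix=prefix, delimiter=delimiter,
                               pageToken=page_token)
        items.extend(page.get("items", []))        # object entries
        prefixes.extend(page.get("prefixes", []))  # pseudo-directory prefixes
        page_token = page.get("nextPageToken", None)
        if page_token is None:
            break
    # De-paginated view of the standard GCS objects listing
    return {"kind": "storage#objects", "items": items, "prefixes": prefixes}
```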
gcsfs/core.py
Outdated
@@ -389,7 +554,6 @@ def mkdir(self, bucket, acl='projectPrivate',
                             predefinedDefaultObjectAcl=default_acl,
                             json={"name": bucket})
        self.invalidate_cache(bucket)
        self.invalidate_cache('')

    def rmdir(self, bucket):
        """Delete an empty bucket"""
I wonder: if you delete the last key within a given prefix, which calls invalidate_cache on the parent, do we expect the apparent directory to disappear?
e.g.,
gcs.ls('bucket/')
['bucket/thing/']
gcs.ls('bucket/thing/')
['bucket/thing/key']
gcs.rm('bucket/thing/key')
gcs.ls('bucket/')
[] # directory should be gone
gcsfs/core.py
Outdated
@@ -398,65 +562,77 @@ def rmdir(self, bucket):
        for v in self.dirs[''][:]:
            if v['name'] == bucket:
                self.dirs[''].remove(v)
        self.dirs.pop(bucket, None)
We do want to remove the entry from a cached bucket listing, no? Also, the references to `dirs` should be renamed anyway.
Oh, I see - you expect invalidate, below, to do this. Still true about `dirs`.
gcsfs/core.py
Outdated
        if not bucket:
-           raise ValueError('Cannot walk all of GCS')
+           raise ValueError(
+               "walk path must include target bucket: %s" % path)
`path` is always empty here, so it is not very useful to report it. "Path must include at least a bucket"?
I've included it in the logging in case there is some kind of malformed input string.
gcsfs/core.py
Outdated
        path = '/'.join([bucket, prefix])
        files = self._list_bucket(bucket)

        if path.endswith('/'):
So by convention directories end with '/' and files do not? The user may expect walk('bucket/path') to get files below 'bucket/path/' too; also actual keys may end with '/', although I am not sure how that gets listed with the delimiter. Should be a test for this.
This is the current semantic. Walks targeting bucket/key will walk all objects below bucket/key/.
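For illustration (hypothetical bucket layout, not an actual listing from the tests):

```python
# Hypothetical layout: gs://bucket/key/a.csv and gs://bucket/key/sub/b.csv
gcs.walk('bucket/key')
# -> ['bucket/key/a.csv', 'bucket/key/sub/b.csv']
# i.e. targeting 'bucket/key' walks everything below 'bucket/key/'
```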
            self._list_bucket(bucket)
            # Bucket may be present & viewable, but not owned by
            # the current project. Attempt to list.
            self._list_objects(path)
exists will be True whether path points to a directory or a file?
gcsfs/core.py
Outdated
            # Return a pseudo dir for the bucket root
            return {
                'bucket': bucket,
                'name': "/",
This may be what a directory entry looks like, but in user-facing methods, the name should be expanded, in this case to 'bucket/'.
@@ -586,9 +788,9 @@ def rm(self, path, recursive=False):
            for p in self.walk(path):
                self.rm(p)
I wonder if you happen to know if there is a bulk-delete option in GCS?
It would be very nice if this were the case. Some of @jhamman's benchmarks with Zarr spend a lot of time removing tiny files.
Unfortunately the GCS API doesn't have a bulk-delete operation. There are a number of possibilities to speed up object deletion. The easiest, and what's implemented in gsutil, would be to issue a number of concurrent delete requests. You probably have a better sense of whether/how this should be integrated into the existing async run loop, but requests-futures would be an easy, standalone solution.
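As a rough sketch of the concurrent-delete idea, here using a standard-library thread pool rather than requests-futures (`delete_object` is a hypothetical stand-in for the per-object DELETE request):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def bulk_delete(delete_object, paths, max_workers=16):
    """Issue per-object DELETE requests concurrently.

    `delete_object` is assumed to be a callable taking a single
    'bucket/key' path and raising on failure.
    """
    failures = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(delete_object, p): p for p in paths}
        for fut in as_completed(futures):
            try:
                fut.result()
            except Exception:
                failures.append(futures[fut])
    return failures  # caller may retry or report these
```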
Refactor gcsfs to list file contents via prefixed bucket listing, rather than cached exhaustive bucket listing. In progress, but provides basic interface compatibility for walk, glob, ls, info. Intended to support re-addition of metadata caching via the _list_objects interface to provide prefix-specific listing caches. Update `info` to retrieve object info via object get.

Add per-directory listing cache to GCSFS, caching object metadata under the given directory. Resolves listing requests via cache, supporting walk/ls/glob/etc. Resolve `info` requests via cache if the parent directory has been listed, otherwise directly request object data. Updates cache invalidation logic to function on path prefixes, allowing object writes to invalidate their parent/sibling caches, rather than the entire listing cache.
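For orientation, a rough sketch of the caching scheme this commit message describes, assuming a plain dict keyed by normalized directory path (class and attribute names are illustrative, not the PR's actual implementation; a write would call `invalidate` on the written object's parent directory):

```python
import time

class PerDirectoryCache:
    """Cache de-paginated listings keyed by 'bucket/dir' prefix."""

    def __init__(self, timeout=None):
        self._entries = {}   # dir path -> (timestamp, listing dict)
        self.timeout = timeout

    def get(self, dirpath):
        entry = self._entries.get(dirpath)
        if entry is None:
            return None
        ts, listing = entry
        if self.timeout is not None and time.time() - ts > self.timeout:
            del self._entries[dirpath]   # expired entry
            return None
        return listing

    def put(self, dirpath, listing):
        self._entries[dirpath] = (time.time(), listing)

    def invalidate(self, path):
        """Drop cached listings for `path` and everything below it,
        leaving unrelated directories' caches intact."""
        path = path.rstrip("/")
        for key in list(self._entries):
            if key == path or key.startswith(path + "/"):
                del self._entries[key]
```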
Merged this branch into my more-fuse for this, added the following to GCSFS's init: … and called as … (the message saying …)

Reposted from gitter: This may be an osx-specific behaviour. I think, if the parent directory is in the cache, trying to do …

Your comment …

On my branch …
@asford, are you meaning to add more here, or is it only the testing that remains outstanding?

Sorry for the slow updates on this; the dissertation has sunk its claws into me... I'm not intending to push any more logic into this pull, it's mostly just testing updates. I'll rebase 1526f51 into a separate pull with an associated issue.

No worries. This is an example of the currently failing case (raises exception on current master): …
Force-pushed from b18f83a to 94d9bfc.
Add decorator-based method tracing to `gcsfuse.GCSFS` and `core.GCSFileSystem` interface methods. Add `--verbose` command-line option to `gcsfuse` to support debug logging control.
Prototype `per_dir_cache` integration for gcsfuse. Minimal fixup to gcsfuse to support directory listing.
Fix error in GCSFS::read() cache key resolution.
Resolve error when writing small partitions via dask.bag.to_textfiles. Error occurs when partition size is below minimum GCS multipart upload size. Close logic in dask.bytes.core calls flush(force=False), followed by flush(force=True) on GCSFile. Current logic initializes multipart upload on non-force flush and attempts to write a non-final block below the minimum GCS upload block size.

Fixup logic to skip flush if buffer size is below minimum upload size on non-forced flush. This, incidentally, avoids initialization of multipart upload in cases where final file size will be below the minimum block size, which was resulting in duplicate uploads for small output partitions.

Add tracing logic to GCSFile file operations for debugging. Update `_tracemethod` to perform optional traceback logging at `DEBUG-1` log level.
Updates `ls` to return non-prefix separated prefix search, needs to be verified? Should this be glob-like? Fix error from dask.bytes when read-only file is flushed. Fixup returning listing with "path" attribute.
Retry on requests failing due to `google.auth.exceptions.RefreshError`, partial resolution of fsspec#71.
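One way such a retry could be structured (a sketch assuming the google-auth package is installed; the PR's actual retry mechanics may differ):

```python
import google.auth.exceptions

def call_with_refresh_retry(call, retries=2):
    """Re-issue a request callable when credential refresh fails transiently."""
    for attempt in range(retries + 1):
        try:
            return call()
        except google.auth.exceptions.RefreshError:
            if attempt == retries:
                raise  # give up after the final attempt
```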
Force-pushed from e4e24b4 to 2ccf856.
Resolve error when writing small partitions via dask.bag.to_textfiles when partition size is below minimum GCS multipart upload size. Close logic in dask.bytes.core calls flush(force=False), followed by flush(force=True) on GCSFile. Current logic initializes multipart upload on non-force flush and then attempts to write a non-final block below the minimum GCS upload block size.

Fixup logic to skip flush if buffer size is below minimum upload size on non-forced flush and instead issue a warning. This, incidentally, avoids initialization of multipart upload in cases where final file size will be below the minimum block size, which was resulting in duplicate uploads for small output partitions.

Update core.py to lift GCS block size limits into module-level constants. Replace use of constants in core.py with symbolic names.
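For illustration, a simplified sketch of the flush guard described above (class, constant, and attribute names here are placeholders, not the module's actual symbols):

```python
import io

GCS_MIN_BLOCK_SIZE = 2 ** 18  # placeholder value for the minimum GCS upload block size

class BufferedGCSWriter:
    """Toy stand-in for GCSFile's write path, showing only the flush guard."""

    def __init__(self):
        self.buffer = io.BytesIO()

    def flush(self, force=False):
        # Skip a non-forced flush while the buffer is below the minimum
        # upload size: uploading an undersized, non-final block would fail,
        # and deferring also lets small files go out as a single simple
        # upload at close time instead of starting a multipart upload.
        if not force and self.buffer.tell() < GCS_MIN_BLOCK_SIZE:
            return
        self._upload_buffer(final=force)

    def _upload_buffer(self, final):
        # Placeholder for the real simple/multipart upload logic.
        pass
```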
From fsspec#73 review. Defer multipart upload on non-forced flush if a simple upload may still be possible at the specified block size. Minor reorganization of `flush` logic to group error handling vs deferral. Relax block size restrictions on fetch, no longer aligning `range`-ed fetch requests to block boundaries. Fix minor logging error in `_fetch`.
Noting issues from pangeo-data/pangeo#112: …
Adds explicit flag to control stacktrace debugging for traced methods. Reduces log size on test failures.
Agreed and updated.

@martindurant This is now in a tests-passing state. I've expanded the GCSFileSystem docstring to include the updated object details semantics.

@asford, at some point we floated the idea of restricting the fields that are pulled down with list_objects; we can do that in a future PR (e.g., …)

It should be doable. I think we should maintain a cache of the raw results from GCS, which is what this pull implements, and then process the cached results as needed to produce the limited listings.
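For example, a small sketch of that post-processing over the cached raw entries (the field selection here is illustrative; field names follow the GCS objects resource):

```python
def limited_listing(raw_items, fields=("name", "size", "updated")):
    """Project cached raw GCS object entries down to a restricted field set."""
    return [{k: item[k] for k in fields if k in item} for item in raw_items]
```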
From review, cleanup `walk` implementation. Fix pseudodir creation on bucket-level `info` call. Remove `norm_path` todo.
Sorry, you still seem to end up with a conflict - I expect it's small.
`flush` on an open, but read-only, file should be a no-op, not raise a ValueError, compare to builtin `open("read_only", "r").flush()`. Updates `flush` logic and adds test case covering flush behavior.
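A sketch of what such a test could look like (fixture and bucket names are illustrative, not necessarily the test added in this commit):

```python
def test_flush_on_readonly_file_is_noop(gcs):
    # `gcs` is assumed to be a GCSFileSystem test fixture with a writable bucket
    path = "test-bucket/flush-noop.txt"
    with gcs.open(path, "wb") as f:
        f.write(b"data")
    with gcs.open(path, "rb") as f:
        f.flush()                    # should be a no-op, mirroring builtin file objects
        assert f.read() == b"data"
```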
I believe this is essentially ready for final review. I'm going to run some more integration-style testing in my environment, and I think there's some testing to be done on pangeo-data/pangeo#112.

@martindurant Would you prefer to have this rebased to clean up the commit history or merged as-is?

I am happy to leave the commit history as is, whichever you prefer.

Great! I'm then +1 to merge.

@asford, having made this big effort, would you like to become a committer on this repo?

Sounds great! I'd be glad to lend a hand in keeping this feature working; I suspect we'll find a few more bugs in the future.

+1!

I've just sent @asford an invitation to join. Welcome @asford, we're lucky to have you! @martindurant I've also just set it so that you have admin rights.
… `glob` logic to restrict search to subdirs and prefixes.

@martindurant Work in progress solution for #24 and #21, likely supersedes #22. Would you mind taking a quick look at this pull for a sanity check? I've performed some initial integration and manual testing in my deployment and this implementation appears to resolve the primary performance issues I've encountered.

Refactors the GCSFileSystem to operate on a per-directory object metadata cache, rather than a full-bucket cache, to support file reads in buckets with multiple directory structures. This resolves performance issues due to full-bucket listing when reading a subset of keys from a bucket or when `glob`ing within a subdirectory of the bucket.