Skip to content

Commit

Permalink
feat(gcsfs/core): support storage_emulator_host for http requests (#424)
Browse files Browse the repository at this point in the history
* feat(gcsfs/core): support storage_emulator_host for http requests

* Start to migrate tests

* Move from vcr to emulator

* stop

* enable fuse test

* make fuse optional again

* all pass

* type-oh

* lint

* removed wrong import

* update testing docs

Co-authored-by: Martin Durant <martin.durant@utoronto.ca>
  • Loading branch information
awill1988 and Martin Durant committed Oct 22, 2021
1 parent 2720d29 commit 92fcbc1
Show file tree
Hide file tree
Showing 87 changed files with 1,149 additions and 75,235 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ jobs:
- name: Run Tests
shell: bash -l {0}
run: |
export GCSFS_RECORD_MODE=none
export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
py.test -vv gcsfs
Expand Down
57 changes: 16 additions & 41 deletions docs/source/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,19 @@ Please file issues and requests on github_ and we welcome pull requests.

.. _github: https://github.com/dask/gcsfs/issues

Testing and VCR
---------------

VCR_ records requests to the remote server, so that they can be replayed during
tests - so long as the requests match exactly the original. It is set to strip
out sensitive information before writing the request and responses into yaml
files in the tests/recordings/ directory; the file-name matches the test name,
so all tests must have unique names, across all test files.

The process is as follows:

- Create a bucket for testing
- Set environment variables so that the tests run against your GCS
credentials, and recording occurs

.. code-block:: bash
export GCSFS_RECORD_MODE=all
export GCSFS_TEST_PROJECT='...'
export GCSFS_TEST_BUCKET='...' # the bucket from step 1 (without gs:// prefix).
export GCSFS_GOOGLE_TOKEN=~/.config/gcloud/application_default_credentials.json
py.test -vv -x -s gcsfs
If ~/.config/gcloud/application_default_credentials.json file does not exist,
run ``gcloud auth application-default login``
These variables can also be set in ``gcsfs/tests/settings.py``

- Run this again, setting ``GCSFS_RECORD_MODE=once``, which should alert you
if your tests make different requests the second time around

- Finally, test as TravisCI will, using only the recordings

.. code-block:: bash
export GCSFS_RECORD_MODE=none
py.test -vv -x -s gcsfs
To reset recording and start again, delete the yaml file corresponding to the
test in ``gcsfs/tests/recordings/*.yaml``.

.. _VCR: https://vcrpy.readthedocs.io/en/latest/
Testing
-------

The testing framework supports using your own GCS-compliant endpoint, by
setting the "STORAGE_EMULATOR_HOST" environment variable. If this is
not set, then an emulator will be spun up using ``docker`` and
`fake-gcs-server`_. This emulator has almost all the functionality of
real GCS. A small number of tests run differently or are skipped.

If you want to actually test against real GCS, then you should set
STORAGE_EMULATOR_HOST to "https://storage.googleapis.com" and also
provide appropriate GCSFS_TEST_BUCKET and GCSFS_TEST_PROJECT, as well
as setting your default google credentials (or providing them via the
fsspec config).

.. _fake-gcs-server: https://github.com/fsouza/fake-gcs-server
134 changes: 92 additions & 42 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
GCS_MAX_BLOCK_SIZE = 2 ** 28
DEFAULT_BLOCK_SIZE = 5 * 2 ** 20


QUOTE_TABLE = str.maketrans(
{
"%": "%25",
Expand Down Expand Up @@ -93,6 +94,22 @@ async def _req_to_text(r):
return (await r.read()).decode()


def _location():
"""
Resolves GCS HTTP location as http[s]://host
Enables storage emulation for integration tests.
Returns
-------
valid http location
"""
_emulator_location = os.environ.get("STORAGE_EMULATOR_HOST", None)
return (
_emulator_location if _emulator_location else "https://storage.googleapis.com"
)


class GCSFileSystem(AsyncFileSystem):
r"""
Connect to Google Cloud Storage.
Expand Down Expand Up @@ -192,11 +209,15 @@ class GCSFileSystem(AsyncFileSystem):
session_kwargs: dict
passed on to aiohttp.ClientSession; can contain, for example,
proxy settings.
endpoin_url: str
If given, use this URL (format protocol://host:port , *without* any
path part) for communication. If not given, defaults to the value
of environment variable "STORAGE_EMULATOR_HOST"; if that is not set
either, will use the standard Google endpoint.
"""

scopes = {"read_only", "read_write", "full_control"}
retries = 6 # number of retries on http failure
base = "https://storage.googleapis.com/storage/v1/"
default_block_size = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"
async_impl = True
Expand All @@ -217,6 +238,7 @@ def __init__(
session_kwargs=None,
loop=None,
timeout=None,
endpoint_url=None,
**kwargs,
):
super().__init__(
Expand All @@ -238,6 +260,7 @@ def __init__(
self.requests_timeout = requests_timeout
self.timeout = timeout
self._session = None
self._endpoint = endpoint_url
self.session_kwargs = session_kwargs or {}

self.credentials = GoogleCredentials(project, access, token, check_connection)
Expand All @@ -248,6 +271,14 @@ def __init__(
)
weakref.finalize(self, self.close_session, self.loop, self._session)

@property
def _location(self):
return self._endpoint or _location()

@property
def base(self):
return f"{self._location}/storage/v1/"

@property
def project(self):
return self.credentials.project
Expand Down Expand Up @@ -380,6 +411,14 @@ def _process_object(bucket, object_metadata):

return result

async def _make_bucket_requester_pays(self, path, state=True):
# this is really some form of setACL/chmod
# perhaps should be automatic if gcs.requester_pays
json = {"billing": {"requesterPays": state}}
await self._call("PATCH", f"b/{path}", json=json)

make_bucket_requester_pays = sync_wrapper(_make_bucket_requester_pays)

async def _get_object(self, path):
"""Return object information at the given path."""
bucket, key = self.split_path(path)
Expand Down Expand Up @@ -407,7 +446,7 @@ async def _get_object(self, path):
if not str(e).startswith("Forbidden"):
raise
resp = await self._call(
"GET", "b/{}/o/", bucket, json_out=True, prefix=key, maxResults=1
"GET", "b/{}/o", bucket, json_out=True, prefix=key, maxResults=1
)
for item in resp.get("items", []):
if item["name"] == key:
Expand Down Expand Up @@ -467,7 +506,7 @@ async def _do_list_objects(self, path, max_results=None, delimiter="/", prefix="
items = []
page = await self._call(
"GET",
"b/{}/o/",
"b/{}/o",
bucket,
delimiter=delimiter,
prefix=prefix,
Expand All @@ -482,7 +521,7 @@ async def _do_list_objects(self, path, max_results=None, delimiter="/", prefix="
while next_page_token is not None:
page = await self._call(
"GET",
"b/{}/o/",
"b/{}/o",
bucket,
delimiter=delimiter,
prefix=prefix,
Expand All @@ -503,7 +542,7 @@ async def _list_buckets(self):
"""Return list of all buckets under the current project."""
if "" not in self.dircache:
items = []
page = await self._call("GET", "b/", project=self.project, json_out=True)
page = await self._call("GET", "b", project=self.project, json_out=True)

assert page["kind"] == "storage#buckets"
items.extend(page.get("items", []))
Expand All @@ -512,7 +551,7 @@ async def _list_buckets(self):
while next_page_token is not None:
page = await self._call(
"GET",
"b/",
"b",
project=self.project,
pageToken=next_page_token,
json_out=True,
Expand Down Expand Up @@ -569,7 +608,7 @@ async def _mkdir(
return
await self._call(
method="POST",
path="b/",
path="b",
predefinedAcl=acl,
project=self.project,
predefinedDefaultObjectAcl=default_acl,
Expand Down Expand Up @@ -601,6 +640,10 @@ async def _rmdir(self, bucket):
async def _info(self, path, **kwargs):
"""File information about this path."""
path = self._strip_protocol(path).rstrip("/")
if "/" not in path:
out = await self._call("GET", f"b/{path}", json_out=True)
out.update(size=0, type="directory")
return out
# Check directory cache for parent dir
parent_path = self._parent(path)
parent_cache = self._ls_from_cache(parent_path)
Expand Down Expand Up @@ -667,13 +710,12 @@ async def _ls(self, path, detail=False, prefix="", **kwargs):
else:
return sorted([o["name"] for o in out])

@classmethod
def url(cls, path):
def url(self, path):
""" Get HTTP URL of the given path """
u = "https://storage.googleapis.com/download/storage/v1/b/{}/o/{}?alt=media"
bucket, object = cls.split_path(path)
u = "{}/download/storage/v1/b/{}/o/{}?alt=media"
bucket, object = self.split_path(path)
object = quote_plus(object)
return u.format(bucket, object)
return u.format(self._location, bucket, object)

async def _cat_file(self, path, start=None, end=None):
""" Simple one-shot get of file data """
Expand Down Expand Up @@ -781,12 +823,11 @@ async def _cp_file(self, path1, path2, acl=None, **kwargs):
json_out=True,
)

async def _rm_file(self, path):
async def _rm_file(self, path, **kwargs):
bucket, key = self.split_path(path)
if key:
await self._call("DELETE", "b/{}/o/{}", bucket, key)
self.invalidate_cache(posixpath.dirname(self._strip_protocol(path)))
return True
else:
await self._rmdir(path)

Expand All @@ -812,7 +853,7 @@ async def _rm_files(self, paths):
)
headers, content = await self._call(
"POST",
"https://storage.googleapis.com/batch/storage/v1",
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
'=====7330845974216740156=="'
Expand All @@ -832,20 +873,38 @@ async def _rm_files(self, paths):
out = set(re.findall(pattern, txt))
raise OSError(out)

@property
def on_google(self):
return "torage.googleapis.com" in self._location

async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
paths = await self._expand_path(path, recursive=recursive, maxdepth=maxdepth)
files = [p for p in paths if self.split_path(p)[1]]
dirs = [p for p in paths if not self.split_path(p)[1]]
exs = await asyncio.gather(
*(
[
self._rm_files(files[i : i + batchsize])
for i in range(0, len(files), batchsize)
]
),
return_exceptions=True,
)
exs = [ex for ex in exs if ex is not None and "No such object" not in str(ex)]
if self.on_google:
# emulators do not support batch
exs = await asyncio.gather(
*(
[
self._rm_files(files[i : i + batchsize])
for i in range(0, len(files), batchsize)
]
),
return_exceptions=True,
)
else:
exs = await asyncio.gather(
*([self._rm_file(f) for f in files]),
return_exceptions=True,
)

exs = [
ex
for ex in exs
if ex is not None
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
]
if exs:
raise exs[0]
await asyncio.gather(*[self._rmdir(d) for d in dirs])
Expand Down Expand Up @@ -1030,6 +1089,8 @@ async def _get_file_request(

async def _get_file(self, rpath, lpath, callback=None, **kwargs):
u2 = self.url(rpath)
if os.path.isdir(lpath):
return
callback = callback or NoOpCallback()
await self._get_file_request(u2, lpath, callback=callback, **kwargs)

Expand Down Expand Up @@ -1203,7 +1264,7 @@ def info(self):

def url(self):
""" HTTP link to this file's data """
return self.details["mediaLink"]
return self.fs.url(self.path)

def _upload_chunk(self, final=False):
"""Write one part of a multi-block file upload
Expand Down Expand Up @@ -1255,7 +1316,7 @@ def _upload_chunk(self, final=False):
if "Range" in headers:
end = int(headers["Range"].split("-")[1])
shortfall = (self.offset + l - 1) - end
if shortfall:
if shortfall > 0:
self.checker.update(data[:-shortfall])
self.buffer = io.BytesIO(data[-shortfall:])
self.buffer.seek(shortfall)
Expand Down Expand Up @@ -1303,8 +1364,7 @@ def discard(self):
uid = re.findall("upload_id=([^&=?]+)", self.location)
self.gcsfs.call(
"DELETE",
"https://storage.googleapis.com/upload/storage/v1/b/%s/o"
"" % quote_plus(self.bucket),
f"{self.fs._location}/upload/storage/v1/b/{quote_plus(self.bucket)}/o",
params={"uploadType": "resumable", "upload_id": uid},
json_out=True,
)
Expand Down Expand Up @@ -1332,15 +1392,8 @@ def _fetch_range(self, start=None, end=None):
start, end : None or integers
if not both None, fetch only given range
"""
if start is not None or end is not None:
start = start or 0
end = end or 0
head = {"Range": "bytes=%i-%i" % (start, end - 1)}
else:
head = None
try:
_, data = self.gcsfs.call("GET", self.details["mediaLink"], headers=head)
return data
return self.gcsfs.cat_file(self.path, start=start, end=end)
except RuntimeError as e:
if "not satisfiable" in str(e):
return b""
Expand Down Expand Up @@ -1372,8 +1425,7 @@ async def initiate_upload(
j["metadata"] = metadata
headers, _ = await fs._call(
method="POST",
path="https://storage.googleapis.com/upload/storage"
"/v1/b/%s/o" % quote_plus(bucket),
path=f"{fs._location}/upload/storage/v1/b/{quote_plus(bucket)}/o",
uploadType="resumable",
json=j,
headers={"X-Upload-Content-Type": content_type},
Expand All @@ -1395,9 +1447,7 @@ async def simple_upload(
content_type="application/octet-stream",
):
checker = get_consistency_checker(consistency)
path = "https://storage.googleapis.com/upload/storage/v1/b/%s/o" % quote_plus(
bucket
)
path = f"{fs._location}/upload/storage/v1/b/{quote_plus(bucket)}/o"
metadata = {"name": key}
if metadatain is not None:
metadata["metadata"] = metadatain
Expand Down

0 comments on commit 92fcbc1

Please sign in to comment.