From 19f314be144a873dffa469880b45b4692963dea6 Mon Sep 17 00:00:00 2001 From: Peter Bull Date: Sun, 18 Dec 2022 13:15:53 -0800 Subject: [PATCH] Use list_objects_v2 API for S3 (#302) * Initial work * Fix drive-level exists checks * perf measurement * implement listing with list_objects_v2 * formatting * make tests pass * update makefile documentation * revert rig skipping * Update Makefile Co-authored-by: Jay Qi <2721979+jayqi@users.noreply.github.com> * extract out default containers and buckets * remove dotenv from runner * Move google import to avoid import issues * Add pip caching and test independent installs * missing run keys * skip slow oses to test new steps * add notification dependency * install build tools * activate env * Instantiate properties first * Allow azure to instantiate * oses back in and debug commands removed Co-authored-by: Jay Qi <2721979+jayqi@users.noreply.github.com> --- .github/workflows/tests.yml | 56 +++++++- .gitignore | 5 + Makefile | 12 +- cloudpathlib/azure/azblobclient.py | 4 + cloudpathlib/client.py | 2 +- cloudpathlib/gs/gsclient.py | 10 ++ cloudpathlib/s3/s3client.py | 72 ++++++---- requirements-dev.txt | 5 + setup.cfg | 2 +- tests/conftest.py | 23 ++-- tests/mock_clients/mock_azureblob.py | 14 +- tests/mock_clients/mock_gs.py | 17 ++- tests/mock_clients/mock_s3.py | 40 +++++- tests/performance/__init__.py | 0 tests/performance/cli.py | 120 +++++++++++++++++ tests/performance/perf_file_listing.py | 5 + tests/performance/runner.py | 177 +++++++++++++++++++++++++ tests/test_cloudpath_file_io.py | 14 +- 18 files changed, 524 insertions(+), 54 deletions(-) create mode 100644 tests/performance/__init__.py create mode 100644 tests/performance/cli.py create mode 100644 tests/performance/perf_file_listing.py create mode 100644 tests/performance/runner.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 224ba120..292ba669 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -18,9 +18,10 @@ jobs: - uses: actions/checkout@v2 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: 3.8 + cache: 'pip' # caching pip dependencies - name: Install dependencies run: | @@ -44,9 +45,10 @@ jobs: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} + cache: 'pip' # caching pip dependencies - name: Install dependencies run: | @@ -82,9 +84,10 @@ jobs: - uses: actions/checkout@v2 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: 3.8 + cache: 'pip' # caching pip dependencies - name: Install dependencies run: | @@ -119,9 +122,54 @@ jobs: file: ./coverage.xml fail_ci_if_error: true + extras-test: + name: Test independent installs of clients + needs: tests + runs-on: ubuntu-latest + strategy: + matrix: + extra: ["s3", "azure", "gs"] + include: + - prefix: "s3" + extra: "s3" + - prefix: "az" + extra: "azure" + - prefix: "gs" + extra: "gs" + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: 3.8 + cache: "pip" + + - name: Build cloudpathlib + run: | + pip install --upgrade pip wheel setuptools + make dist # build cloudpathlib wheel + + - name: Create empty venv + run: | + python -m venv ${{ matrix.extra }}-env + + - name: Install cloudpathlib[${{ matrix.extra }}] + run: | + source ${{ matrix.extra 
}}-env/bin/activate + pip install "$(find dist -name 'cloudpathlib*.whl')[${{ matrix.extra }}]" + + - name: Test ${{ matrix.extra }} usage + run: | + source ${{ matrix.extra }}-env/bin/activate + python -c 'from cloudpathlib import CloudPath; CloudPath("${{ matrix.prefix }}://bucket/test")' + env: + AZURE_STORAGE_CONNECTION_STRING: ${{ secrets.AZURE_STORAGE_CONNECTION_STRING }} + notify: name: Notify failed build - needs: [code-quality, tests, live-tests] + needs: [code-quality, tests, live-tests, extras-test] if: failure() && github.event.pull_request == null runs-on: ubuntu-latest steps: diff --git a/.gitignore b/.gitignore index bd43885e..71a74a49 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,9 @@ docs/docs/command-reference/*.md docs/docs/index.md docs/docs/changelog.md +# perf output +perf-results.csv + ## GitHub Python .gitignore ## # https://github.com/github/gitignore/blob/master/Python.gitignore @@ -76,6 +79,7 @@ target/ # Jupyter Notebook .ipynb_checkpoints +scratchpad.ipynb # pyenv .python-version @@ -88,6 +92,7 @@ celerybeat-schedule # dotenv .env +.gscreds.json # virtualenv .venv diff --git a/Makefile b/Makefile index 27952618..b39cb15c 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,7 @@ docs: clean-docs docs-setup ## build the static version of the docs docs-serve: clean-docs docs-setup ## serve documentation to livereload while you work cd docs && mkdocs serve -format: +format: ## run black to format codebase black cloudpathlib tests docs help: @@ -60,7 +60,7 @@ help: install: clean ## install the package to the active Python's site-packages python setup.py install -lint: ## check style with flake8 +lint: ## check style with black, flake8, and mypy black --check cloudpathlib tests docs flake8 cloudpathlib tests docs mypy cloudpathlib @@ -71,11 +71,17 @@ release: dist ## package and upload a release release-test: dist twine upload --repository pypitest dist/* -reqs: +reqs: ## install development requirements pip install -U -r requirements-dev.txt test: ## run tests with mocked cloud SDKs python -m pytest -vv +test-debug: ## rerun tests that failed in last run and stop with pdb at failures + python -m pytest -n=0 -vv --lf --pdb + test-live-cloud: ## run tests on live cloud backends USE_LIVE_CLOUD=1 python -m pytest -vv + +perf: ## run performance measurement suite for s3 and save results to perf-results.csv + python tests/performance/cli.py s3 --save-csv=perf-results.csv \ No newline at end of file diff --git a/cloudpathlib/azure/azblobclient.py b/cloudpathlib/azure/azblobclient.py index d2e3648c..1f738ef0 100644 --- a/cloudpathlib/azure/azblobclient.py +++ b/cloudpathlib/azure/azblobclient.py @@ -142,6 +142,10 @@ def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]: return None def _exists(self, cloud_path: AzureBlobPath) -> bool: + # short circuit when only the container + if not cloud_path.blob: + return self.service_client.get_container_client(cloud_path.container).exists() + return self._is_file_or_dir(cloud_path) in ["file", "dir"] def _list_dir( diff --git a/cloudpathlib/client.py b/cloudpathlib/client.py index 5b2dc12c..db7ae6a3 100644 --- a/cloudpathlib/client.py +++ b/cloudpathlib/client.py @@ -31,9 +31,9 @@ def __init__( local_cache_dir: Optional[Union[str, os.PathLike]] = None, content_type_method: Optional[Callable] = mimetypes.guess_type, ): + self._cache_tmp_dir = None self._cloud_meta.validate_completeness() # setup caching and local versions of file and track if it is a tmp dir - self._cache_tmp_dir = None if local_cache_dir is None: 
self._cache_tmp_dir = TemporaryDirectory() local_cache_dir = self._cache_tmp_dir.name diff --git a/cloudpathlib/gs/gsclient.py b/cloudpathlib/gs/gsclient.py index 25c47055..8b66d737 100644 --- a/cloudpathlib/gs/gsclient.py +++ b/cloudpathlib/gs/gsclient.py @@ -4,6 +4,7 @@ from pathlib import Path, PurePosixPath from typing import Any, Callable, Dict, Iterable, Optional, TYPE_CHECKING, Tuple, Union + from ..client import Client, register_client_class from ..cloudpath import implementation_registry from .gspath import GSPath @@ -12,6 +13,7 @@ if TYPE_CHECKING: from google.auth.credentials import Credentials + from google.api_core.exceptions import NotFound from google.auth.exceptions import DefaultCredentialsError from google.cloud.storage import Client as StorageClient @@ -135,6 +137,14 @@ def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]: return None def _exists(self, cloud_path: GSPath) -> bool: + # short-circuit the root-level bucket + if not cloud_path.blob: + try: + next(self.client.bucket(cloud_path.bucket).list_blobs()) + return True + except NotFound: + return False + return self._is_file_or_dir(cloud_path) in ["file", "dir"] def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]: diff --git a/cloudpathlib/s3/s3client.py b/cloudpathlib/s3/s3client.py index 62d92500..72ac8573 100644 --- a/cloudpathlib/s3/s3client.py +++ b/cloudpathlib/s3/s3client.py @@ -143,12 +143,19 @@ def _is_file_or_dir(self, cloud_path: S3Path) -> Optional[str]: return "dir" def _exists(self, cloud_path: S3Path) -> bool: + # check if this is a bucket + if not cloud_path.key: + try: + self.client.head_bucket(Bucket=cloud_path.bucket) + return True + except ClientError: + return False + return self._s3_file_query(cloud_path) is not None def _s3_file_query(self, cloud_path: S3Path): """Boto3 query used for quick checks of existence and if path is file/dir""" - # first check if this is an object that we can access directly - + # check if this is an object that we can access directly try: obj = self.s3.Object(cloud_path.bucket, cloud_path.key) obj.load() @@ -169,40 +176,55 @@ def _s3_file_query(self, cloud_path: S3Path): ) def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Path, bool]]: - bucket = self.s3.Bucket(cloud_path.bucket) - prefix = cloud_path.key if prefix and not prefix.endswith("/"): prefix += "/" yielded_dirs = set() - if recursive: - for o in bucket.objects.filter(Prefix=prefix): - # get directory from this path - for parent in PurePosixPath(o.key[len(prefix) :]).parents: - # if we haven't surfaced their directory already - if parent not in yielded_dirs and str(parent) != ".": - yield (self.CloudPath(f"s3://{cloud_path.bucket}/{prefix}{parent}"), True) - yielded_dirs.add(parent) + paginator = self.client.get_paginator("list_objects_v2") - yield (self.CloudPath(f"s3://{o.bucket_name}/{o.key}"), False) - else: - # non recursive is best done with old client API rather than resource - paginator = self.client.get_paginator("list_objects") - - for result in paginator.paginate( - Bucket=cloud_path.bucket, Prefix=prefix, Delimiter="/" - ): - # sub directory names - for result_prefix in result.get("CommonPrefixes", []): + for result in paginator.paginate( + Bucket=cloud_path.bucket, Prefix=prefix, Delimiter=("" if recursive else "/") + ): + # yield everything in common prefixes as directories + for result_prefix in result.get("CommonPrefixes", []): + canonical = result_prefix.get("Prefix").rstrip("/") # keep a canonical form + if canonical 
not in yielded_dirs: + yield ( + self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}"), + True, + ) + yielded_dirs.add(canonical) + + # check all the keys + for result_key in result.get("Contents", []): + # yield all the parents of any key that have not been yielded already + o_relative_path = result_key.get("Key")[len(prefix) :] + for parent in PurePosixPath(o_relative_path).parents: + parent_canonical = prefix + str(parent).rstrip("/") + if parent_canonical not in yielded_dirs and str(parent) != ".": + yield ( + self.CloudPath(f"s3://{cloud_path.bucket}/{parent_canonical}"), + True, + ) + yielded_dirs.add(parent_canonical) + + # if we already yielded this dir, go to next item in contents + canonical = result_key.get("Key").rstrip("/") + if canonical in yielded_dirs: + continue + + # s3 fake directories have 0 size and end with "/" + if result_key.get("Key").endswith("/") and result_key.get("Size") == 0: yield ( - self.CloudPath(f"s3://{cloud_path.bucket}/{result_prefix.get('Prefix')}"), + self.CloudPath(f"s3://{cloud_path.bucket}/{canonical}"), True, ) + yielded_dirs.add(canonical) - # files in the directory - for result_key in result.get("Contents", []): + # yield object as file + else: yield ( self.CloudPath(f"s3://{cloud_path.bucket}/{result_key.get('Key')}"), False, diff --git a/requirements-dev.txt b/requirements-dev.txt index 49c7123c..115aa945 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,6 +5,7 @@ flake8 ipytest ipython jupyter +loguru matplotlib mike mkdocs>=1.2.2 @@ -19,8 +20,12 @@ pydantic pytest pytest-cases pytest-cov +pytest-xdist python-dotenv pywin32; sys_platform == 'win32' +rich shortuuid tabulate +tqdm +typer wheel diff --git a/setup.cfg b/setup.cfg index 04281486..128298a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,7 +21,7 @@ ignore_errors = True [tool:pytest] testpaths = tests/ -addopts = --cov=cloudpathlib --cov-report=term --cov-report=html --cov-report=xml +addopts = --cov=cloudpathlib --cov-report=term --cov-report=html --cov-report=xml -n=auto [coverage:report] include = cloudpathlib/**.py diff --git a/tests/conftest.py b/tests/conftest.py index 4e84e0e1..b818cdff 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,9 +25,12 @@ ) import cloudpathlib.azure.azblobclient import cloudpathlib.s3.s3client -from .mock_clients.mock_azureblob import mocked_client_class_factory -from .mock_clients.mock_gs import mocked_client_class_factory as mocked_gsclient_class_factory -from .mock_clients.mock_s3 import mocked_session_class_factory +from .mock_clients.mock_azureblob import mocked_client_class_factory, DEFAULT_CONTAINER_NAME +from .mock_clients.mock_gs import ( + mocked_client_class_factory as mocked_gsclient_class_factory, + DEFAULT_GS_BUCKET_NAME, +) +from .mock_clients.mock_s3 import mocked_session_class_factory, DEFAULT_S3_BUCKET_NAME if os.getenv("USE_LIVE_CLOUD") == "1": @@ -93,7 +96,7 @@ def create_test_dir_name(request) -> str: @fixture() def azure_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_AZURE_CONTAINER", "container") + drive = os.getenv("LIVE_AZURE_CONTAINER", DEFAULT_CONTAINER_NAME) test_dir = create_test_dir_name(request) live_server = os.getenv("USE_LIVE_CLOUD") == "1" @@ -144,7 +147,7 @@ def azure_rig(request, monkeypatch, assets_dir): @fixture() def gs_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_GS_BUCKET", "bucket") + drive = os.getenv("LIVE_GS_BUCKET", DEFAULT_GS_BUCKET_NAME) test_dir = create_test_dir_name(request) live_server = os.getenv("USE_LIVE_CLOUD") == "1" @@ -191,7 
+194,7 @@ def gs_rig(request, monkeypatch, assets_dir): @fixture() def s3_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_S3_BUCKET", "bucket") + drive = os.getenv("LIVE_S3_BUCKET", DEFAULT_S3_BUCKET_NAME) test_dir = create_test_dir_name(request) live_server = os.getenv("USE_LIVE_CLOUD") == "1" @@ -243,7 +246,7 @@ def custom_s3_rig(request, monkeypatch, assets_dir): - CEPH (https://ceph.io/ceph-storage/object-storage/) - others """ - drive = os.getenv("CUSTOM_S3_BUCKET", "bucket") + drive = os.getenv("CUSTOM_S3_BUCKET", DEFAULT_S3_BUCKET_NAME) test_dir = create_test_dir_name(request) custom_endpoint_url = os.getenv("CUSTOM_S3_ENDPOINT", "https://s3.us-west-1.drivendatabws.com") @@ -307,7 +310,7 @@ def custom_s3_rig(request, monkeypatch, assets_dir): @fixture() def local_azure_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_AZURE_CONTAINER", "container") + drive = os.getenv("LIVE_AZURE_CONTAINER", DEFAULT_CONTAINER_NAME) test_dir = create_test_dir_name(request) # copy test assets @@ -333,7 +336,7 @@ def local_azure_rig(request, monkeypatch, assets_dir): @fixture() def local_gs_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_GS_BUCKET", "bucket") + drive = os.getenv("LIVE_GS_BUCKET", DEFAULT_GS_BUCKET_NAME) test_dir = create_test_dir_name(request) # copy test assets @@ -358,7 +361,7 @@ def local_gs_rig(request, monkeypatch, assets_dir): @fixture() def local_s3_rig(request, monkeypatch, assets_dir): - drive = os.getenv("LIVE_S3_BUCKET", "bucket") + drive = os.getenv("LIVE_S3_BUCKET", DEFAULT_S3_BUCKET_NAME) test_dir = create_test_dir_name(request) # copy test assets diff --git a/tests/mock_clients/mock_azureblob.py b/tests/mock_clients/mock_azureblob.py index 6861f5d3..281562fb 100644 --- a/tests/mock_clients/mock_azureblob.py +++ b/tests/mock_clients/mock_azureblob.py @@ -12,6 +12,9 @@ TEST_ASSETS = Path(__file__).parent.parent / "assets" +DEFAULT_CONTAINER_NAME = "container" + + def mocked_client_class_factory(test_dir: str): class MockBlobServiceClient: def __init__(self, *args, **kwargs): @@ -33,7 +36,7 @@ def get_blob_client(self, container, blob): return MockBlobClient(self.tmp_path, blob, service_client=self) def get_container_client(self, container): - return MockContainerClient(self.tmp_path) + return MockContainerClient(self.tmp_path, container_name=container) return MockBlobServiceClient @@ -106,8 +109,15 @@ def content_as_bytes(self): class MockContainerClient: - def __init__(self, root): + def __init__(self, root, container_name): self.root = root + self.container_name = container_name + + def exists(self): + if self.container_name == DEFAULT_CONTAINER_NAME: # name used by passing tests + return True + else: + return False def list_blobs(self, name_starts_with=None): return mock_item_paged(self.root, name_starts_with) diff --git a/tests/mock_clients/mock_gs.py b/tests/mock_clients/mock_gs.py index 60b4a01e..ccf675bd 100644 --- a/tests/mock_clients/mock_gs.py +++ b/tests/mock_clients/mock_gs.py @@ -3,9 +3,12 @@ import shutil from tempfile import TemporaryDirectory +from google.api_core.exceptions import NotFound + from .utils import delete_empty_parents_up_to_root TEST_ASSETS = Path(__file__).parent.parent / "assets" +DEFAULT_GS_BUCKET_NAME = "bucket" def mocked_client_class_factory(test_dir: str): @@ -30,7 +33,7 @@ def __del__(self): self.tmp.cleanup() def bucket(self, bucket): - return MockBucket(self.tmp_path, client=self) + return MockBucket(self.tmp_path, bucket, client=self) return MockClient @@ -87,8 +90,9 @@ def 
content_type(self): class MockBucket: - def __init__(self, name, client=None): + def __init__(self, name, bucket_name, client=None): self.name = name + self.bucket_name = bucket_name self.client = client def blob(self, blob): @@ -113,7 +117,14 @@ def list_blobs(self, max_results=None, prefix=None): for f in path.glob("**/*") if f.is_file() and not f.name.startswith(".") ] - return MockHTTPIterator(items, max_results) + + # bucket name for passing tests + if self.bucket_name == DEFAULT_GS_BUCKET_NAME: + return iter(MockHTTPIterator(items, max_results)) + else: + raise NotFound( + f"Bucket {self.name} not expected as mock bucket; only '{DEFAULT_GS_BUCKET_NAME}' exists." + ) class MockHTTPIterator: diff --git a/tests/mock_clients/mock_s3.py b/tests/mock_clients/mock_s3.py index bb9bf192..994b24a4 100644 --- a/tests/mock_clients/mock_s3.py +++ b/tests/mock_clients/mock_s3.py @@ -10,6 +10,7 @@ from .utils import delete_empty_parents_up_to_root TEST_ASSETS = Path(__file__).parent.parent / "assets" +DEFAULT_S3_BUCKET_NAME = "bucket" # Since we don't contol exactly when the filesystem finishes writing a file # and the test files are super small, we can end up with race conditions in @@ -169,7 +170,8 @@ def __init__(self, items, root, session=None): self.full_paths = items self.s3_obj_paths = [ - s3_obj(bucket_name="bucket", key=str(i.relative_to(self.root))) for i in items + s3_obj(bucket_name=DEFAULT_S3_BUCKET_NAME, key=str(i.relative_to(self.root))) + for i in items ] def __iter__(self): @@ -199,6 +201,19 @@ def __init__(self, root, session=None): def get_paginator(self, api): return MockBoto3Paginator(self.root, session=self.session) + def head_bucket(self, Bucket): + if Bucket == DEFAULT_S3_BUCKET_NAME: # used in passing tests + return {"Bucket": Bucket} + else: + raise ClientError( + { + "Error": { + "Message": f"Bucket {Bucket} not expected as mock bucket; only '{DEFAULT_S3_BUCKET_NAME}' exists." 
+ } + }, + {}, + ) + @property def exceptions(self): Ex = collections.namedtuple("Ex", "NoSuchKey") @@ -214,7 +229,10 @@ def __init__(self, root, per_page=2, session=None): def paginate(self, Bucket=None, Prefix="", Delimiter=None): new_dir = self.root / Prefix - items = [f for f in new_dir.iterdir() if not f.name.startswith(".")] + if Delimiter == "/": + items = [f for f in new_dir.iterdir() if not f.name.startswith(".")] + else: + items = [f for f in new_dir.rglob("*") if not f.name.startswith(".")] for ix in range(0, len(items), self.per_page): page = items[ix : ix + self.per_page] @@ -222,6 +240,22 @@ def paginate(self, Bucket=None, Prefix="", Delimiter=None): {"Prefix": str(_.relative_to(self.root).as_posix())} for _ in page if _.is_dir() ] files = [ - {"Key": str(_.relative_to(self.root).as_posix())} for _ in page if _.is_file() + { + "Key": str(_.relative_to(self.root).as_posix()), + "Size": 123 + if not _.relative_to(self.root).exists() + else _.relative_to(self.root).stat().st_size, + } + for _ in page + if _.is_file() ] + + # s3 can have "fake" directories where size is 0, but it is listed in "Contents" (see #198) + # add one in here for testing + if dirs: + fake_dir = dirs.pop(-1) + fake_dir["Size"] = 0 + fake_dir["Key"] = fake_dir.pop("Prefix") + "/" # fake dirs have '/' appended + files.append(fake_dir) + yield {"CommonPrefixes": dirs, "Contents": files} diff --git a/tests/performance/__init__.py b/tests/performance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/performance/cli.py b/tests/performance/cli.py new file mode 100644 index 00000000..61a33f83 --- /dev/null +++ b/tests/performance/cli.py @@ -0,0 +1,120 @@ +from datetime import datetime, timedelta +import enum +import os +from pathlib import Path +from typing import Optional + +from dotenv import find_dotenv, load_dotenv +from rich.console import Console +from rich.table import Table +from tqdm import tqdm +import typer +from loguru import logger + +from cloudpathlib import implementation_registry + + +from runner import main, normalize_results, results_to_csv + +# make loguru and tqdm play nicely together +logger.remove() +logger.add(lambda msg: tqdm.write(msg, end=""), colorize=True) + +# get environement variables +load_dotenv(find_dotenv()) + +# enumerate cloudpathlib implementations +CloudEnum = enum.Enum("CloudEnum", {k: k for k in implementation_registry.keys()}) + +# initialize CLI +cli = typer.Typer() + + +def _configure( + backend: CloudEnum, + bucket: Optional[str] = None, +): + # common configuration for all commands + logger.info(f"Setting up tests with backend {backend}.") + + if bucket is None: + logger.info("Bucket not set explicitly, loading from environment variable.") + bucket = { + "s3": os.environ.get("LIVE_S3_BUCKET"), + "gs": os.environ.get("LIVE_AZURE_CONTAINER"), + "azure": os.environ.get("LIVE_GS_BUCKET"), + }.get(backend.value) + + logger.info(f"Bucket: {bucket}") + + # get the actual implementation + backend = implementation_registry.get(backend.value) + + return backend.path_class(f"{backend.path_class.cloud_prefix}{bucket}/performance_tests") + + +def results_to_rich_table(results): + table = Table(title=f"Performance suite results: ({datetime.now().isoformat().split('Z')[0]})") + + var_to_title = lambda x: (" ".join(x.split("_"))).title() + + all_fields, row_list = normalize_results(results) + + for field in all_fields: + col_kwargs = {} + # get a sample value + val = row_list[0][field] + + if isinstance(val, (int, float)): + col_kwargs["justify"] = "right" + 
+ # average performance value is highlighted in green + if field == "mean": + col_kwargs["style"] = "green" + + table.add_column(var_to_title(field), **col_kwargs) + + def _format_row(r): + formatted = [] + for f in all_fields: + val = r[f] + + if f in ["mean", "min", "max", "std"]: + val = str(timedelta(seconds=val)) + elif isinstance(val, int): + val = f"{val:,}" + + if f == "std": + val = "± " + val + + formatted.append(val) + + return formatted + + for row in row_list: + table.add_row(*_format_row(row)) + + return table + + +@cli.command(short_help="Runs peformance test suite against a specific backend and bucket.") +def run( + backend: CloudEnum, + bucket: Optional[str] = None, + iterations: int = 10, + burn_in: int = 2, + save_csv: Optional[Path] = None, +): + root = _configure(backend, bucket) + + results = main(root, iterations, burn_in) + + c = Console() + c.print(results_to_rich_table(results)) + + if save_csv is not None: + results_to_csv(results, save_csv) + + +if __name__ == "__main__": + cli() diff --git a/tests/performance/perf_file_listing.py b/tests/performance/perf_file_listing.py new file mode 100644 index 00000000..9144338d --- /dev/null +++ b/tests/performance/perf_file_listing.py @@ -0,0 +1,5 @@ +def folder_list(folder, recursive): + """Tests *Client._list_dir function and returns + the number of items listed + """ + return {"n_items": len(list(folder.client._list_dir(folder, recursive=recursive)))} diff --git a/tests/performance/runner.py b/tests/performance/runner.py new file mode 100644 index 00000000..1137b31c --- /dev/null +++ b/tests/performance/runner.py @@ -0,0 +1,177 @@ +import csv +from dataclasses import dataclass +from pathlib import Path +from time import perf_counter +from typing import Callable, Dict, List + +from statistics import mean, stdev + + +from tqdm import tqdm +from tqdm.contrib.concurrent import thread_map +from loguru import logger + +from cloudpathlib import CloudPath + + +from perf_file_listing import folder_list + + +# make loguru and tqdm play nicely together +logger.remove() +logger.add(lambda msg: tqdm.write(msg, end=""), colorize=True) + + +def construct_tree(folder_depth, sub_folders, items_per_folder): + for limit in range(0, folder_depth + 1): + if limit == 0: + prefix = "" + else: + prefix = "/".join(f"level_{d:05}" for d in range(1, limit + 1)) + prefix += "/" # append slash for when this gets combined later + + for f in range(0, sub_folders + 1): + if f == 0: + folder_prefix = prefix + else: + folder_prefix = f"{prefix}folder_{f:05}/" + + for i in range(1, items_per_folder + 1): + yield f"{folder_prefix}{i:05}.item" + + +def setup_test(root, folder_depth, sub_folders, items_per_folder, overwrite=False): + test_config_str = f"{folder_depth}_{sub_folders}_{items_per_folder}" + test_folder = CloudPath(root) / test_config_str + + if test_folder.exists(): + if not overwrite: + logger.info( + f"Folder '{test_folder}' already exists, setup complete. Pass 'overwrite=True' to delete and rewrite" + ) + return test_folder + else: + logger.info( + f"Folder '{test_folder}' already exists, and overwrite=True. Removing existing folder." 
+ ) + test_folder.rmtree() + + logger.info(f"Setting up files for testing in folder: {test_folder}") + + # create folders and files for test in parallel + thread_map( + lambda x: (test_folder / x).touch(), + list(construct_tree(folder_depth, sub_folders, items_per_folder)), + desc="creating...", + ) + + return test_folder + + +@dataclass +class PerfRunConfig: + name: str + args: List + kwargs: Dict + + +def run_single_perf_test( + func: Callable, iterations: int, burn_in: int, configs: List[PerfRunConfig] +): + all_results = {} + for c in configs: + measurements = [] + for i in tqdm(range(iterations + burn_in), desc=c.name): + t0 = perf_counter() + + result = func(*c.args, **c.kwargs) + + if i >= burn_in: + measurements.append(perf_counter() - t0) + + stats = {} + stats["mean"] = mean(measurements) + stats["max"] = max(measurements) + stats["std"] = stdev(measurements) + stats["iterations"] = iterations + + if isinstance(result, dict): + stats.update(result) + else: + stats["result"] = result + + all_results[c.name] = stats # add any return information from the test itself + + return all_results + + +def main(root, iterations, burn_in): + # required folder setups; all totals over ~5,000 so that we get + # automatically paginated by AWS S3 API + shallow = setup_test(root, 0, 0, 5_500) + normal = setup_test(root, 5, 100, 12) + deep = setup_test(root, 50, 5, 25) + + test_suite = [ + ( + "List Folders", + folder_list, + [ + PerfRunConfig(name="List shallow recursive", args=[shallow, True], kwargs={}), + PerfRunConfig(name="List shallow non-recursive", args=[shallow, False], kwargs={}), + PerfRunConfig(name="List normal recursive", args=[normal, True], kwargs={}), + PerfRunConfig(name="List normal non-recursive", args=[normal, False], kwargs={}), + PerfRunConfig(name="List deep recursive", args=[deep, True], kwargs={}), + PerfRunConfig(name="List deep non-recursive", args=[deep, False], kwargs={}), + ], + ), + ] + + logger.info( + f"Running performance test suite: root={root}, iter={iterations}, burn_in={burn_in}" + ) + + all_results = {} + + for name, func, confs in tqdm(test_suite, desc="Tests"): + all_results[name] = run_single_perf_test(func, iterations, burn_in, confs) + + return all_results + + +def normalize_results(results): + """Convert nested dict of results to a list of dicts with all the same keys.""" + rows_list = [] + observed_fields = set() + + for test_name, test_results in results.items(): + for conf_name, conf_results in test_results.items(): + row_dict = {"test_name": test_name, "config_name": conf_name} + row_dict.update(conf_results) + rows_list.append(row_dict) + + observed_fields |= set(row_dict.keys()) + + # order fields for ones present in all tests; then any extra fields at the end + common_fields = ["test_name", "config_name", "iterations", "mean", "std", "max"] + + all_fields = common_fields + list(observed_fields - set(common_fields)) + + return all_fields, rows_list + + +def results_to_csv(results, path): + """Save the results from `runner.main` to a csv file.""" + all_fields, row_list = normalize_results(results) + + with Path(path).open("w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=all_fields) + writer.writeheader() + + for row in row_list: + # normalize so all rows have all possible fields + for field in all_fields: + if field not in row: + row[field] = None + + writer.writerow(row) diff --git a/tests/test_cloudpath_file_io.py b/tests/test_cloudpath_file_io.py index 4ff4cf28..96ff1c1d 100644 --- a/tests/test_cloudpath_file_io.py +++ 
b/tests/test_cloudpath_file_io.py @@ -6,6 +6,7 @@ from time import sleep import pytest +from cloudpathlib import CloudPath from cloudpathlib.exceptions import ( CloudPathIsADirectoryError, @@ -211,13 +212,13 @@ def test_glob_exceptions(rig): # non-relative paths with pytest.raises(CloudPathNotImplementedError, match="Non-relative patterns"): - list(cp.glob(f"{rig.path_class.cloud_prefix}bucket/path/**/*.jpg")) + list(cp.glob(f"{rig.path_class.cloud_prefix}{rig.drive}/path/**/*.jpg")) with pytest.raises(CloudPathNotImplementedError, match="Non-relative patterns"): list(cp.glob("/path/**/*.jpg")) with pytest.raises(CloudPathNotImplementedError, match="Non-relative patterns"): - list(cp.rglob(f"{rig.path_class.cloud_prefix}bucket/path/**/*.jpg")) + list(cp.rglob(f"{rig.path_class.cloud_prefix}{rig.drive}/path/**/*.jpg")) with pytest.raises(CloudPathNotImplementedError, match="Non-relative patterns"): list(cp.rglob("/path/**/*.jpg")) @@ -345,3 +346,12 @@ def test_pickle(rig, tmpdir): assert str(pickled) == str(p) assert pickled.client == p.client assert rig.client_class._default_client == pickled.client + + +def test_drive_exists(rig): + """Tests the exists call for top level bucket/container""" + p = rig.create_cloud_path("dir_0/file0_0.txt") + + assert CloudPath(f"{rig.cloud_prefix}{p.drive}").exists() + + assert not CloudPath(f"{rig.cloud_prefix}totally-fake-not-existing-bucket-for-tests").exists()
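
Note on the core S3 change: below is a minimal, standalone sketch of the two boto3 patterns this patch adopts in `S3Client._exists` and `S3Client._list_dir` — a `head_bucket` call to short-circuit bucket-level existence checks, and a single `list_objects_v2` paginator that serves both recursive and non-recursive listings by toggling the `Delimiter` argument. The helper names, module-level client, and bucket/prefix arguments here are illustrative only, not cloudpathlib's API.

from pathlib import PurePosixPath
from typing import Iterable, Tuple

import boto3
from botocore.exceptions import ClientError

client = boto3.client("s3")


def bucket_exists(bucket: str) -> bool:
    """Cheap existence check when the path has no key (bucket root)."""
    try:
        client.head_bucket(Bucket=bucket)
        return True
    except ClientError:
        return False


def list_dir(bucket: str, prefix: str, recursive: bool) -> Iterable[Tuple[str, bool]]:
    """Yield (key, is_dir) pairs under a prefix via list_objects_v2.

    Delimiter="/" makes S3 roll immediate children up into CommonPrefixes;
    an empty delimiter returns every key under the prefix (recursive).
    """
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    yielded_dirs = set()
    paginator = client.get_paginator("list_objects_v2")

    for page in paginator.paginate(
        Bucket=bucket, Prefix=prefix, Delimiter=("" if recursive else "/")
    ):
        # with a delimiter, immediate subdirectories arrive as CommonPrefixes
        for common in page.get("CommonPrefixes", []):
            canonical = common["Prefix"].rstrip("/")
            if canonical not in yielded_dirs:
                yielded_dirs.add(canonical)
                yield canonical, True

        for obj in page.get("Contents", []):
            key = obj["Key"]

            # surface intermediate "directories" implied by deeper keys (recursive case)
            for parent in PurePosixPath(key[len(prefix):]).parents:
                if str(parent) == ".":
                    continue
                parent_canonical = prefix + str(parent)
                if parent_canonical not in yielded_dirs:
                    yielded_dirs.add(parent_canonical)
                    yield parent_canonical, True

            canonical = key.rstrip("/")
            if canonical in yielded_dirs:
                continue

            # zero-byte keys ending in "/" are console-style "fake" directories
            if key.endswith("/") and obj["Size"] == 0:
                yielded_dirs.add(canonical)
                yield canonical, True
            else:
                yield key, False

Collapsing the previous split between the resource-based recursive walk and the `list_objects` client call into this one paginator is what lets the recursive and non-recursive paths share the same directory and fake-directory handling.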