From 2792a489c341ef623b43d16e996bc6ce68651386 Mon Sep 17 00:00:00 2001
From: Shane A
Date: Wed, 22 Nov 2023 11:24:01 -0500
Subject: [PATCH 1/9] Add R2 scheme client for cached_path

---
 scripts/storage_cleaner.py | 43 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index 57ef3d659..a7201cbc8 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -13,8 +13,12 @@
 from typing import Any, Dict, List, Optional, Tuple
 from urllib.parse import urlparse
 
+import boto3.session
+import botocore.client
 import botocore.exceptions as boto_exceptions
 import google.cloud.storage as gcs
+from cached_path import add_scheme_client
+from cached_path.schemes import S3Client
 from google.api_core.exceptions import NotFound
 from rich.progress import Progress
 
@@ -672,6 +676,44 @@ def perform_operation(args: argparse.Namespace):
         raise NotImplementedError(args.op)
 
 
+def _add_cached_path_r2_client(r2_account_id: str):
+    endpoint_url = f"https://{r2_account_id}.r2.cloudflarestorage.com"
+
+    class R2SchemeClient(S3Client):
+        """
+        A class that the `cached_path` module can use to retrieve resources from
+        R2. Refer to
+        [cached_path docs](https://github.com/allenai/cached_path/blob/main/docs/source/overview.md#supported-url-schemes).
+        """
+        scheme = "r2"
+
+        def __init__(self, resource: str) -> None:
+            super().__init__(resource)
+            parsed_path = urlparse(resource)
+            bucket_name = parsed_path.netloc
+            r2_path = parsed_path.path.lstrip("/")
+
+            session = boto3.session.Session()
+            if session.get_credentials() is None:
+                # Use unsigned requests.
+                s3_resource = session.resource(
+                    "s3",
+                    endpoint_url=endpoint_url,
+                    config=botocore.client.Config(signature_version=botocore.UNSIGNED)
+                )
+            else:
+                s3_resource = session.resource("s3", endpoint_url=endpoint_url)
+            self.s3_object = s3_resource.Object(bucket_name, r2_path) # type: ignore
+
+    add_scheme_client(R2SchemeClient)
+
+
+def _add_cached_path_scheme_clients():
+    r2_account_id = os.environ.get("R2_ACCOUNT_ID")
+    if r2_account_id is not None:
+        _add_cached_path_r2_client(r2_account_id)
+
+
 def _add_delete_subparser(subparsers: _SubParsersAction):
     delete_runs_parser: ArgumentParser = subparsers.add_parser(
         "clean", help="Delete bad runs (e.g. runs with no non-trivial checkpoints)"
@@ -723,6 +765,7 @@ def main():
     args = get_parser().parse_args()
     util.prepare_cli_environment()
+    _add_cached_path_scheme_clients()
     perform_operation(args)
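With this patch applied, `cached_path` can resolve `r2://` URLs through the client registered above. A minimal usage sketch, assuming `R2_ACCOUNT_ID` is exported; the account id, bucket, and key below are placeholders, not values from the patch:

```python
import os

from cached_path import cached_path

os.environ["R2_ACCOUNT_ID"] = "0123456789abcdef"  # placeholder account id

# Register the r2:// handler defined in this patch, then fetch an object;
# cached_path returns the path of a locally cached copy of the resource.
_add_cached_path_scheme_clients()
local_copy = cached_path("r2://my-bucket/checkpoints/config.yaml")
```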
From 97a6df7c32741ac269dbf90500c9a2b3c2652f6d Mon Sep 17 00:00:00 2001
From: Shane A
Date: Wed, 22 Nov 2023 11:25:08 -0500
Subject: [PATCH 2/9] Migrate cloud file download and unarchiving to cached_path

---
 scripts/storage_cleaner.py | 56 ++++++--------------------------------
 1 file changed, 9 insertions(+), 47 deletions(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index a7201cbc8..2f0e84885 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -3,7 +3,6 @@
 import os
 import re
 import shutil
-import tarfile
 import tempfile
 from abc import ABC, abstractmethod
 from argparse import ArgumentParser, _SubParsersAction
@@ -17,10 +16,9 @@
 import botocore.client
 import botocore.exceptions as boto_exceptions
 import google.cloud.storage as gcs
-from cached_path import add_scheme_client
+from cached_path import add_scheme_client, cached_path
 from cached_path.schemes import S3Client
 from google.api_core.exceptions import NotFound
-from rich.progress import Progress
 
 from olmo import util
 from olmo.aliases import PathOrStr
@@ -141,6 +139,12 @@ def _list_entries(
         self, path: PathOrStr, include_files: bool = True, max_file_size: Optional[int] = None
     ) -> List[str]:
         path = Path(path)
+        if path.is_file():
+            if not self.has_supported_archive_extension(path):
+                raise ValueError(f"File does not have a supported archive extension: {path}")
+
+            path = cached_path(path, extract_archive=True)
+
         if path.is_dir():
             return [
                 entry.name
                 for entry in path.iterdir()
                 if (
                     (include_files or not entry.is_file())
                     and ( )
                 )
             ]
 
-        if self.has_supported_archive_extension(path):
-            if not include_files or max_file_size is not None:
-                raise NotImplementedError("Filtering out entries from a tar file is not yet supported")
-
-            with tarfile.open(path) as tar:
-                log.info("Listing entries from archive %s", path)
-                return [
-                    Path(tar_subpath).name for tar_subpath in tar.getnames() if len(Path(tar_subpath).parts) == 2
-                ]
-
         raise ValueError(f"Path does not correspond to directory or supported archive file: {path}")
 
     def list_entries(self, path: str, max_file_size: Optional[int] = None) -> List[str]:
@@ -256,17 +250,6 @@ def _get_size(self, bucket_name: str, key: str) -> int:
 
         return self._get_blob_size(blob)
 
-    def _download_file(self, bucket_name: str, key: str) -> str:
-        extension = "".join(Path(key).suffixes)
-        temp_file = self.local_fs_adapter.create_temp_file(suffix=extension)
-
-        bucket = self.gcs_client.bucket(bucket_name)
-        blob = bucket.get_blob(key)
-        if blob is None:
-            raise ValueError(f"Downloading invalid object: {self._get_path(bucket_name, key)}")
-        blob.download_to_filename(temp_file)
-        return temp_file
-
     def _get_directory_entries(
         self,
         bucket_name: str,
@@ -308,8 +291,7 @@ def _list_entries(
         bucket_name, key = self._get_bucket_name_and_key(path)
 
         if self.local_fs_adapter.has_supported_archive_extension(path):
-            log.info("Downloading archive %s", path)
-            file_path = self._download_file(bucket_name, key)
+            file_path = str(cached_path(path, extract_archive=True))
 
             if not include_files:
                 return self.local_fs_adapter.list_dirs(file_path)
@@ -400,25 +382,6 @@ def _get_size(self, bucket_name: str, key: str) -> int:
             raise RuntimeError(f"Failed to get size for file: {self._get_path(bucket_name, key)}")
         return head_response["ContentLength"]
 
-    def _download_file(self, bucket_name: str, key: str) -> str:
-        extension = "".join(Path(key).suffixes)
-        temp_file = self.local_fs_adapter.create_temp_file(suffix=extension)
-
-        size_in_bytes = self._get_size(bucket_name, key)
-
-        with Progress(transient=True) as progress:
-            download_task = progress.add_task(f"Downloading {key}", total=size_in_bytes)
-
-            def progress_callback(bytes_downloaded: int):
-                progress.update(download_task, advance=bytes_downloaded)
-
-            self._s3_client.download_file(bucket_name, key, temp_file, Callback=progress_callback)
-
-        if not self.local_fs_adapter.is_file(temp_file):
-            raise RuntimeError(f"Failed to download file: {self._get_path(bucket_name, key)}")
-
-        return temp_file
-
     def _get_directory_entries(
         self,
         bucket_name: str,
@@ -458,8 +421,7 @@ def _list_entries(
         bucket_name, key = self._get_bucket_name_and_key(path)
 
         if self.local_fs_adapter.has_supported_archive_extension(path):
-            log.info("Downloading archive %s", path)
-            file_path = self._download_file(bucket_name, key)
+            file_path = str(cached_path(path, extract_archive=True))
 
             if not include_files:
                 return self.local_fs_adapter.list_dirs(file_path)
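This patch collapses the hand-rolled tarfile extraction and progress-tracked downloads into single `cached_path` calls. A minimal sketch of the behavior being relied on; the bucket and archive name are hypothetical:

```python
from pathlib import Path

from cached_path import cached_path

# cached_path downloads the archive into its local cache (reusing the cached
# copy when it is still current), extracts it, and returns the path of the
# extraction directory.
extracted_dir = cached_path("s3://my-bucket/runs/run001.tar.gz", extract_archive=True)
for entry in Path(extracted_dir).iterdir():
    print(entry.name)
```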
"".join(Path(key).suffixes) - temp_file = self.local_fs_adapter.create_temp_file(suffix=extension) - - size_in_bytes = self._get_size(bucket_name, key) - - with Progress(transient=True) as progress: - download_task = progress.add_task(f"Downloading {key}", total=size_in_bytes) - - def progress_callback(bytes_downloaded: int): - progress.update(download_task, advance=bytes_downloaded) - - self._s3_client.download_file(bucket_name, key, temp_file, Callback=progress_callback) - - if not self.local_fs_adapter.is_file(temp_file): - raise RuntimeError(f"Failed to download file: {self._get_path(bucket_name, key)}") - - return temp_file - def _get_directory_entries( self, bucket_name: str, @@ -458,8 +421,7 @@ def _list_entries( bucket_name, key = self._get_bucket_name_and_key(path) if self.local_fs_adapter.has_supported_archive_extension(path): - log.info("Downloading archive %s", path) - file_path = self._download_file(bucket_name, key) + file_path = str(cached_path(path, extract_archive=True)) if not include_files: return self.local_fs_adapter.list_dirs(file_path) From dd2ddc79c247bc0bef06fdb1957bdd7bdfd98c89 Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 22 Nov 2023 12:07:25 -0500 Subject: [PATCH 3/9] Run ruff --- scripts/storage_cleaner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 2f0e84885..93dde5dbe 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -647,6 +647,7 @@ class R2SchemeClient(S3Client): R2. Refer to [cached_path docs](https://github.com/allenai/cached_path/blob/main/docs/source/overview.md#supported-url-schemes). """ + scheme = "r2" def __init__(self, resource: str) -> None: @@ -661,11 +662,11 @@ def __init__(self, resource: str) -> None: s3_resource = session.resource( "s3", endpoint_url=endpoint_url, - config=botocore.client.Config(signature_version=botocore.UNSIGNED) + config=botocore.client.Config(signature_version=botocore.UNSIGNED), ) else: s3_resource = session.resource("s3", endpoint_url=endpoint_url) - self.s3_object = s3_resource.Object(bucket_name, r2_path) # type: ignore + self.s3_object = s3_resource.Object(bucket_name, r2_path) # type: ignore add_scheme_client(R2SchemeClient) From 368a6ed4451a0ebd82472ce37919d35f959a05ad Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 29 Nov 2023 14:28:52 -0800 Subject: [PATCH 4/9] Fix type check errors in _list_entries --- scripts/storage_cleaner.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 93dde5dbe..0006a7eab 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -138,17 +138,17 @@ def has_supported_archive_extension(self, path: PathOrStr) -> bool: def _list_entries( self, path: PathOrStr, include_files: bool = True, max_file_size: Optional[int] = None ) -> List[str]: - path = Path(path) - if path.is_file(): - if not self.has_supported_archive_extension(path): + path_obj = Path(path) + if path_obj.is_file(): + if not self.has_supported_archive_extension(path_obj): raise ValueError(f"File does not have a supported archive extension: {path}") - path = cached_path(path, extract_archive=True) + path_obj = cached_path(path_obj, extract_archive=True) - if path.is_dir(): + if path_obj.is_dir(): return [ entry.name - for entry in path.iterdir() + for entry in path_obj.iterdir() if ( (include_files or not entry.is_file()) and ( From 36e485f717f648b5ba3cbb0c318dd12b02242b5d Mon Sep 17 00:00:00 2001 From: 
From 36e485f717f648b5ba3cbb0c318dd12b02242b5d Mon Sep 17 00:00:00 2001
From: Shane A
Date: Wed, 6 Dec 2023 09:41:51 -0800
Subject: [PATCH 5/9] Use updated util methods to set up S3 adapter

---
 scripts/storage_cleaner.py | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index 0006a7eab..5093830ac 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -33,11 +33,11 @@ class CleaningOperations(Enum):
     DELETE_BAD_RUNS = auto()
 
 
-class StorageType(Enum):
-    LOCAL_FS = auto()
-    GCS = auto()
-    S3 = auto()
-    R2 = auto()
+class StorageType(util.StrEnum):
+    LOCAL_FS = ""
+    GCS = "gs"
+    S3 = "s3"
+    R2 = "r2"
 
 
 class StorageAdapter(ABC):
@@ -82,15 +82,8 @@ def create_storage_adapter(cls, storage_type: StorageType):
             return LocalFileSystemAdapter()
         if storage_type == StorageType.GCS:
             return GoogleCloudStorageAdapter()
-        if storage_type == StorageType.S3:
+        if storage_type in (StorageType.S3, StorageType.R2):
             return S3StorageAdapter(storage_type)
-        if storage_type == StorageType.R2:
-            r2_account_id = os.environ.get("R2_ACCOUNT_ID")
-            if r2_account_id is None:
-                raise ValueError(
-                    "R2_ACCOUNT_ID environment variable not set with R2 account id, cannot connect to R2"
-                )
-            return S3StorageAdapter(storage_type, endpoint_url=f"https://{r2_account_id}.r2.cloudflarestorage.com")
 
         raise NotImplementedError(f"No storage adapter implemented for storage type {storage_type}")
@@ -340,10 +333,10 @@ def is_dir(self, path: str) -> bool:
 
 
 class S3StorageAdapter(StorageAdapter):
-    def __init__(self, storage_type: StorageType, endpoint_url: Optional[str] = None):
+    def __init__(self, storage_type: StorageType):
         super().__init__()
         self._storage_type = storage_type
-        self._s3_client = util._get_s3_client(endpoint_url=endpoint_url)
+        self._s3_client = util._get_s3_client(str(storage_type))
 
         self._local_fs_adapter: Optional[LocalFileSystemAdapter] = None
         self._temp_dirs: List[tempfile.TemporaryDirectory] = []
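Because `StorageType` values now double as URL schemes, an adapter can be picked straight off a parsed URL. A small sketch of that mapping, using a made-up bucket name:

```python
from urllib.parse import urlparse

# "r2" -> StorageType.R2; the empty scheme of a plain local path likewise
# maps to StorageType.LOCAL_FS ("").
scheme = urlparse("r2://my-bucket/runs/run001").scheme
adapter = StorageAdapter.create_storage_adapter(StorageType(scheme))
```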
""" - scheme = "r2" + schemes = ("s3", "r2") def __init__(self, resource: str) -> None: super().__init__(resource) parsed_path = urlparse(resource) bucket_name = parsed_path.netloc - r2_path = parsed_path.path.lstrip("/") - - session = boto3.session.Session() - if session.get_credentials() is None: - # Use unsigned requests. - s3_resource = session.resource( - "s3", - endpoint_url=endpoint_url, - config=botocore.client.Config(signature_version=botocore.UNSIGNED), - ) - else: - s3_resource = session.resource("s3", endpoint_url=endpoint_url) - self.s3_object = s3_resource.Object(bucket_name, r2_path) # type: ignore + key = parsed_path.path.lstrip("/") + + profile_name = util._get_s3_profile_name(parsed_path.scheme) + endpoint_url = util._get_s3_endpoint_url(parsed_path.scheme) + + session = boto3.session.Session(profile_name=profile_name) + s3_resource = session.resource("s3", endpoint_url=endpoint_url) + self.s3_object = s3_resource.Object(bucket_name, key) # type: ignore - add_scheme_client(R2SchemeClient) + add_scheme_client(S3SchemeClient) def _add_cached_path_scheme_clients(): - r2_account_id = os.environ.get("R2_ACCOUNT_ID") - if r2_account_id is not None: - _add_cached_path_r2_client(r2_account_id) + _add_cached_path_s3_client() def _add_delete_subparser(subparsers: _SubParsersAction): From 1913b7a8fe385557624f79bc6068bd559d360e7f Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 6 Dec 2023 10:55:11 -0800 Subject: [PATCH 7/9] Fix typo in cached path client schemes --- scripts/storage_cleaner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index d2acf2229..35c2b8d4e 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -638,7 +638,8 @@ class S3SchemeClient(S3Client): [cached_path docs](https://github.com/allenai/cached_path/blob/main/docs/source/overview.md#supported-url-schemes). """ - schemes = ("s3", "r2") + # This is used by cached_path to get the schemes are handled by this client + scheme = ("s3", "r2") def __init__(self, resource: str) -> None: super().__init__(resource) From 4f59fc5004c3a9fdc829946972dd12776a071a18 Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 6 Dec 2023 10:58:03 -0800 Subject: [PATCH 8/9] Add temp dir argument --- scripts/storage_cleaner.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 35c2b8d4e..2f240a2f3 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -14,7 +14,7 @@ import boto3.session import botocore.exceptions as boto_exceptions import google.cloud.storage as gcs -from cached_path import add_scheme_client, cached_path +from cached_path import add_scheme_client, cached_path, set_cache_dir from cached_path.schemes import S3Client from google.api_core.exceptions import NotFound @@ -657,7 +657,10 @@ def __init__(self, resource: str) -> None: add_scheme_client(S3SchemeClient) -def _add_cached_path_scheme_clients(): +def _setup_cached_path(args: argparse.Namespace): + if args.temp_dir is not None: + set_cache_dir(args.temp_dir) + _add_cached_path_s3_client() @@ -701,6 +704,10 @@ def get_parser() -> ArgumentParser: action="store_true", help="If set, indicate actions but do not do them", ) + parser.add_argument( + "--temp_dir", + help="Directory where artifacts (e.g. 
From 4f59fc5004c3a9fdc829946972dd12776a071a18 Mon Sep 17 00:00:00 2001
From: Shane A
Date: Wed, 6 Dec 2023 10:58:03 -0800
Subject: [PATCH 8/9] Add temp dir argument

---
 scripts/storage_cleaner.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index 35c2b8d4e..2f240a2f3 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -14,7 +14,7 @@
 import boto3.session
 import botocore.exceptions as boto_exceptions
 import google.cloud.storage as gcs
-from cached_path import add_scheme_client, cached_path
+from cached_path import add_scheme_client, cached_path, set_cache_dir
 from cached_path.schemes import S3Client
 from google.api_core.exceptions import NotFound
 
@@ -657,7 +657,10 @@ def __init__(self, resource: str) -> None:
     add_scheme_client(S3SchemeClient)
 
 
-def _add_cached_path_scheme_clients():
+def _setup_cached_path(args: argparse.Namespace):
+    if args.temp_dir is not None:
+        set_cache_dir(args.temp_dir)
+
     _add_cached_path_s3_client()
 
 
 def _add_delete_subparser(subparsers: _SubParsersAction):
@@ -701,6 +704,10 @@ def get_parser() -> ArgumentParser:
         action="store_true",
         help="If set, indicate actions but do not do them",
     )
+    parser.add_argument(
+        "--temp_dir",
+        help="Directory where artifacts (e.g. unarchived directories) can be stored temporarily",
+    )
 
     subparsers = parser.add_subparsers(dest="command", help="Cleaning commands", required=True)
     _add_delete_subparser(subparsers)
@@ -712,7 +719,7 @@ def main():
     args = get_parser().parse_args()
     util.prepare_cli_environment()
-    _add_cached_path_scheme_clients()
+    _setup_cached_path(args)
     perform_operation(args)

From a22cddc50a19a3f81601ed3d0d1c6ba1dbb97b9e Mon Sep 17 00:00:00 2001
From: Shane A
Date: Wed, 6 Dec 2023 10:58:52 -0800
Subject: [PATCH 9/9] Run ruff

---
 scripts/storage_cleaner.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py
index 2f240a2f3..d47308226 100644
--- a/scripts/storage_cleaner.py
+++ b/scripts/storage_cleaner.py
@@ -630,7 +630,6 @@ def perform_operation(args: argparse.Namespace):
 
 
 def _add_cached_path_s3_client():
-
     class S3SchemeClient(S3Client):
         """
         A class that the `cached_path` module can use to retrieve resources from
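With the series complete, `_setup_cached_path` both redirects the cache and registers the scheme clients. A sketch of what `--temp_dir` does under the hood; the directory and URL below are hypothetical:

```python
from cached_path import cached_path, set_cache_dir

# Point cached_path's cache (downloads and extracted archives) at a scratch
# location, as _setup_cached_path does when --temp_dir is given.
set_cache_dir("/scratch/storage-cleaner-cache")

extracted = cached_path("r2://my-bucket/runs/run001.tar.gz", extract_archive=True)
print(extracted)  # resolves to a path under /scratch/storage-cleaner-cache
```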