Updates to S3Client internals to pass extras and more #307

Merged
merged 5 commits on Dec 30, 2022
17 changes: 15 additions & 2 deletions cloudpathlib/azure/azblobclient.py
@@ -73,6 +73,8 @@ def __init__(
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
writing a file to the cloud. Defaults to `mimetypes.guess_type`. Must return a tuple (content type, content encoding).
"""
super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)

if connection_string is None:
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING", None)

@@ -90,8 +92,6 @@ def __init__(
"Credentials are required; see docs for options."
)

super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)

def _get_metadata(self, cloud_path: AzureBlobPath) -> Union["BlobProperties", Dict[str, Any]]:
blob = self.service_client.get_blob_client(
container=cloud_path.container, blob=cloud_path.blob
@@ -151,6 +151,19 @@ def _exists(self, cloud_path: AzureBlobPath) -> bool:
def _list_dir(
self, cloud_path: AzureBlobPath, recursive: bool = False
) -> Iterable[Tuple[AzureBlobPath, bool]]:
# shortcut if listing all available containers
if not cloud_path.container:
if recursive:
raise NotImplementedError(
"Cannot recursively list all containers and contents; you can get all the containers then recursively list each separately."
)

yield from (
(self.CloudPath(f"az://{c.name}"), True)
for c in self.service_client.list_containers()
)
return

container_client = self.service_client.get_container_client(cloud_path.container)

prefix = cloud_path.blob
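With this shortcut, a container-less Azure path can be listed to enumerate the account's containers. A minimal sketch, assuming `AZURE_STORAGE_CONNECTION_STRING` is set and that a container-less path such as `az://` is constructible:

from cloudpathlib import AzureBlobClient

client = AzureBlobClient()
root = client.CloudPath("az://")  # hypothetical container-less root path

# each container is yielded as a directory
for container_path, is_dir in client._list_dir(root, recursive=False):
    print(container_path, is_dir)

# recursive listing from the account root raises NotImplementedError by design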
12 changes: 12 additions & 0 deletions cloudpathlib/gs/gsclient.py
@@ -148,6 +148,18 @@ def _exists(self, cloud_path: GSPath) -> bool:
return self._is_file_or_dir(cloud_path) in ["file", "dir"]

def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]:
# shortcut if listing all available buckets
if not cloud_path.bucket:
if recursive:
raise NotImplementedError(
"Cannot recursively list all buckets and contents; you can get all the buckets then recursively list each separately."
)

yield from (
(self.CloudPath(f"gs://{str(b)}"), True) for b in self.client.list_buckets()
)
return

bucket = self.client.bucket(cloud_path.bucket)

prefix = cloud_path.blob
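The GS client gets the same shortcut. A sketch of the workaround the error message suggests, assuming default Google credentials and a constructible bucket-less `gs://` path:

from cloudpathlib import GSClient

client = GSClient()
root = client.CloudPath("gs://")  # hypothetical bucket-less root path

# list the buckets first, then recurse into each one separately
for bucket_path, _ in client._list_dir(root, recursive=False):
    for path, is_dir in client._list_dir(bucket_path, recursive=True):
        print(path, is_dir)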
3 changes: 2 additions & 1 deletion cloudpathlib/local/implementations/azure.py
@@ -25,12 +25,13 @@ def __init__(self, *args, **kwargs):
kwargs.get("account_url", None),
os.getenv("AZURE_STORAGE_CONNECTION_STRING", None),
]
super().__init__(*args, **kwargs)

if all(opt is None for opt in cred_opts):
raise MissingCredentialsError(
"AzureBlobClient does not support anonymous instantiation. "
"Credentials are required; see docs for options."
)
super().__init__(*args, **kwargs)


LocalAzureBlobClient.AzureBlobPath = LocalAzureBlobClient.CloudPath # type: ignore
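Moving the `super().__init__` call does not change the credential check itself; a quick sketch of that behavior, assuming pytest is available and `AZURE_STORAGE_CONNECTION_STRING` is unset:

import os

import pytest

from cloudpathlib.exceptions import MissingCredentialsError
from cloudpathlib.local import LocalAzureBlobClient

# with no connection string in the environment and no credential kwargs,
# the mock client still refuses anonymous instantiation
os.environ.pop("AZURE_STORAGE_CONNECTION_STRING", None)
with pytest.raises(MissingCredentialsError):
    LocalAzureBlobClient()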
156 changes: 103 additions & 53 deletions cloudpathlib/s3/s3client.py
@@ -3,14 +3,16 @@
from pathlib import Path, PurePosixPath
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Union

from cloudpathlib.exceptions import CloudPathException


from ..client import Client, register_client_class
from ..cloudpath import implementation_registry
from .s3path import S3Path

try:
from boto3.session import Session
from boto3.s3.transfer import TransferConfig
from boto3.s3.transfer import TransferConfig, S3Transfer
from botocore.config import Config
from botocore.exceptions import ClientError
import botocore.session
@@ -37,6 +39,7 @@ def __init__(
endpoint_url: Optional[str] = None,
boto3_transfer_config: Optional["TransferConfig"] = None,
content_type_method: Optional[Callable] = mimetypes.guess_type,
extra_args: Optional[dict] = None,
):
"""Class constructor. Sets up a boto3 [`Session`](
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html).
@@ -52,9 +55,9 @@ def __init__(
aws_secret_access_key (Optional[str]): AWS secret access key.
aws_session_token (Optional[str]): Session key for your AWS account. This is only
needed when you are using temporary credentials.
no_sign_request: (Optional[bool]): If `True`, credentials are not looked for and we use unsigned
no_sign_request (Optional[bool]): If `True`, credentials are not looked for and we use unsigned
requests to fetch resources. This will only allow access to public resources. This is equivalent
to `--no-sign-request` in the AWS CLI (https://docs.aws.amazon.com/cli/latest/reference/).
to `--no-sign-request` in the [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/).
botocore_session (Optional[botocore.session.Session]): An already instantiated botocore
Session.
profile_name (Optional[str]): Profile name of a profile in a shared credentials file.
@@ -63,10 +66,14 @@ def __init__(
for downloaded files. If None, will use a temporary directory.
endpoint_url (Optional[str]): S3 server endpoint URL to use for the constructed boto3 S3 resource and client.
Parameterize it to access a custom-deployed S3-compatible object store such as MinIO or Ceph.
boto3_transfer_config (Optional[dict]): Instantiated TransferConfig for managing s3 transfers.
(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig)
boto3_transfer_config (Optional[dict]): Instantiated TransferConfig for managing
[s3 transfers](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig)
content_type_method (Optional[Callable]): Function to call to guess media type (mimetype) when
writing a file to the cloud. Defaults to `mimetypes.guess_type`. Must return a tuple (content type, content encoding).
extra_args (Optional[dict]): A dictionary of extra args passed to the download, upload, and list functions as relevant. You
can include any keys supported by upload or download, and only the relevant args will be passed to each call. To see the
extra args that are supported, look at the upload and download lists in the
[boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.S3Transfer).
"""
endpoint_url = endpoint_url or os.getenv("AWS_ENDPOINT_URL")
if boto3_session is not None:
@@ -97,10 +104,32 @@ def __init__(

self.boto3_transfer_config = boto3_transfer_config

if extra_args is None:
extra_args = {}

self._extra_args = extra_args
self.boto3_dl_extra_args = {
k: v for k, v in extra_args.items() if k in S3Transfer.ALLOWED_DOWNLOAD_ARGS
}
self.boto3_ul_extra_args = {
k: v for k, v in extra_args.items() if k in S3Transfer.ALLOWED_UPLOAD_ARGS
}

# listing ops (list_objects_v2, filter, delete) only accept these extras:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
self.boto3_list_extra_args = {
k: self._extra_args[k]
for k in ["RequestPayer", "ExpectedBucketOwner"]
if k in self._extra_args
}

super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)

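A sketch of how the filtering above partitions a user-supplied `extra_args` dict; the keys shown are illustrative, not exhaustive, and this assumes boto3 is installed:

from cloudpathlib import S3Client

# RequestPayer is valid for downloads, uploads, and listing;
# ServerSideEncryption is upload-only, so it is dropped from the
# download and list dictionaries
client = S3Client(
    extra_args={
        "RequestPayer": "requester",
        "ServerSideEncryption": "AES256",
    }
)

assert "ServerSideEncryption" in client.boto3_ul_extra_args
assert "ServerSideEncryption" not in client.boto3_dl_extra_args
assert client.boto3_list_extra_args == {"RequestPayer": "requester"}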
def _get_metadata(self, cloud_path: S3Path) -> Dict[str, Any]:
data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get()
# get accepts all download extra args
data = self.s3.ObjectSummary(cloud_path.bucket, cloud_path.key).get(
**self.boto3_dl_extra_args
)

return {
"last_modified": data["LastModified"],
@@ -114,7 +143,9 @@ def _download_file(self, cloud_path: S3Path, local_path: Union[str, os.PathLike]
local_path = Path(local_path)
obj = self.s3.Object(cloud_path.bucket, cloud_path.key)

obj.download_file(str(local_path), Config=self.boto3_transfer_config)
obj.download_file(
str(local_path), Config=self.boto3_transfer_config, ExtraArgs=self.boto3_dl_extra_args
)
return local_path

def _is_file_or_dir(self, cloud_path: S3Path) -> Optional[str]:
@@ -123,30 +154,17 @@ def _is_file_or_dir(self, cloud_path: S3Path) -> Optional[str]:
return "dir"

# get first item by listing at least one key
s3_obj = self._s3_file_query(cloud_path)

if s3_obj is None:
return None

# since S3 only returns files when filtering objects:
# if the first item key is equal to the path key, this is a file
if s3_obj.key == cloud_path.key:

# "fake" directories on S3 can be created in the console UI
# these are 0-size keys that end in `/`
# Ref: https://github.com/boto/boto3/issues/377
if s3_obj.key.endswith("/") and s3_obj.content_length == 0:
return "dir"
else:
return "file"
else:
return "dir"
return self._s3_file_query(cloud_path)

def _exists(self, cloud_path: S3Path) -> bool:
# check if this is a bucket
if not cloud_path.key:
extra = {
k: self._extra_args[k] for k in ["ExpectedBucketOwner"] if k in self._extra_args
}

try:
self.client.head_bucket(Bucket=cloud_path.bucket)
self.client.head_bucket(Bucket=cloud_path.bucket, **extra)
return True
except ClientError:
return False
@@ -157,25 +175,44 @@ def _s3_file_query(self, cloud_path: S3Path):
"""Boto3 query used for quick checks of existence and if path is file/dir"""
# check if this is an object that we can access directly
try:
obj = self.s3.Object(cloud_path.bucket, cloud_path.key)
obj.load()
return obj
# head_object accepts all download extra args (note: Object.load does not accept extra args so we do not use it for this check)
self.client.head_object(
Bucket=cloud_path.bucket,
Key=cloud_path.key.rstrip("/"),
**self.boto3_dl_extra_args,
)
return "file"

# else, confirm it is a dir by filtering to the first item under the prefix
except ClientError:
# else, confirm it is a dir by filtering to the first item under the prefix plus a "/"
except (ClientError, self.client.exceptions.NoSuchKey):
key = cloud_path.key.rstrip("/") + "/"

return next(
(
obj
"dir" # always a dir if we find anything with this query
for obj in (
self.s3.Bucket(cloud_path.bucket).objects.filter(Prefix=key).limit(1)
self.s3.Bucket(cloud_path.bucket)
.objects.filter(Prefix=key, **self.boto3_list_extra_args)
.limit(1)
)
),
None,
)

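At the path level, the head_object check with a prefix-listing fallback is what backs `exists()`, `is_file()`, and `is_dir()`. A brief sketch with hypothetical names, assuming a default client with credentials:

from cloudpathlib import S3Path

S3Path("s3://hypothetical-bucket/reports/2022.parquet").is_file()  # True if head_object succeeds
S3Path("s3://hypothetical-bucket/reports/").is_dir()  # True if anything lists under "reports/"
S3Path("s3://hypothetical-bucket/missing-key").exists()  # False when both checks come up empty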
def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Path, bool]]:
# shortcut if listing all available buckets
if not cloud_path.bucket:
if recursive:
raise NotImplementedError(
"Cannot recursively list all buckets and contents; you can get all the buckets then recursively list each separately."
)

yield from (
(self.CloudPath(f"s3://{b['Name']}"), True)
for b in self.client.list_buckets().get("Buckets", [])
)
return

prefix = cloud_path.key
if prefix and not prefix.endswith("/"):
prefix += "/"
@@ -185,7 +222,10 @@ def _list_dir(self, cloud_path: S3Path, recursive=False) -> Iterable[Tuple[S3Pat
paginator = self.client.get_paginator("list_objects_v2")

for result in paginator.paginate(
Bucket=cloud_path.bucket, Prefix=prefix, Delimiter=("" if recursive else "/")
Bucket=cloud_path.bucket,
Prefix=prefix,
Delimiter=("" if recursive else "/"),
**self.boto3_list_extra_args,
):
# yield everything in common prefixes as directories
for result_prefix in result.get("CommonPrefixes", []):
@@ -238,48 +278,58 @@ def _move_file(self, src: S3Path, dst: S3Path, remove_src: bool = True) -> S3Pat
CopySource={"Bucket": src.bucket, "Key": src.key},
Metadata=self._get_metadata(src).get("extra", {}),
MetadataDirective="REPLACE",
**self.boto3_ul_extra_args,
)

else:
target = self.s3.Object(dst.bucket, dst.key)
target.copy({"Bucket": src.bucket, "Key": src.key})
target.copy(
{"Bucket": src.bucket, "Key": src.key},
ExtraArgs=self.boto3_dl_extra_args,
Config=self.boto3_transfer_config,
)

if remove_src:
self._remove(src)
return dst

def _remove(self, cloud_path: S3Path, missing_ok: bool = True) -> None:
try:
obj = self.s3.Object(cloud_path.bucket, cloud_path.key)

# will throw if not a file
obj.load()

resp = obj.delete()
assert resp.get("ResponseMetadata").get("HTTPStatusCode") == 204
file_or_dir = self._is_file_or_dir(cloud_path=cloud_path)
if file_or_dir == "file":
resp = self.s3.Object(cloud_path.bucket, cloud_path.key).delete(
**self.boto3_list_extra_args
)
if resp.get("ResponseMetadata").get("HTTPStatusCode") not in (204, 200):
raise CloudPathException(
f"Delete operation failed for {cloud_path} with response: {resp}"
)

except ClientError:
elif file_or_dir == "dir":
# try to delete as a directory instead
bucket = self.s3.Bucket(cloud_path.bucket)

prefix = cloud_path.key
if prefix and not prefix.endswith("/"):
prefix += "/"

resp = bucket.objects.filter(Prefix=prefix).delete()
resp = bucket.objects.filter(Prefix=prefix, **self.boto3_list_extra_args).delete(
**self.boto3_list_extra_args
)
if resp[0].get("ResponseMetadata").get("HTTPStatusCode") not in (204, 200):
raise CloudPathException(
f"Delete operation failed for {cloud_path} with response: {resp}"
)

# ensure directory deleted; if cloud_path did not exist at all
# resp will be [], so no need to check success
if resp:
assert resp[0].get("ResponseMetadata").get("HTTPStatusCode") == 200
else:
if not missing_ok:
raise FileNotFoundError(f"File does not exist: {cloud_path}")
else:
if not missing_ok:
raise FileNotFoundError(
f"Cannot delete file that does not exist: {cloud_path} (consider passing missing_ok=True)"
)

def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: S3Path) -> S3Path:
obj = self.s3.Object(cloud_path.bucket, cloud_path.key)

extra_args = {}
extra_args = self.boto3_ul_extra_args.copy()

if self.content_type_method is not None:
content_type, content_encoding = self.content_type_method(str(local_path))
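Taken together, the user-facing effect of `extra_args` is that a single dict covers list, download, and upload operations. A sketch with a hypothetical requester-pays bucket, assuming AWS credentials are configured:

from cloudpathlib import S3Client, S3Path

# RequestPayer is forwarded to head_object/download_file, upload_file,
# and list_objects_v2 as relevant
client = S3Client(extra_args={"RequestPayer": "requester"})
client.set_as_default_client()

path = S3Path("s3://hypothetical-requester-pays-bucket/data/file.csv")

for sibling in path.parent.iterdir():  # listing passes boto3_list_extra_args
    print(sibling)

text = path.read_text()  # download passes boto3_dl_extra_args
path.with_suffix(".bak").write_text(text)  # upload passes boto3_ul_extra_args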