
rohmu: Add support for remote copy operation

All storage providers support performing a remote copy operation, which
can be useful when an object has already been uploaded but another copy of
the same data is needed under a different name or with different metadata.
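
A minimal usage sketch of the new interface (the LocalTransfer backend and the
paths here are illustrative, not part of this commit; all backends expose the
same keyword-only signature):

from pghoard.rohmu.object_storage.local import LocalTransfer

transfer = LocalTransfer(directory="/tmp/rohmu-example")  # hypothetical directory
transfer.store_file_from_memory("example/original", b"data", {"k": "v"})
# Reuse the already-uploaded object under a new name; without a metadata
# argument the existing metadata is carried over, with one it is replaced.
transfer.copy_file(source_key="example/original", destination_key="example/copy")
transfer.copy_file(source_key="example/original", destination_key="example/copy2", metadata={"new": "meta"})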
rikonen committed Mar 12, 2019
1 parent 8fb2181 commit 626a30f16a3ca19b631dbc550b758cce710d5850
@@ -52,6 +52,37 @@ def __init__(self, account_name, account_key, bucket_name, prefix=None,
        self.container = self.get_or_create_container(self.container_name)
        self.log.debug("AzureTransfer initialized, %r", self.container_name)

    def copy_file(self, *, source_key, destination_key, metadata=None, **kwargs):
        timeout = kwargs.get("timeout") or 15
        source_path = self.format_key_for_backend(source_key, remove_slash_prefix=True, trailing_slash=False)
        destination_path = self.format_key_for_backend(destination_key, remove_slash_prefix=True, trailing_slash=False)
        source_url = self.conn.make_blob_url(self.container_name, source_path)
        start = time.monotonic()
        # Azure's copy_blob is asynchronous; poll the destination blob until
        # the copy settles or the timeout expires
        self.conn.copy_blob(self.container_name, destination_path, source_url, metadata=metadata, timeout=timeout)
        while True:
            blob_properties = self.conn.get_blob_properties(self.container_name, destination_path, timeout=timeout)
            copy_props = blob_properties.properties.copy
            if copy_props.status == "success":
                return
            elif copy_props.status == "pending":
                if time.monotonic() - start < timeout:
                    time.sleep(0.1)
                else:
                    self.conn.abort_copy_blob(self.container_name, destination_path, copy_props.id, timeout=timeout)
                    raise StorageError(
                        "Copying {!r} to {!r} did not complete in {} seconds".format(source_key, destination_key, timeout)
                    )
            elif copy_props.status == "failed":
                raise StorageError(
                    "Copying {!r} to {!r} failed: {!r}".format(source_key, destination_key, copy_props.status_description)
                )
            else:
                raise StorageError(
                    "Copying {!r} to {!r} failed, unexpected status: {!r}".format(
                        source_key, destination_key, copy_props.status
                    )
                )

    def get_metadata_for_key(self, key):
        path = self.format_key_for_backend(key, remove_slash_prefix=True, trailing_slash=False)
        items = list(self._iter_key(path=path, with_metadata=True, deep=False))
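
Because the Azure copy above runs asynchronously and the polling loop is capped,
a caller copying a large blob may want to pass a longer timeout through kwargs
(the other backends simply ignore extra keyword arguments); a hypothetical call:

# 120 seconds is illustrative; the backend default above is 15
transfer.copy_file(source_key="big/object", destination_key="big/object-copy", timeout=120)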
@@ -26,6 +26,11 @@ def __init__(self, prefix):
            prefix += "/"
        self.prefix = prefix

    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        """Performs a remote copy from the source key name to the destination key name. The key must
        identify a single file; trees cannot be copied with this method. If no metadata is given, the
        existing metadata is copied to the destination."""
        raise NotImplementedError

    def format_key_for_backend(self, key, remove_slash_prefix=False, trailing_slash=False):
        """Add a possible prefix to the key before sending it to the backend"""
        path = self.prefix + key
@@ -164,6 +164,23 @@ def _retry_on_reset(self, request, action):
            retries -= 1
            time.sleep(retry_wait)

    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        source_object = self.format_key_for_backend(source_key)
        destination_object = self.format_key_for_backend(destination_key)
        body = {}
        if metadata is not None:
            body["metadata"] = metadata

        # Server-side copy; with an empty body the copied object keeps the
        # source object's metadata
        with self._object_client(not_found=source_key) as clob:
            request = clob.copy(
                body=body,
                destinationBucket=self.bucket_name,
                destinationObject=destination_object,
                sourceBucket=self.bucket_name,
                sourceObject=source_object,
            )
            self._retry_on_reset(request, request.execute)

    def get_metadata_for_key(self, key):
        key = self.format_key_for_backend(key)
        with self._object_client(not_found=key) as clob:
@@ -24,6 +24,18 @@ def __init__(self, directory, prefix=None):
        super().__init__(prefix=prefix)
        self.log.debug("LocalTransfer initialized")

    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        source_path = self.format_key_for_backend(source_key.strip("/"))
        destination_path = self.format_key_for_backend(destination_key.strip("/"))
        if not os.path.isfile(source_path):
            raise FileNotFoundFromStorageError(source_key)
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)
        shutil.copy(source_path, destination_path)
        # Metadata lives in a ".metadata" sidecar file next to the data file
        if metadata is None:
            shutil.copy(source_path + ".metadata", destination_path + ".metadata")
        else:
            self._save_metadata(destination_path, metadata)

    def get_metadata_for_key(self, key):
        source_path = self.format_key_for_backend(key.strip("/"))
        if not os.path.exists(source_path):
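
For the local backend the remote copy is just two file copies; a sketch of the
resulting state, reusing the hypothetical transfer and directory from the first
example (and assuming no extra prefix in the on-disk layout):

import os

transfer.copy_file(source_key="example/original", destination_key="example/copy3")
# Both the data file and its ".metadata" sidecar now exist under the new name
assert os.path.isfile("/tmp/rohmu-example/example/copy3")
assert os.path.isfile("/tmp/rohmu-example/example/copy3.metadata")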
@@ -83,6 +83,24 @@ def __init__(self,
        self.encrypted = encrypted
        self.log.debug("S3Transfer initialized")

    def copy_file(self, *, source_key, destination_key, metadata=None, **_kwargs):
        source_path = self.bucket_name + "/" + self.format_key_for_backend(source_key, remove_slash_prefix=True)
        destination_path = self.format_key_for_backend(destination_key, remove_slash_prefix=True)
        try:
            self.s3_client.copy_object(
                Bucket=self.bucket_name,
                CopySource=source_path,
                Key=destination_path,
                Metadata=metadata or {},
                # COPY carries the source metadata over, REPLACE swaps in the
                # metadata given by the caller
                MetadataDirective="COPY" if metadata is None else "REPLACE",
            )
        except botocore.exceptions.ClientError as ex:
            status_code = ex.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
            if status_code == 404:
                raise FileNotFoundFromStorageError(source_key)
            else:
                raise StorageError("Copying {!r} to {!r} failed: {!r}".format(source_key, destination_key, ex)) from ex

    def get_metadata_for_key(self, key):
        key = self.format_key_for_backend(key, remove_slash_prefix=True)
        return self._metadata_for_key(key)
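
The local, Google and S3 implementations above all translate a missing source
into rohmu's FileNotFoundFromStorageError rather than a provider-specific
exception, so callers can handle the case uniformly; a sketch (the import path
assumes the pghoard tree this commit targets):

from pghoard.rohmu.errors import FileNotFoundFromStorageError

try:
    transfer.copy_file(source_key="no/such/object", destination_key="whatever")
except FileNotFoundFromStorageError:
    pass  # e.g. fall back to a fresh upload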
@@ -81,6 +81,12 @@ def progress_callback(pos, total):
    assert st.get_contents_to_fileobj("test1/x1", out) == {"k": "v"}
    assert out.getvalue() == b"dummy"

    # Copy file
    st.copy_file(source_key="test1/x1", destination_key="test_copy/copy1")
    assert st.get_contents_to_string("test_copy/copy1") == (b"dummy", {"k": "v"})
    st.copy_file(source_key="test1/x1", destination_key="test_copy/copy2", metadata={"new": "meta"})
    assert st.get_contents_to_string("test_copy/copy2") == (b"dummy", {"new": "meta"})

    st.store_file_from_memory("test1/x1", b"l", {"fancymetadata": "value"})
    assert st.get_contents_to_string("test1/x1") == (b"l", {"fancymetadata": "value"})
