Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYNPY-1253] Check MD5 before upload to verify change in content #1063

Merged
merged 7 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions synapseclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,7 @@ def store(
# Anything with a path is treated as a cache-able item
# Only Files are expected in the following logic
if entity.get("path", False) and not isinstance(obj, Folder):
local_file_md5_hex = None
if "concreteType" not in properties:
properties["concreteType"] = File._synapse_entity_type
# Make sure the path is fully resolved
Expand All @@ -1619,15 +1620,35 @@ def store(
fileHandle["externalURL"] != entity["externalURL"]
)
else:
synapse_store_flag = entity["synapseStore"] or local_state.get(
"synapseStore"
)
# Check if we need to upload a new version of an existing
# file. If the file referred to by entity['path'] has been
# modified, we want to upload the new version.
# If synapseStore is false then we must upload an ExternalFileHandle
needs_upload = not entity[
"synapseStore"
] or not self.cache.contains(
needs_upload = not synapse_store_flag or not self.cache.contains(
bundle["entity"]["dataFileHandleId"], entity["path"]
)

md5_stored_in_synapse = local_state.get("_file_handle", {}).get(
"contentMd5", None
) or (fileHandle or {}).get("contentMd5", None)

# Check if we got an MD5 checksum from Synapse and compare it to the local file
if (
synapse_store_flag
and needs_upload
and os.path.isfile(entity["path"])
and md5_stored_in_synapse
and md5_stored_in_synapse
== (
local_file_md5_hex := utils.md5_for_file(
entity["path"]
).hexdigest()
)
):
needs_upload = False
elif entity.get("dataFileHandleId", None) is not None:
needs_upload = False
else:
Expand Down Expand Up @@ -1657,7 +1678,7 @@ def store(
if (synapseStore or local_state_fh.get("externalURL") is None)
else local_state_fh.get("externalURL"),
synapseStore=synapseStore,
md5=local_state_fh.get("contentMd5"),
md5=local_file_md5_hex or local_state_fh.get("contentMd5"),
BWMac marked this conversation as resolved.
Show resolved Hide resolved
file_size=local_state_fh.get("contentSize"),
mimetype=local_state_fh.get("contentType"),
max_threads=self.max_threads,
Expand Down Expand Up @@ -3365,6 +3386,7 @@ def _createExternalObjectStoreFileHandle(
file_path: str,
storage_location_id: int,
mimetype: str = None,
md5: str = None,
) -> Dict[str, Union[str, int]]:
"""
Create a new FileHandle representing an external object.
Expand All @@ -3374,17 +3396,18 @@ def _createExternalObjectStoreFileHandle(
file_path: The local path of the uploaded file
storage_location_id: The optional storage location descriptor
mimetype: The Mimetype of the file, if known.
md5: The file's content MD5, if known.

Returns:
A FileHandle for objects that are stored externally.
"""
if mimetype is None:
mimetype, enc = mimetypes.guess_type(file_path, strict=False)
mimetype, _ = mimetypes.guess_type(file_path, strict=False)
file_handle = {
"concreteType": concrete_types.EXTERNAL_OBJECT_STORE_FILE_HANDLE,
"fileKey": s3_file_key,
"fileName": os.path.basename(file_path),
"contentMd5": utils.md5_for_file(file_path).hexdigest(),
"contentMd5": md5 or utils.md5_for_file(file_path).hexdigest(),
"contentSize": os.stat(file_path).st_size,
"storageLocationId": storage_location_id,
"contentType": mimetype,
Expand All @@ -3404,6 +3427,7 @@ def create_external_s3_file_handle(
parent=None,
storage_location_id=None,
mimetype=None,
md5: str = None,
):
"""
Create an external S3 file handle for e.g. a file that has been uploaded directly to
Expand All @@ -3419,6 +3443,7 @@ def create_external_s3_file_handle(
storage_location_id: Explicit storage location id to create the file handle in, mutually exclusive
with parent
mimetype: Mimetype of the file, if known
md5: MD5 of the file, if known

Raises:
ValueError: If neither parent nor storage_location_id is specified, or if both are specified.
Expand All @@ -3434,14 +3459,14 @@ def create_external_s3_file_handle(
storage_location_id = upload_destination["storageLocationId"]

if mimetype is None:
mimetype, enc = mimetypes.guess_type(file_path, strict=False)
mimetype, _ = mimetypes.guess_type(file_path, strict=False)

file_handle = {
"concreteType": concrete_types.S3_FILE_HANDLE,
"key": s3_file_key,
"bucketName": bucket_name,
"fileName": os.path.basename(file_path),
"contentMd5": utils.md5_for_file(file_path).hexdigest(),
"contentMd5": md5 or utils.md5_for_file(file_path).hexdigest(),
"contentSize": os.stat(file_path).st_size,
"storageLocationId": storage_location_id,
"contentType": mimetype,
Expand Down
13 changes: 8 additions & 5 deletions synapseclient/core/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ def get(
# None if there are no unmodified files
for cached_file_path, cache_map_entry in sorted(
cache_map.items(),
key=lambda item: item[1]["modified_time"]
if isinstance(item[1], dict)
else item[1],
key=lambda item: (
item[1]["modified_time"] if isinstance(item[1], dict) else item[1]
),
reverse=True,
):
if self._cache_item_unmodified(cache_map_entry, cached_file_path):
Expand All @@ -335,7 +335,10 @@ def get(
return None

def add(
self, file_handle_id: typing.Union[collections.abc.Mapping, str], path: str
self,
file_handle_id: typing.Union[collections.abc.Mapping, str],
path: str,
md5: str = None,
) -> dict:
"""
Add a file to the cache
Expand All @@ -353,7 +356,7 @@ def add(
"modified_time": epoch_time_to_iso(
math.floor(_get_modified_time(path))
),
"content_md5": utils.md5_for_file(path).hexdigest(),
"content_md5": md5 or utils.md5_for_file(path).hexdigest(),
}
self._write_cache_map(cache_dir, cache_map)

Expand Down
10 changes: 6 additions & 4 deletions synapseclient/core/upload/multipart_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def multipart_upload_file(
preview: bool = True,
force_restart: bool = False,
max_threads: int = None,
md5: str = None,
) -> str:
"""Upload a file to a Synapse upload destination in chunks.

Expand All @@ -461,6 +462,7 @@ def multipart_upload_file(
from scratch, False to try to resume
max_threads: number of concurrent threads to devote
to upload
md5: The MD5 of the file. If not provided, it will be calculated.

Returns:
a File Handle ID
Expand All @@ -471,9 +473,9 @@ def multipart_upload_file(
"""
trace.get_current_span().set_attributes(
{
"synapse.storage_location_id": storage_location_id
if storage_location_id is not None
else ""
"synapse.storage_location_id": (
storage_location_id if storage_location_id is not None else ""
)
}
)

Expand All @@ -491,7 +493,7 @@ def multipart_upload_file(
content_type = mime_type or "application/octet-stream"

callback_func = Spinner().print_tick if not syn.silent else None
md5_hex = md5_for_file(file_path, callback=callback_func).hexdigest()
md5_hex = md5 or md5_for_file(file_path, callback=callback_func).hexdigest()

part_size = _get_part_size(part_size, file_size)

Expand Down
Loading
Loading