Skip to content

Commit

Permalink
Merge pull request #399 from reef-technologies/large-file-cont
Browse files Browse the repository at this point in the history
Refactoring Large File Upload Continification Logic
  • Loading branch information
ppolewicz committed Jun 26, 2023
2 parents ff09ba7 + c9752b0 commit 0ad8ae8
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Infrastructure
* Replaced `pyflakes` with `ruff` for linting
* Refactored logic for resuming large file uploads to unify code paths, correct inconsistencies, and enhance configurability (#381)
* Automatically set copyright date when generating the docs

## [1.21.0] - 2023-04-17
Expand Down
292 changes: 181 additions & 111 deletions b2sdk/transfer/emerge/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def _get_unfinished_file_and_parts(
)
return unfinished_file, finished_parts

def _find_unfinished_file_by_plan_id(
def _find_matching_unfinished_file(
self,
bucket_id,
file_name,
Expand All @@ -330,61 +330,182 @@ def _find_unfinished_file_by_plan_id(
legal_hold: Optional[LegalHold] = None,
custom_upload_timestamp: Optional[int] = None,
cache_control: Optional[str] = None,
check_file_info_without_large_file_sha1: Optional[bool] = False,
eager_mode: Optional[bool] = False,
):
"""
Search for a matching unfinished large file in the specified bucket.
In case a matching file is found but has inconsistencies (for example, mismatching file info or encryption settings),
mismatches are logged.
:param bucket_id: The identifier of the bucket where the unfinished file resides.
:param file_name: The name of the file to be matched.
:param file_info: Information about the file to be uploaded.
:param emerge_parts_dict: A dictionary containing the parts of the file to be emerged.
:param encryption: The encryption settings for the file.
:param file_retention: The retention settings for the file, if any.
:param legal_hold: The legal hold status of the file, if any.
:param custom_upload_timestamp: The custom timestamp for the upload, if any.
:param cache_control: The cache control settings for the file, if any.
:param check_file_info_without_large_file_sha1: A flag indicating whether the file information should be checked without the `large_file_sha1`.
:param eager_mode: A flag indicating whether the first matching file should be returned.
:return: A tuple of the best matching unfinished file and a dictionary of its finished parts keyed by part number. If no match is found, returns `(None, {})`.
"""

file_retention = file_retention or NO_RETENTION_FILE_SETTING
assert 'plan_id' in file_info
best_match_file = None
best_match_parts = {}
best_match_parts_len = 0

for file_ in self.services.large_file.list_unfinished_large_files(
bucket_id, prefix=file_name
):
if file_.file_name != file_name:
logger.debug('Rejecting %s: file name mismatch', file_.file_id)
continue

if file_.file_info != file_info:
if check_file_info_without_large_file_sha1:
file_info_without_large_file_sha1 = self._get_file_info_without_large_file_sha1(
file_info
)
if file_info_without_large_file_sha1 != self._get_file_info_without_large_file_sha1(
file_.file_info
):
logger.debug(
'Rejecting %s: file info mismatch after dropping `large_file_sha1`',
file_.file_id
)
continue
else:
logger.debug('Rejecting %s: file info mismatch', file_.file_id)
continue

if encryption is not None and encryption != file_.encryption:
logger.debug('Rejecting %s: encryption mismatch', file_.file_id)
continue
# FIXME: encryption is None ???
if encryption is None or file_.encryption != encryption:

if cache_control is not None and cache_control != file_.cache_control:
logger.debug('Rejecting %s: cacheControl mismatch', file_.file_id)
continue

if legal_hold is None:
if LegalHold.UNSET != file_.legal_hold:
# Uploading and not providing legal_hold means that server's response about that file version
# will have legal_hold=LegalHold.UNSET
logger.debug('Rejecting %s: legal hold mismatch (not unset)', file_.file_id)
continue
elif legal_hold != file_.legal_hold:
logger.debug('Rejecting %s: legal hold mismatch', file_.file_id)
continue

if file_retention != file_.file_retention:
# if `file_.file_retention` is UNKNOWN then we skip - lib user can still
# pass UNKNOWN file_retention here - but raw_api/server won't allow it
# and we don't check it here
logger.debug('Rejecting %s: retention mismatch', file_.file_id)
continue

if custom_upload_timestamp is not None and file_.upload_timestamp != custom_upload_timestamp:
continue

if cache_control is None or file_.cache_control != cache_control:
logger.debug('Rejecting %s: custom_upload_timestamp mismatch', file_.file_id)
continue

finished_parts = {}

for part in self.services.large_file.list_parts(file_.file_id):

emerge_part = emerge_parts_dict.get(part.part_number)

if emerge_part is None:
# large file with same `plan_id` has more parts than current plan
# so we want to skip this large file because it is broken
# something is wrong - we have a part that we don't know about
# so we can't resume this upload
logger.debug(
'Rejecting %s: part %s not found in emerge parts, giving up.',
file_.file_id, part.part_number
)
finished_parts = None
break

# Compare part sizes
if emerge_part.get_length() != part.content_length:
logger.debug(
'Rejecting %s: part %s size mismatch', file_.file_id, part.part_number
)
continue # part size doesn't match - so we reupload

# Compare part hashes
if emerge_part.is_hashable() and emerge_part.get_sha1() != part.content_sha1:
continue # auto-healing - `plan_id` matches but part.sha1 doesn't - so we reupload
logger.debug(
'Rejecting %s: part %s sha1 mismatch', file_.file_id, part.part_number
)
continue # part.sha1 doesn't match - so we reupload

finished_parts[part.part_number] = part

if finished_parts is None:
continue

finished_parts_len = len(finished_parts)
if best_match_file is None or finished_parts_len > best_match_parts_len:

if finished_parts and (
best_match_file is None or finished_parts_len > best_match_parts_len
):
best_match_file = file_
best_match_parts = finished_parts
best_match_parts_len = finished_parts_len

if eager_mode and best_match_file is not None:
break

return best_match_file, best_match_parts

def _find_unfinished_file_by_plan_id(
    self,
    bucket_id,
    file_name,
    file_info,
    emerge_parts_dict,
    encryption: EncryptionSetting,
    file_retention: Optional[FileRetentionSetting] = None,
    legal_hold: Optional[LegalHold] = None,
    custom_upload_timestamp: Optional[int] = None,
    cache_control: Optional[str] = None,
):
    """
    Search for an unfinished large file that carries the same ``plan_id`` in its file info.

    This is a thin wrapper around ``_find_matching_unfinished_file``. It verifies that
    ``file_info`` contains a ``plan_id`` and then delegates the search with
    ``check_file_info_without_large_file_sha1=False``, so a candidate is only accepted
    when its file info is equal to ``file_info``. ``eager_mode`` is not passed and is
    left at the helper's default.

    :param bucket_id: The identifier of the bucket where the unfinished file resides.
    :param file_name: The name of the file to be matched.
    :param file_info: Information about the file to be uploaded; must contain ``plan_id``.
    :param emerge_parts_dict: A dictionary containing the parts of the file to be emerged.
    :param encryption: The encryption settings for the file.
    :param file_retention: The retention settings for the file, if any.
                           A falsy value is replaced with ``NO_RETENTION_FILE_SETTING``.
    :param legal_hold: The legal hold status of the file, if any.
    :param custom_upload_timestamp: The custom timestamp for the upload, if any.
    :param cache_control: The cache control settings for the file, if any.
    :return: The result of ``_find_matching_unfinished_file``: a tuple of the matching
             unfinished file and its finished parts, or ``(None, {})`` if no match is found.
    :raises ValueError: if ``file_info`` is empty, ``None`` or has no ``plan_id`` key.
    """
    # `not file_info` also covers None, for which a bare `in` test would raise
    # TypeError instead of the documented ValueError.
    if not file_info or 'plan_id' not in file_info:
        raise ValueError("The 'plan_id' key must be in file_info dictionary.")

    return self._find_matching_unfinished_file(
        bucket_id=bucket_id,
        file_name=file_name,
        file_info=file_info,
        emerge_parts_dict=emerge_parts_dict,
        encryption=encryption,
        file_retention=file_retention or NO_RETENTION_FILE_SETTING,
        legal_hold=legal_hold,
        custom_upload_timestamp=custom_upload_timestamp,
        cache_control=cache_control,
        check_file_info_without_large_file_sha1=False,
    )

@classmethod
def _get_file_info_without_large_file_sha1(
cls,
Expand All @@ -409,106 +530,55 @@ def _match_unfinished_file_if_possible(
cache_control: Optional[str] = None,
):
"""
Find an unfinished file that may be used to resume a large file upload. The
file is found using the filename and comparing the uploaded parts against
the local file.
This is only possible if the application key being used allows ``listFiles`` access.
Scan for a suitable unfinished large file in the specified bucket to resume upload.
This function examines each unfinished large file for a possible match with the provided
parameters. This enables resumption of an interrupted upload by reusing the unfinished file,
provided that file's info and additional parameters match.
Along with the filename and file info, additional parameters like encryption, file retention,
legal hold, custom upload timestamp, and cache control are compared for a match. The
'emerge_parts_dict' is also cross-checked for matching file parts.
The function is eager to find a match and returns the first match it finds. If no match is
found, it returns `(None, {})`.
:param bucket_id: The identifier of the bucket containing the unfinished file.
:param file_name: The name of the file to find.
:param file_info: Information about the file to be uploaded.
:param emerge_parts_dict: A dictionary of the parts of the file to be emerged.
:param encryption: The encryption settings for the file.
:param file_retention: The retention settings for the file, if applicable.
:param legal_hold: The legal hold status of the file, if applicable.
:param custom_upload_timestamp: The custom timestamp for the upload, if set.
:param cache_control: The cache control settings for the file, if set.
:return: A tuple of the first matching unfinished file and its finished parts. If no match is found, returns `(None, {})`.
"""
file_retention = file_retention or NO_RETENTION_FILE_SETTING
file_info_without_large_file_sha1 = self._get_file_info_without_large_file_sha1(file_info)
logger.debug('Checking for matching unfinished large files for %s...', file_name)
for file_ in self.services.large_file.list_unfinished_large_files(
bucket_id, prefix=file_name
):
if file_.file_name != file_name:
logger.debug('Rejecting %s: file has a different file name', file_.file_id)
continue
if file_.file_info != file_info:
if (LARGE_FILE_SHA1 in file_.file_info) == (LARGE_FILE_SHA1 in file_info):
logger.debug(
'Rejecting %s: large_file_sha1 is present or missing in both file infos',
file_.file_id
)
continue

if self._get_file_info_without_large_file_sha1(
file_.file_info
) != file_info_without_large_file_sha1:
# ignoring the large_file_sha1 file infos are still different
logger.debug(
'Rejecting %s: file info mismatch after dropping `large_file_sha1`',
file_.file_id
)
continue

# FIXME: what if `encryption is None` - match ANY encryption? :)
if encryption is not None and encryption != file_.encryption:
logger.debug('Rejecting %s: encryption mismatch', file_.file_id)
continue

if cache_control is not None and cache_control != file_.cache_control:
logger.debug('Rejecting %s: cacheControl mismatch', file_.file_id)
continue

if legal_hold is None:
if LegalHold.UNSET != file_.legal_hold:
# Uploading and not providing legal_hold means that server's response about that file version
# will have legal_hold=LegalHold.UNSET
logger.debug('Rejecting %s: legal hold mismatch (not unset)', file_.file_id)
continue
elif legal_hold != file_.legal_hold:
logger.debug('Rejecting %s: legal hold mismatch', file_.file_id)
continue

if file_retention != file_.file_retention:
# if `file_.file_retention` is UNKNOWN then we skip - lib user can still
# pass UNKNOWN file_retention here - but raw_api/server won't allow it
# and we don't check it here
logger.debug('Rejecting %s: retention mismatch', file_.file_id)
continue

if custom_upload_timestamp is not None and file_.upload_timestamp != custom_upload_timestamp:
logger.debug('Rejecting %s: custom_upload_timestamp mismatch', file_.file_id)
continue

files_match = True
finished_parts = {}
for part in self.services.large_file.list_parts(file_.file_id):
emerge_part = emerge_parts_dict.get(part.part_number)
if emerge_part is None:
files_match = False
break

# Compare part sizes
if emerge_part.get_length() != part.content_length:
files_match = False
break

# Compare hash
assert emerge_part.is_hashable()
sha1_sum = emerge_part.get_sha1()
if sha1_sum != part.content_sha1:
files_match = False
break

# Save part
finished_parts[part.part_number] = part

# Skip not matching files or unfinished files with no uploaded parts
if not files_match or not finished_parts:
logger.debug('Rejecting %s: No finished parts or part mismatch', file_.file_id)
continue
file_, finished_parts = self._find_matching_unfinished_file(
bucket_id,
file_name,
file_info,
emerge_parts_dict,
encryption,
file_retention,
legal_hold,
custom_upload_timestamp,
cache_control,
check_file_info_without_large_file_sha1=True,
eager_mode=True,
)

# Return first matched file
logger.debug(
'Unfinished file %s matches with %i finished parts', file_.file_id,
len(finished_parts)
)
return file_, finished_parts
if file_ is None:
logger.debug('No matching unfinished files found.')
return None, {}

logger.debug('No matching unfinished files found.')
return None, {}
logger.debug(
'Unfinished file %s matches with %i finished parts', file_.file_id, len(finished_parts)
)
return file_, finished_parts


class BaseExecutionStepFactory(metaclass=ABCMeta):
Expand Down

0 comments on commit 0ad8ae8

Please sign in to comment.