
Merge branch 'basak-glacier-upload-resume' into develop

jamesls committed Nov 9, 2012
2 parents fe2359f + ef4fa93 commit 2af2774041c26dbcc652512420c77399391cd5f9
Showing with 549 additions and 48 deletions.
  1. +89 −3 boto/glacier/vault.py
  2. +175 −42 boto/glacier/writer.py
  3. +120 −1 tests/unit/glacier/test_layer2.py
  4. +165 −2 tests/unit/glacier/test_writer.py
boto/glacier/vault.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2012 Thomas Parslow http://almostobsolete.net/
+# Copyright (c) 2012 Robie Basak <robie@justgohome.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
@@ -23,7 +24,7 @@
from __future__ import with_statement
from .exceptions import UploadArchiveError
from .job import Job
-from .writer import Writer, compute_hashes_from_fileobj
+from .writer import compute_hashes_from_fileobj, resume_file_upload, Writer
from .concurrent import ConcurrentUploader
from .utils import minimum_part_size, DEFAULT_PART_SIZE
import os.path
@@ -126,7 +127,7 @@ def create_archive_writer(self, part_size=DefaultPartSize,
        return Writer(self, response['UploadId'], part_size=part_size)

    def create_archive_from_file(self, filename=None, file_obj=None,
-                                 description=None):
+                                 description=None, upload_id_callback=None):
        """
        Create a new archive and upload the data from the given file
        or file-like object.
@@ -140,6 +141,11 @@ def create_archive_from_file(self, filename=None, file_obj=None,
        :type description: str
        :param description: An optional description for the archive.

+        :type upload_id_callback: function
+        :param upload_id_callback: if set, call with the upload_id as the
+            only parameter when it becomes known, to enable future calls
+            to resume_archive_from_file in case resume is needed.
+
        :rtype: str
        :return: The archive id of the newly created archive
        """
@@ -152,8 +158,11 @@ def create_archive_from_file(self, filename=None, file_obj=None,
                raise UploadArchiveError("File size of %s bytes exceeds "
                                         "40,000 GB archive limit of Glacier.")
            file_obj = open(filename, "rb")
-        writer = self.create_archive_writer(description=description,
+        writer = self.create_archive_writer(
+            description=description,
            part_size=part_size)
+        if upload_id_callback:
+            upload_id_callback(writer.upload_id)
        while True:
            data = file_obj.read(part_size)
            if not data:
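
A minimal usage sketch of the new upload_id_callback parameter: persist the upload id as soon as it is known so that a later run can resume the upload. The region, vault name, file paths, and the boto.glacier.connect_to_region entry point below are illustrative assumptions, not part of this change; credentials are assumed to come from the usual boto configuration.

# Sketch only: region, vault name and file paths are assumed.
import boto.glacier

layer2 = boto.glacier.connect_to_region('us-east-1')  # Layer2 connection
vault = layer2.get_vault('my-backups')                # assumed existing vault

def remember_upload_id(upload_id):
    # Persist the id so a later run can call resume_archive_from_file().
    with open('upload-id.txt', 'w') as f:
        f.write(upload_id)

archive_id = vault.create_archive_from_file(
    filename='backup.tar', description='nightly backup',
    upload_id_callback=remember_upload_id)

If the process dies partway through, the saved id plus the original file is all the state a later resume needs.
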
@@ -162,6 +171,63 @@ def create_archive_from_file(self, filename=None, file_obj=None,
        writer.close()
        return writer.get_archive_id()

+    @staticmethod
+    def _range_string_to_part_index(range_string, part_size):
+        start, inside_end = [int(value) for value in range_string.split('-')]
+        end = inside_end + 1
+        length = end - start
+        if length == part_size + 1:
+            # Off-by-one bug in Amazon's Glacier implementation,
+            # see: https://forums.aws.amazon.com/thread.jspa?threadID=106866
+            # Workaround: since part_size is too big by one byte, adjust it
+            end -= 1
+            inside_end -= 1
+            length -= 1
+        assert not (start % part_size), (
+            "upload part start byte is not on a part boundary")
+        assert (length <= part_size), "upload part is bigger than part size"
+        return start // part_size
+
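
To make the conversion concrete, a few worked values for _range_string_to_part_index, assuming a 4 MiB part size and range strings in the start-inclusive_end form that the code above parses:

>>> from boto.glacier.vault import Vault
>>> part_size = 4 * 1024 * 1024
>>> Vault._range_string_to_part_index('0-4194303', part_size)
0
>>> Vault._range_string_to_part_index('4194304-8388607', part_size)
1
>>> # A range one byte too long (the off-by-one case) maps to the same part.
>>> Vault._range_string_to_part_index('0-4194304', part_size)
0
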
+    def resume_archive_from_file(self, upload_id, filename=None,
+                                 file_obj=None):
+        """Resume upload of a file already part-uploaded to Glacier.
+
+        The resumption of an upload where the part-uploaded section is empty
+        is a valid degenerate case that this function can handle.
+
+        One and only one of filename or file_obj must be specified.
+
+        :type upload_id: str
+        :param upload_id: existing Glacier upload id of upload being resumed.
+
+        :type filename: str
+        :param filename: file to open for resume
+
+        :type file_obj: file
+        :param file_obj: file-like object containing local data to resume.
+            This must read from the start of the entire upload, not just
+            from the point being resumed; use file_obj.seek(0) if necessary.
+
+        :rtype: str
+        :return: The archive id of the newly created archive
+
+        """
+        part_list_response = self.list_all_parts(upload_id)
+        part_size = part_list_response['PartSizeInBytes']
+
+        part_hash_map = {}
+        for part_desc in part_list_response['Parts']:
+            part_index = self._range_string_to_part_index(
+                part_desc['RangeInBytes'], part_size)
+            part_tree_hash = part_desc['SHA256TreeHash'].decode('hex')
+            part_hash_map[part_index] = part_tree_hash
+
+        if not file_obj:
+            file_obj = open(filename, "rb")
+
+        return resume_file_upload(
+            self, upload_id, part_size, file_obj, part_hash_map)
+
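
A matching sketch of the resume path, reading back the upload id saved by the callback in the earlier example; the vault object, id file, and file name are the same illustrative assumptions:

# Sketch only: reuses the vault object and upload-id.txt from the sketch above.
with open('upload-id.txt') as f:
    upload_id = f.read().strip()

# resume_archive_from_file re-reads 'backup.tar' from the start and uses the
# recorded part tree hashes to avoid re-sending parts that already match.
archive_id = vault.resume_archive_from_file(upload_id, filename='backup.tar')
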
    def concurrent_create_archive_from_file(self, filename):
        """
        Create a new archive from a file and upload the given
@@ -290,3 +356,23 @@ def list_jobs(self, completed=None, status_code=None):
        response_data = self.layer1.list_jobs(self.name, completed,
                                              status_code)
        return [Job(self, jd) for jd in response_data['JobList']]
+
+    def list_all_parts(self, upload_id):
+        """Automatically make and combine multiple calls to list_parts.
+
+        Call list_parts as necessary, combining the results in case multiple
+        calls were required to get data on all available parts.
+
+        """
+        result = self.layer1.list_parts(self.name, upload_id)
+        marker = result['Marker']
+        while marker:
+            additional_result = self.layer1.list_parts(
+                self.name, upload_id, marker=marker)
+            result['Parts'].extend(additional_result['Parts'])
+            marker = additional_result['Marker']
+        # The marker makes no sense in an unpaginated result, and clearing it
+        # makes testing easier. This also has the nice property that the result
+        # is a normal (but expanded) response.
+        result['Marker'] = None
+        return result
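
Finally, a short sketch of the combined response that list_all_parts returns, using only the fields referenced above; upload_id is assumed to name a partially completed multipart upload:

# Sketch only: upload_id and vault are assumed, as in the earlier sketches.
response = vault.list_all_parts(upload_id)
part_size = response['PartSizeInBytes']
for part in response['Parts']:           # parts from every page, combined
    print part['RangeInBytes'], part['SHA256TreeHash']
assert response['Marker'] is None        # cleared once pagination is done
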