Permalink
Browse files

mods to support efficient EOF seek for KeyFile

  • Loading branch information...
1 parent f2ee238 commit 4bc87271c2200011bd3bda213ffc791e23cc9e04 @mfschwartz mfschwartz committed Jan 28, 2013
View
2 boto/connection.py
@@ -821,7 +821,7 @@ def _mexe(self, request, sender=None, override_num_retries=None,
i = 0
connection = self.get_http_connection(request.host, self.is_secure)
while i <= num_retries:
- # Use binary exponential backoff to desynchronize client requests
+ # Use binary exponential backoff to desynchronize client requests.
next_sleep = random.random() * (2 ** i)
try:
# we now re-sign each request before it is retried
View
56 boto/gs/key.py
@@ -19,10 +19,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
+import base64
+import binascii
import os
+import re
import StringIO
from boto.exception import BotoClientError
from boto.s3.key import Key as S3Key
+from boto.s3.keyfile import KeyFile
class Key(S3Key):
"""
@@ -284,22 +288,50 @@ def set_contents_from_file(self, fp, headers=None, replace=True,
# caller requests reading from beginning of fp.
fp.seek(0, os.SEEK_SET)
else:
- spos = fp.tell()
- fp.seek(0, os.SEEK_END)
- if fp.tell() == spos:
- fp.seek(0, os.SEEK_SET)
- if fp.tell() != spos:
- # Raise an exception as this is likely a programming error
- # whereby there is data before the fp but nothing after it.
- fp.seek(spos)
- raise AttributeError(
- 'fp is at EOF. Use rewind option or seek() to data start.')
- # seek back to the correct position.
- fp.seek(spos)
+ # The following seek/tell/seek logic is intended
+ # to detect applications using the older interface to
+ # set_contents_from_file(), which automatically rewound the
+ # file each time the Key was reused. This changed with commit
+ # 14ee2d03f4665fe20d19a85286f78d39d924237e, to support uploads
+ # split into multiple parts and uploaded in parallel, and at
+ # the time of that commit this check was added because otherwise
+ # older programs would get a success status and upload an empty
+ # object. Unfortunately, it's very inefficient for fp's implemented
+ # by KeyFile (used, for example, by gsutil when copying between
+ # providers). So, we skip the check for the KeyFile case.
+ # TODO: At some point consider removing this seek/tell/seek
+ # logic, after enough time has passed that it's unlikely any
+ # programs remain that assume the older auto-rewind interface.
+ if not isinstance(fp, KeyFile):
+ spos = fp.tell()
+ fp.seek(0, os.SEEK_END)
+ if fp.tell() == spos:
+ fp.seek(0, os.SEEK_SET)
+ if fp.tell() != spos:
+ # Raise an exception as this is likely a programming
+ # error whereby there is data before the fp but nothing
+ # after it.
+ fp.seek(spos)
+ raise AttributeError('fp is at EOF. Use rewind option '
+ 'or seek() to data start.')
+ # seek back to the correct position.
+ fp.seek(spos)
if hasattr(fp, 'name'):
self.path = fp.name
if self.bucket != None:
+ if isinstance(fp, KeyFile):
+ # Avoid EOF seek for KeyFile case as it's very inefficient.
+ key = fp.getkey()
+ size = key.size - fp.tell()
+ self.size = size
+ # At present both GCS and S3 use MD5 for the etag for
+ # non-multipart-uploaded objects. If the etag is 32 hex
+ # chars use it as an MD5, to avoid having to read the file
+ # twice while transferring.
+ if (re.match('^"[a-fA-F0-9]{32}"$', key.etag)):
+ etag = key.etag.strip('"')
+ md5 = (etag, base64.b64encode(binascii.unhexlify(etag)))
if size:
self.size = size
else:
View
23 boto/gs/resumable_upload_handler.py
@@ -34,6 +34,7 @@
from boto.exception import InvalidUriError
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
+from boto.s3.keyfile import KeyFile
try:
from hashlib import md5
except ImportError:
@@ -447,7 +448,12 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
self.upload_start_point = server_end
total_bytes_uploaded = server_end + 1
- fp.seek(total_bytes_uploaded)
+ # Corner case: Don't attempt to seek if we've already uploaded the
+ # entire file, because if the file is a stream (e.g., the KeyFile
+ # wrapper around input key when copying between providers), attempting
+ # to seek to the end of file would result in an InvalidRange error.
+ if file_length < total_bytes_uploaded:
+ fp.seek(total_bytes_uploaded)
conn = key.bucket.connection
# Get a new HTTP connection (vs conn.get_http_connection(), which reuses
@@ -537,7 +543,7 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
'progress. You might try this upload again later',
ResumableTransferDisposition.ABORT_CUR_PROCESS)
- # Use binary exponential backoff to desynchronize client requests
+ # Use binary exponential backoff to desynchronize client requests.
sleep_time_secs = random.random() * (2**self.progress_less_iterations)
if debug >= 1:
print ('Got retryable failure (%d progress-less in a row).\n'
@@ -585,9 +591,14 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
if CT in headers and headers[CT] is None:
del headers[CT]
- fp.seek(0, os.SEEK_END)
- file_length = fp.tell()
- fp.seek(0)
+ # Determine file size different ways for case where fp is actually a
+ # wrapper around a Key vs an actual file.
+ if isinstance(fp, KeyFile):
+ file_length = fp.getkey().size
+ else:
+ fp.seek(0, os.SEEK_END)
+ file_length = fp.tell()
+ fp.seek(0)
debug = key.bucket.connection.debug
# Compute the MD5 checksum on the fly.
@@ -596,7 +607,7 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
# Use num-retries from constructor if one was provided; else check
# for a value specified in the boto config file; else default to 6.
if self.num_retries is None:
- self.num_retries = config.getint('Boto', 'num_retries', 5)
+ self.num_retries = config.getint('Boto', 'num_retries', 6)
self.progress_less_iterations = 0
while True: # Retry as long as we're making progress.
View
56 boto/s3/key.py
@@ -27,11 +27,13 @@
import rfc822
import StringIO
import base64
+import binascii
import math
import urllib
import boto.utils
from boto.exception import BotoClientError
from boto.provider import Provider
+from boto.s3.keyfile import KeyFile
from boto.s3.user import User
from boto import UserAgent
from boto.utils import compute_md5
@@ -158,7 +160,6 @@ def get_md5_from_hexdigest(self, md5_hexdigest):
A utility function to create the 2-tuple (md5hexdigest, base64md5)
from just having a precalculated md5_hexdigest.
"""
- import binascii
digest = binascii.unhexlify(md5_hexdigest)
base64md5 = base64.encodestring(digest)
if base64md5[-1] == '\n':
@@ -1033,18 +1034,34 @@ class of the new Key to be REDUCED_REDUNDANCY. The Reduced
# caller requests reading from beginning of fp.
fp.seek(0, os.SEEK_SET)
else:
- spos = fp.tell()
- fp.seek(0, os.SEEK_END)
- if fp.tell() == spos:
- fp.seek(0, os.SEEK_SET)
- if fp.tell() != spos:
- # Raise an exception as this is likely a programming error
- # whereby there is data before the fp but nothing after it.
- fp.seek(spos)
- raise AttributeError(
- 'fp is at EOF. Use rewind option or seek() to data start.')
- # seek back to the correct position.
- fp.seek(spos)
+ # The following seek/tell/seek logic is intended
+ # to detect applications using the older interface to
+ # set_contents_from_file(), which automatically rewound the
+ # file each time the Key was reused. This changed with commit
+ # 14ee2d03f4665fe20d19a85286f78d39d924237e, to support uploads
+ # split into multiple parts and uploaded in parallel, and at
+ # the time of that commit this check was added because otherwise
+ # older programs would get a success status and upload an empty
+ # object. Unfortunately, it's very inefficient for fp's implemented
+ # by KeyFile (used, for example, by gsutil when copying between
+ # providers). So, we skip the check for the KeyFile case.
+ # TODO: At some point consider removing this seek/tell/seek
+ # logic, after enough time has passed that it's unlikely any
+ # programs remain that assume the older auto-rewind interface.
+ if not isinstance(fp, KeyFile):
+ spos = fp.tell()
+ fp.seek(0, os.SEEK_END)
+ if fp.tell() == spos:
+ fp.seek(0, os.SEEK_SET)
+ if fp.tell() != spos:
+ # Raise an exception as this is likely a programming
+ # error whereby there is data before the fp but nothing
+ # after it.
+ fp.seek(spos)
+ raise AttributeError('fp is at EOF. Use rewind option '
+ 'or seek() to data start.')
+ # seek back to the correct position.
+ fp.seek(spos)
if reduced_redundancy:
self.storage_class = 'REDUCED_REDUNDANCY'
@@ -1054,7 +1071,6 @@ class of the new Key to be REDUCED_REDUNDANCY. The Reduced
# What if different providers provide different classes?
if hasattr(fp, 'name'):
self.path = fp.name
-
if self.bucket != None:
if not md5 and provider.supports_chunked_transfer():
# defer md5 calculation to on the fly and
@@ -1063,6 +1079,18 @@ class of the new Key to be REDUCED_REDUNDANCY. The Reduced
self.size = None
else:
chunked_transfer = False
+ if isinstance(fp, KeyFile):
+ # Avoid EOF seek for KeyFile case as it's very inefficient.
+ key = fp.getkey()
+ size = key.size - fp.tell()
+ self.size = size
+ # At present both GCS and S3 use MD5 for the etag for
+ # non-multipart-uploaded objects. If the etag is 32 hex
+ # chars use it as an MD5, to avoid having to read the file
+ # twice while transferring.
+ if (re.match('^"[a-fA-F0-9]{32}"$', key.etag)):
+ etag = key.etag.strip('"')
+ md5 = (etag, base64.b64encode(binascii.unhexlify(etag)))
if not md5:
# compute_md5() and also set self.size to actual
# size of the bytes read computing the md5.
View
134 boto/s3/keyfile.py
@@ -0,0 +1,134 @@
+# Copyright 2013 Google Inc.
+# Copyright 2011, Nexenta Systems Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Wrapper class to expose a Key being read via a partial implementation of the
+Python file interface. The only functions supported are those needed for seeking
+in a Key open for reading.
+"""
+
+import os
+
class KeyFile(object):
    """Read-only, seekable file-like wrapper around a Key.

    Only the parts of the Python file interface needed to read from and
    seek within a Key are implemented; every other file method raises
    NotImplementedError.

    The wrapped key must provide ``open_read(headers=...)``, ``read(size)``,
    ``close()``, and the ``size`` and ``name`` attributes.
    """

    def __init__(self, key):
        """Open ``key`` for reading and position the wrapper at offset 0."""
        self.key = key
        self.key.open_read()
        # Current logical offset; None once the wrapper has been closed.
        self.location = 0
        self.closed = False
        self.softspace = -1  # Not implemented.
        self.mode = 'r'
        self.encoding = 'Undefined in KeyFile'
        self.errors = 'Undefined in KeyFile'
        self.newlines = 'Undefined in KeyFile'
        self.name = key.name

    def tell(self):
        """Return the current offset.

        Raises:
            ValueError: if the wrapper has been closed.
        """
        if self.location is None:
            raise ValueError("I/O operation on closed file")
        return self.location

    def seek(self, pos, whence=os.SEEK_SET):
        """Reposition the wrapper by re-opening the key with a range GET.

        Note: this is very inefficient if the Key is positioned at the start
        of a very large object and seek(0, os.SEEK_END) is called, because
        closing the key may first read all the data from the open socket
        before the range GET that positions to the end of the file.

        Args:
            pos: offset, interpreted according to ``whence``.
            whence: os.SEEK_SET (default), os.SEEK_CUR or os.SEEK_END.

        Raises:
            IOError: if ``whence`` is not recognised or the resulting
                absolute offset is negative.
            ValueError: if os.SEEK_CUR is used on a closed wrapper.
        """
        if whence == os.SEEK_SET:
            target = pos
        elif whence == os.SEEK_CUR:
            # tell() rejects a closed wrapper with ValueError.
            target = self.tell() + pos
        elif whence == os.SEEK_END:
            target = self.key.size + pos
        else:
            raise IOError('Invalid whence param (%d) passed to seek' % whence)
        # Validate before touching the key so that a rejected seek leaves the
        # open stream and the current offset untouched.
        if target < 0:
            raise IOError("Invalid argument")

        self.key.close()
        if whence == os.SEEK_END and pos == 0 and target > 0:
            # Sending a range GET with EOF as the range start would cause an
            # invalid range error. Instead position to one byte before EOF
            # and read that byte, which leaves the stream exactly at EOF.
            self.key.open_read(headers={"Range": "bytes=%d-" % (target - 1)})
            self.key.read(1)
            self.location = target
            return

        try:
            self.key.open_read(headers={"Range": "bytes=%d-" % target})
        except Exception as e:
            # 416 Invalid Range means that the starting byte was at or past
            # the end of the object. The Python file interface allows
            # silently seeking past EOF, so that one status is swallowed.
            # The check is made on the ``status`` attribute rather than on a
            # provider-specific exception class so it works for any provider
            # error type; everything else is re-raised unchanged.
            if getattr(e, 'status', None) != 416:
                raise

        self.location = target

    def read(self, size):
        """Read up to ``size`` bytes from the key and advance the offset.

        The offset advances by the number of bytes actually returned, so
        tell() stays accurate after a short read at end of file.

        Raises:
            ValueError: if the wrapper has been closed.
        """
        if self.location is None:
            raise ValueError("I/O operation on closed file")
        data = self.key.read(size)
        self.location += len(data)
        return data

    def close(self):
        """Close the underlying key and mark the wrapper as closed."""
        self.key.close()
        self.location = None
        self.closed = True

    def isatty(self):
        """Return False; a KeyFile is never a terminal."""
        return False

    # Non-file interface, useful for code that wants to dig into underlying
    # Key state.
    def getkey(self):
        """Return the wrapped Key."""
        return self.key

    # Unimplemented interfaces below here. They accept the arguments of the
    # regular file API so callers get NotImplementedError, not TypeError.

    def write(self, buf):
        raise NotImplementedError('write not implemented in KeyFile')

    def fileno(self):
        raise NotImplementedError('fileno not implemented in KeyFile')

    def flush(self):
        raise NotImplementedError('flush not implemented in KeyFile')

    def next(self):
        raise NotImplementedError('next not implemented in KeyFile')

    def readinto(self, *args, **kwargs):
        raise NotImplementedError('readinto not implemented in KeyFile')

    def readline(self, *args, **kwargs):
        raise NotImplementedError('readline not implemented in KeyFile')

    def readlines(self, *args, **kwargs):
        raise NotImplementedError('readlines not implemented in KeyFile')

    def truncate(self, *args, **kwargs):
        raise NotImplementedError('truncate not implemented in KeyFile')

    def writelines(self, *args, **kwargs):
        raise NotImplementedError('writelines not implemented in KeyFile')

    def xreadlines(self, *args, **kwargs):
        raise NotImplementedError('xreadlines not implemented in KeyFile')
View
4 boto/s3/resumable_download_handler.py
@@ -30,6 +30,7 @@
from boto.connection import AWSAuthConnection
from boto.exception import ResumableDownloadException
from boto.exception import ResumableTransferDisposition
+from boto.s3.keyfile import KeyFile
"""
Resumable download handler.
@@ -72,6 +73,9 @@ def get_cur_file_size(fp, position_to_eof=False):
"""
Returns size of file, optionally leaving fp positioned at EOF.
"""
+ if isinstance(fp, KeyFile) and not position_to_eof:
+ # Avoid EOF seek for KeyFile case as it's very inefficient.
+ return fp.getkey().size
if not position_to_eof:
cur_pos = fp.tell()
fp.seek(0, os.SEEK_END)
View
9 tests/integration/s3/mock_storage_service.py
@@ -29,6 +29,7 @@
import copy
import boto
import base64
+import re
from boto.utils import compute_md5
from boto.s3.prefix import Prefix
@@ -105,11 +106,17 @@ def _handle_headers(self, headers):
if 'Content-Language' in headers:
self.content_language = headers['Content-Language']
- def open_read(self, headers=NOT_IMPL, query_args=NOT_IMPL,
+ # Simplistic partial implementation for headers: Just supports range GETs
+ # of flavor 'Range: bytes=xyz-'.
+ def open_read(self, headers=None, query_args=NOT_IMPL,
override_num_retries=NOT_IMPL):
if self.closed:
self.read_pos = 0
self.closed = False
+ if headers and 'Range' in headers:
+ match = re.match('bytes=([0-9]+)-$', headers['Range'])
+ if match:
+ self.read_pos = int(match.group(1))
def close(self):
self.closed = True
View
95 tests/unit/s3/test_keyfile.py
@@ -0,0 +1,95 @@
+# Copyright 2013 Google Inc.
+# Copyright 2011, Nexenta Systems Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+import os
+import unittest
+from boto.s3.keyfile import KeyFile
+from tests.integration.s3.mock_storage_service import MockConnection
+from tests.integration.s3.mock_storage_service import MockBucket
+
+
class KeyfileTest(unittest.TestCase):
    """Exercises KeyFile read/tell/seek against the mock storage service."""

    def setUp(self):
        # Build a ten-byte object in a mock bucket and wrap it in a KeyFile.
        self.contents = '0123456789'
        mock_bucket = MockBucket(MockConnection(), 'mybucket')
        mock_key = mock_bucket.new_key('mykey')
        mock_key.set_contents_from_string(self.contents)
        self.keyfile = KeyFile(mock_key)

    def tearDown(self):
        self.keyfile.close()

    def testReadFull(self):
        total = len(self.contents)
        self.assertEqual(self.keyfile.read(total), self.contents)

    def testReadPartial(self):
        first_half = self.keyfile.read(5)
        self.assertEqual(first_half, self.contents[:5])
        second_half = self.keyfile.read(5)
        self.assertEqual(second_half, self.contents[5:])

    def testTell(self):
        kf = self.keyfile
        self.assertEqual(kf.tell(), 0)
        kf.read(4)
        self.assertEqual(kf.tell(), 4)
        kf.read(6)
        self.assertEqual(kf.tell(), 10)
        # tell() on a closed KeyFile must raise.
        kf.close()
        with self.assertRaisesRegexp(ValueError, 'operation on closed file'):
            kf.tell()

    def testSeek(self):
        kf = self.keyfile
        head = self.contents[:4]
        self.assertEqual(kf.read(4), head)
        kf.seek(0)
        self.assertEqual(kf.read(4), head)
        kf.seek(5)
        self.assertEqual(kf.read(5), self.contents[5:])

        # Seeking negative should raise.
        with self.assertRaisesRegexp(IOError, 'Invalid argument'):
            kf.seek(-5)

        # Reading past end of file is supposed to return empty string.
        kf.read(10)
        self.assertEqual(kf.read(20), '')

        # Seeking past end of file is supposed to silently work.
        kf.seek(50)
        self.assertEqual(kf.tell(), 50)
        self.assertEqual(kf.read(1), '')

    def testSeekEnd(self):
        kf = self.keyfile
        self.assertEqual(kf.read(4), self.contents[:4])
        kf.seek(0, os.SEEK_END)
        self.assertEqual(kf.read(1), '')
        kf.seek(-1, os.SEEK_END)
        self.assertEqual(kf.tell(), 9)
        self.assertEqual(kf.read(1), '9')
        # Test attempt to seek backwards past the start from the end.
        with self.assertRaises(IOError):
            kf.seek(-100, os.SEEK_END)

    def testSeekCur(self):
        kf = self.keyfile
        self.assertEqual(kf.read(1), self.contents[0])
        kf.seek(1, os.SEEK_CUR)
        self.assertEqual(kf.tell(), 2)
        self.assertEqual(kf.read(4), self.contents[2:6])

0 comments on commit 4bc8727

Please sign in to comment.