Add support for internal copy in S3 blob db
Internal copy performs an efficient server-side copy when `put`ting a blob that was previously loaded from the same blob db using `get`.
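A minimal usage sketch of this fast path, assuming a configured `S3BlobDB` instance named `db` (the blob contents and bucket names mirror the new `test_put_from_get_stream` test below and are illustrative only; `BytesIO` stands in for the `StringIO` used in the test):

```python
from io import BytesIO

# Store a blob, then stream it back out of the same blob db.
old_info = db.put(BytesIO(b"content"), "form.xml", "old_bucket")

with db.get(old_info.identifier, "old_bucket") as fh:
    # `fh` is a BlobStream bound to `db`, so this put is satisfied by a
    # server-side S3 copy instead of re-uploading the bytes.
    new_info = db.put(fh, "form.xml", "new_bucket")
```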

A newer version of boto3 (v1.4.0) is needed for the copy operation. The upgrade also simplified multipart upload support, allowing the custom file-chunk reader machinery to be removed.
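For reference, a rough sketch of the two bucket-level boto3 1.4.0 calls the new code relies on; the bucket name, keys, and file name below are placeholders, not values from this repository:

```python
import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("example-blobdb")  # placeholder bucket name

# Streaming upload of a file-like object; boto3's managed transfer handles
# multipart chunking internally, which is what let the custom
# OpenFileChunk/S3Transfer osutil machinery be deleted.
with open("example.bin", "rb") as fileobj:
    bucket.upload_fileobj(fileobj, "old_bucket/blob-id")

# Server-side copy between keys: the operation behind "internal copy".
source = {"Bucket": "example-blobdb", "Key": "old_bucket/blob-id"}
bucket.copy(source, "new_bucket/blob-id")
```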
millerdev committed Sep 12, 2016
1 parent 1bd0a74 commit dd85087
Showing 4 changed files with 47 additions and 226 deletions.
193 changes: 37 additions & 156 deletions corehq/blobs/s3db.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
import os
import weakref
from contextlib import contextmanager
from threading import Lock

@@ -8,7 +9,7 @@
from corehq.blobs.util import ClosingContextProxy

import boto3
from boto3.s3.transfer import S3Transfer, ReadFileChunk
from boto3.s3.transfer import S3Transfer
from botocore.client import Config
from botocore.handlers import calculate_md5
from botocore.exceptions import ClientError
@@ -40,20 +41,24 @@ def __init__(self, config):
def put(self, content, basename="", bucket=DEFAULT_BUCKET):
identifier = self.get_identifier(basename)
path = self.get_path(identifier, bucket)
self._s3_bucket(create=True)
osutil = OpenFileOSUtils()
transfer = S3Transfer(self.db.meta.client, osutil=osutil)
transfer.upload_file(content, self.s3_bucket_name, path)
s3_bucket = self._s3_bucket(create=True)
if isinstance(content, BlobStream) and content.blob_db is self:
source = {"Bucket": self.s3_bucket_name, "Key": content.blob_path}
s3_bucket.copy(source, path)
obj = s3_bucket.Object(path)
# unfortunately cannot get content-md5 here
return BlobInfo(identifier, obj.content_length, None)
content.seek(0)
content_md5 = get_content_md5(content)
content_length = osutil.get_file_size(content)
content_length = get_file_size(content)
s3_bucket.upload_fileobj(content, path)
return BlobInfo(identifier, content_length, "md5-" + content_md5)

def get(self, identifier, bucket=DEFAULT_BUCKET):
path = self.get_path(identifier, bucket)
with maybe_not_found(throw=NotFound(identifier, bucket)):
resp = self._s3_bucket().Object(path).get()
return ClosingContextProxy(resp["Body"]) # body stream
return BlobStream(resp["Body"], self, path)

def delete(self, *args, **kw):
identifier, bucket = self.get_args_for_delete(*args, **kw)
@@ -78,9 +83,8 @@ def delete(self, *args, **kw):
def copy_blob(self, content, info, bucket):
self._s3_bucket(create=True)
path = self.get_path(info.identifier, bucket)
osutil = OpenFileOSUtils()
transfer = S3Transfer(self.db.meta.client, osutil=osutil)
transfer.upload_file(content, self.s3_bucket_name, path)
self._s3_bucket().upload_fileobj(content, path)


def _s3_bucket(self, create=False):
if create and not self._s3_bucket_exists:
@@ -99,6 +103,18 @@ def get_path(self, identifier=None, bucket=DEFAULT_BUCKET):
return safejoin(bucket, identifier)


class BlobStream(ClosingContextProxy):

def __init__(self, stream, blob_db, blob_path):
super(BlobStream, self).__init__(stream)
self._blob_db = weakref.ref(blob_db)
self.blob_path = blob_path

@property
def blob_db(self):
return self._blob_db()


def safepath(path):
if (path.startswith(("/", ".")) or
"/../" in path or
@@ -125,6 +141,17 @@ def get_content_md5(content):
return params["headers"]["Content-MD5"]


def get_file_size(fileobj):
if not hasattr(fileobj, 'fileno'):
pos = fileobj.tell()
try:
fileobj.seek(0, os.SEEK_END)
return fileobj.tell()
finally:
fileobj.seek(pos)
return os.fstat(fileobj.fileno()).st_size


@contextmanager
def maybe_not_found(throw=None):
try:
@@ -134,149 +161,3 @@ def maybe_not_found(throw=None):
raise
if throw is not None:
raise throw


class OpenFileOSUtils(object):

def get_file_size(self, fileobj):
if not hasattr(fileobj, 'fileno'):
pos = fileobj.tell()
try:
fileobj.seek(0, os.SEEK_END)
return fileobj.tell()
finally:
fileobj.seek(pos)
return os.fstat(fileobj.fileno()).st_size

def open_file_chunk_reader(self, fileobj, start_byte, size, callback):
full_size = self.get_file_size(fileobj)
return ReadOpenFileChunk(fileobj, start_byte, size, full_size,
callback, enable_callback=False)

def open(self, filename, mode):
raise NotImplementedError

def remove_file(self, filename):
raise NotImplementedError

def rename_file(self, current_filename, new_filename):
raise NotImplementedError


class ReadOpenFileChunk(ReadFileChunk):
"""Wrapper for OpenFileChunk that implements ReadFileChunk interface
"""

def __init__(self, fileobj, start_byte, chunk_size, full_file_size, *args, **kw):

class FakeFile:

def seek(self, pos):
pass

length = min(chunk_size, full_file_size - start_byte)
self._chunk = OpenFileChunk(fileobj, start_byte, length)
super(ReadOpenFileChunk, self).__init__(
FakeFile(), start_byte, chunk_size, full_file_size, *args, **kw)
assert self._size == length, (self._size, length)

def __repr__(self):
return ("<ReadOpenFileChunk {} offset={} length={}>".format(
self._chunk.file,
self._start_byte,
self._size,
))

def read(self, amount=None):
data = self._chunk.read(amount)
if self._callback is not None and self._callback_enabled:
self._callback(len(data))
return data

def seek(self, where):
old_pos = self._chunk.tell()
self._chunk.seek(where)
if self._callback is not None and self._callback_enabled:
# To also rewind the callback() for an accurate progress report
self._callback(where - old_pos)

def tell(self):
return self._chunk.tell()

def close(self):
self._chunk.close()
self._chunk = None

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.close()


class OpenFileChunk(object):
"""A wrapper for reading from a file-like object from multiple threads
Each thread reading from the file-like object should have its own
private instance of this class.
"""

init_lock = Lock()
file_locks = {}

def __init__(self, fileobj, start_byte, length):
with self.init_lock:
try:
lock, refs = self.file_locks[fileobj]
except KeyError:
lock, refs = self.file_locks[fileobj] = (Lock(), set())
refs.add(self)
self.lock = lock
self.file = fileobj
self.start = self.offset = start_byte
self.length = length

def read(self, amount=None):
if self.offset >= self.start + self.length:
return b""
with self.lock:
pos = self.file.tell()
self.file.seek(self.offset)

if amount is None:
amount = self.length
amount = min(self.length - self.tell(), amount)
read = self.file.read(amount)

self.offset = self.file.tell()
self.file.seek(pos)
assert self.offset - self.start >= 0, (self.start, self.offset)
assert self.offset <= self.start + self.length, \
(self.start, self.length, self.offset)
return read

def seek(self, pos):
assert pos >= 0, pos
self.offset = self.start + pos

def tell(self):
return self.offset - self.start

def close(self):
if self.file is None:
return
try:
with self.init_lock:
lock, refs = self.file_locks[self.file]
refs.remove(self)
if not refs:
self.file_locks.pop(self.file)
finally:
self.file = None
self.lock = None

def __enter__(self):
return self

def __exit__(self, *args, **kwargs):
self.close()
76 changes: 8 additions & 68 deletions corehq/blobs/tests/test_s3db.py
@@ -128,6 +128,14 @@ def test_put_with_double_dotted_name(self):
with self.db.get(info.identifier) as fh:
self.assertEqual(fh.read(), b"content")

def test_put_from_get_stream(self):
name = "form.xml"
old = self.db.put(StringIO(b"content"), name, "old_bucket")
with self.db.get(old.identifier, "old_bucket") as fh:
new = self.db.put(fh, name, "new_bucket")
with self.db.get(new.identifier, "new_bucket") as fh:
self.assertEqual(fh.read(), b"content")

def test_delete(self):
name = "test.4"
bucket = "doc.4"
@@ -217,71 +225,3 @@ def test_empty_attachment_name(self):
def test_bad_name(self, name, bucket=mod.DEFAULT_BUCKET):
with self.assertRaises(mod.BadName):
self.db.get(name, bucket)


class TestOpenFileChunk(TestCase):

@classmethod
def setUpClass(cls):
cls.tmp_context = tempdir()
tmp = cls.tmp_context.__enter__()
cls.filepath = join(tmp, "file.txt")
with open(cls.filepath, "wb") as fh:
fh.write(b"data")

@classmethod
def tearDownClass(cls):
cls.tmp_context.__exit__(None, None, None)

def get_chunk(self, normal_file, start, length):
return mod.OpenFileChunk(normal_file, start, length)


@generate_cases([
(0, 0, b"data"),
(1, 1, b"ata",),
(2, 2, b"ta",),
(3, 3, b"a"),
(4, 4, b""),
(5, 5, b""),
], TestOpenFileChunk)
def test_seek_tell_read(self, to, expect_tell, expect_read):
with open(self.filepath, "rb") as normal_file:
normal_file.seek(to)

with self.get_chunk(normal_file, 0, 4) as chunk:
self.assertIn(normal_file, mod.OpenFileChunk.file_locks)
# chunk seek/tell/read should not affect normal_file
chunk.seek(to)
self.assertEqual(chunk.tell(), expect_tell)
self.assertEqual(chunk.read(), expect_read)

self.assertEqual(normal_file.tell(), expect_tell)
self.assertEqual(normal_file.read(), expect_read)
self.assertNotIn(normal_file, mod.OpenFileChunk.file_locks)


@generate_cases([
(0, (0, b"data"), (0, b"at")),
(1, (1, b"ata"), (1, b"t")),
(2, (2, b"ta",), (2, b"")),
(3, (3, b"a"), (3, b"")),
(4, (4, b""), (4, b"")),
(5, (5, b""), (5, b"")),
], TestOpenFileChunk)
def test_seek_tell_read_in_sub_chunk(self, to, expect_norm, expect_chunk):
with open(self.filepath, "rb") as normal_file:
normal_file.seek(to)

with self.get_chunk(normal_file, 1, 2) as chunk:
# chunk seek/tell/read should not affect normal_file
chunk.seek(to)
self.assertEqual((chunk.tell(), chunk.read()), expect_chunk)

self.assertEqual((normal_file.tell(), normal_file.read()), expect_norm)


class TestReadOpenFileChunk(TestOpenFileChunk):

def get_chunk(self, normal_file, start, length):
return mod.ReadOpenFileChunk(normal_file, start, length, 4)
2 changes: 1 addition & 1 deletion corehq/blobs/util.py
@@ -15,7 +15,7 @@ def __iter__(self):
return iter(self._obj)

def __enter__(self):
return self._obj
return self

def __exit__(self, *args):
self._obj.close()
2 changes: 1 addition & 1 deletion requirements/requirements.txt
@@ -84,7 +84,7 @@ git+git://github.com/dimagi/pyzxcvbn.git#egg=pyzxcvbn
django-statici18n==1.1.5
django-simple-captcha==0.5.1
httpagentparser==1.7.8
boto3==1.2.3
boto3==1.4.0
simpleeval==0.8.7
laboratory==0.2.0
ConcurrentLogHandler==0.9.1
