Add support for internal copy in S3 blob db #13203

Merged 1 commit on Sep 13, 2016.
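In short, S3BlobDB.get() now returns a BlobStream that remembers its source database and S3 key, and S3BlobDB.put() recognizes such a stream and asks S3 to copy the object server side instead of re-uploading the bytes. A rough usage sketch, mirroring the new test (the config object and bucket names are placeholders, not part of this PR):

    from io import BytesIO
    from corehq.blobs.s3db import S3BlobDB

    db = S3BlobDB(config)  # config: the app's S3 settings (placeholder here)
    old = db.put(BytesIO(b"content"), "form.xml", "old_bucket")
    with db.get(old.identifier, "old_bucket") as blob:
        # blob is a BlobStream tied to db, so put() issues an S3 server-side
        # copy; the blob's bytes never pass through the application
        new = db.put(blob, "form.xml", "new_bucket")

Note that on the copy path the returned BlobInfo carries no content-md5, as the comment in the diff points out.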
192 changes: 36 additions & 156 deletions corehq/blobs/s3db.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 import os
+import weakref
 from contextlib import contextmanager
 from threading import Lock

@@ -8,7 +9,6 @@
 from corehq.blobs.util import ClosingContextProxy

 import boto3
-from boto3.s3.transfer import S3Transfer, ReadFileChunk
 from botocore.client import Config
 from botocore.handlers import calculate_md5
 from botocore.exceptions import ClientError
@@ -40,20 +40,24 @@ def __init__(self, config):
     def put(self, content, basename="", bucket=DEFAULT_BUCKET):
         identifier = self.get_identifier(basename)
         path = self.get_path(identifier, bucket)
-        self._s3_bucket(create=True)
-        osutil = OpenFileOSUtils()
-        transfer = S3Transfer(self.db.meta.client, osutil=osutil)
-        transfer.upload_file(content, self.s3_bucket_name, path)
+        s3_bucket = self._s3_bucket(create=True)
+        if isinstance(content, BlobStream) and content.blob_db is self:
+            source = {"Bucket": self.s3_bucket_name, "Key": content.blob_path}
+            s3_bucket.copy(source, path)
+            obj = s3_bucket.Object(path)
+            # unfortunately cannot get content-md5 here
+            return BlobInfo(identifier, obj.content_length, None)
         content.seek(0)
         content_md5 = get_content_md5(content)
-        content_length = osutil.get_file_size(content)
+        content_length = get_file_size(content)
+        s3_bucket.upload_fileobj(content, path)
         return BlobInfo(identifier, content_length, "md5-" + content_md5)

     def get(self, identifier, bucket=DEFAULT_BUCKET):
         path = self.get_path(identifier, bucket)
         with maybe_not_found(throw=NotFound(identifier, bucket)):
             resp = self._s3_bucket().Object(path).get()
-        return ClosingContextProxy(resp["Body"]) # body stream
+        return BlobStream(resp["Body"], self, path)

     def delete(self, *args, **kw):
         identifier, bucket = self.get_args_for_delete(*args, **kw)
@@ -78,9 +82,8 @@ def delete(self, *args, **kw):
     def copy_blob(self, content, info, bucket):
         self._s3_bucket(create=True)
         path = self.get_path(info.identifier, bucket)
-        osutil = OpenFileOSUtils()
-        transfer = S3Transfer(self.db.meta.client, osutil=osutil)
-        transfer.upload_file(content, self.s3_bucket_name, path)
+        self._s3_bucket().upload_fileobj(content, path)
Review comment from the PR author:
Notable difference in new upload_fileobj implementation in boto3 (v1.4.0): boto/s3transfer#52
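For context, the hand-rolled S3Transfer/OpenFileOSUtils machinery removed below is replaced by boto3's managed transfer, available from boto3 1.4.0 as the comment above notes. A minimal sketch of that call (bucket and key names are placeholders):

    import boto3

    s3 = boto3.resource("s3")
    bucket = s3.Bucket("blobdb")  # placeholder bucket name
    with open("blob.bin", "rb") as fileobj:
        # upload_fileobj accepts any binary file-like object and delegates
        # chunking and multipart handling to the bundled s3transfer package
        bucket.upload_fileobj(fileobj, "some/key")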



     def _s3_bucket(self, create=False):
         if create and not self._s3_bucket_exists:
@@ -99,6 +102,18 @@ def get_path(self, identifier=None, bucket=DEFAULT_BUCKET):
         return safejoin(bucket, identifier)


+class BlobStream(ClosingContextProxy):
+
+    def __init__(self, stream, blob_db, blob_path):
+        super(BlobStream, self).__init__(stream)
+        self._blob_db = weakref.ref(blob_db)
+        self.blob_path = blob_path
+
+    @property
+    def blob_db(self):
+        return self._blob_db()
+
+
 def safepath(path):
     if (path.startswith(("/", ".")) or
             "/../" in path or
@@ -125,6 +140,17 @@ def get_content_md5(content):
     return params["headers"]["Content-MD5"]


+def get_file_size(fileobj):
+    if not hasattr(fileobj, 'fileno'):
+        pos = fileobj.tell()
+        try:
+            fileobj.seek(0, os.SEEK_END)
+            return fileobj.tell()
+        finally:
+            fileobj.seek(pos)
+    return os.fstat(fileobj.fileno()).st_size
+
+
 @contextmanager
 def maybe_not_found(throw=None):
     try:
@@ -134,149 +160,3 @@ def maybe_not_found(throw=None):
             raise
     if throw is not None:
         raise throw


-class OpenFileOSUtils(object):
-
-    def get_file_size(self, fileobj):
-        if not hasattr(fileobj, 'fileno'):
-            pos = fileobj.tell()
-            try:
-                fileobj.seek(0, os.SEEK_END)
-                return fileobj.tell()
-            finally:
-                fileobj.seek(pos)
-        return os.fstat(fileobj.fileno()).st_size
-
-    def open_file_chunk_reader(self, fileobj, start_byte, size, callback):
-        full_size = self.get_file_size(fileobj)
-        return ReadOpenFileChunk(fileobj, start_byte, size, full_size,
-                                 callback, enable_callback=False)
-
-    def open(self, filename, mode):
-        raise NotImplementedError
-
-    def remove_file(self, filename):
-        raise NotImplementedError
-
-    def rename_file(self, current_filename, new_filename):
-        raise NotImplementedError
-
-
-class ReadOpenFileChunk(ReadFileChunk):
-    """Wrapper for OpenFileChunk that implements ReadFileChunk interface
-    """
-
-    def __init__(self, fileobj, start_byte, chunk_size, full_file_size, *args, **kw):
-
-        class FakeFile:
-
-            def seek(self, pos):
-                pass
-
-        length = min(chunk_size, full_file_size - start_byte)
-        self._chunk = OpenFileChunk(fileobj, start_byte, length)
-        super(ReadOpenFileChunk, self).__init__(
-            FakeFile(), start_byte, chunk_size, full_file_size, *args, **kw)
-        assert self._size == length, (self._size, length)
-
-    def __repr__(self):
-        return ("<ReadOpenFileChunk {} offset={} length={}>".format(
-            self._chunk.file,
-            self._start_byte,
-            self._size,
-        ))
-
-    def read(self, amount=None):
-        data = self._chunk.read(amount)
-        if self._callback is not None and self._callback_enabled:
-            self._callback(len(data))
-        return data
-
-    def seek(self, where):
-        old_pos = self._chunk.tell()
-        self._chunk.seek(where)
-        if self._callback is not None and self._callback_enabled:
-            # To also rewind the callback() for an accurate progress report
-            self._callback(where - old_pos)
-
-    def tell(self):
-        return self._chunk.tell()
-
-    def close(self):
-        self._chunk.close()
-        self._chunk = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args, **kwargs):
-        self.close()
-
-
-class OpenFileChunk(object):
-    """A wrapper for reading from a file-like object from multiple threads
-
-    Each thread reading from the file-like object should have its own
-    private instance of this class.
-    """
-
-    init_lock = Lock()
-    file_locks = {}
-
-    def __init__(self, fileobj, start_byte, length):
-        with self.init_lock:
-            try:
-                lock, refs = self.file_locks[fileobj]
-            except KeyError:
-                lock, refs = self.file_locks[fileobj] = (Lock(), set())
-            refs.add(self)
-        self.lock = lock
-        self.file = fileobj
-        self.start = self.offset = start_byte
-        self.length = length
-
-    def read(self, amount=None):
-        if self.offset >= self.start + self.length:
-            return b""
-        with self.lock:
-            pos = self.file.tell()
-            self.file.seek(self.offset)
-
-            if amount is None:
-                amount = self.length
-            amount = min(self.length - self.tell(), amount)
-            read = self.file.read(amount)
-
-            self.offset = self.file.tell()
-            self.file.seek(pos)
-        assert self.offset - self.start >= 0, (self.start, self.offset)
-        assert self.offset <= self.start + self.length, \
-            (self.start, self.length, self.offset)
-        return read
-
-    def seek(self, pos):
-        assert pos >= 0, pos
-        self.offset = self.start + pos
-
-    def tell(self):
-        return self.offset - self.start
-
-    def close(self):
-        if self.file is None:
-            return
-        try:
-            with self.init_lock:
-                lock, refs = self.file_locks[self.file]
-                refs.remove(self)
-                if not refs:
-                    self.file_locks.pop(self.file)
-        finally:
-            self.file = None
-            self.lock = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args, **kwargs):
-        self.close()
76 changes: 8 additions & 68 deletions corehq/blobs/tests/test_s3db.py
@@ -128,6 +128,14 @@ def test_put_with_double_dotted_name(self):
         with self.db.get(info.identifier) as fh:
             self.assertEqual(fh.read(), b"content")

+    def test_put_from_get_stream(self):
+        name = "form.xml"
+        old = self.db.put(StringIO(b"content"), name, "old_bucket")
+        with self.db.get(old.identifier, "old_bucket") as fh:
+            new = self.db.put(fh, name, "new_bucket")
+        with self.db.get(new.identifier, "new_bucket") as fh:
+            self.assertEqual(fh.read(), b"content")
+
     def test_delete(self):
         name = "test.4"
         bucket = "doc.4"
@@ -217,71 +225,3 @@ def test_empty_attachment_name(self):
 def test_bad_name(self, name, bucket=mod.DEFAULT_BUCKET):
     with self.assertRaises(mod.BadName):
         self.db.get(name, bucket)
-
-
-class TestOpenFileChunk(TestCase):
-
-    @classmethod
-    def setUpClass(cls):
-        cls.tmp_context = tempdir()
-        tmp = cls.tmp_context.__enter__()
-        cls.filepath = join(tmp, "file.txt")
-        with open(cls.filepath, "wb") as fh:
-            fh.write(b"data")
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.tmp_context.__exit__(None, None, None)
-
-    def get_chunk(self, normal_file, start, length):
-        return mod.OpenFileChunk(normal_file, start, length)
-
-
-@generate_cases([
-    (0, 0, b"data"),
-    (1, 1, b"ata",),
-    (2, 2, b"ta",),
-    (3, 3, b"a"),
-    (4, 4, b""),
-    (5, 5, b""),
-], TestOpenFileChunk)
-def test_seek_tell_read(self, to, expect_tell, expect_read):
-    with open(self.filepath, "rb") as normal_file:
-        normal_file.seek(to)
-
-        with self.get_chunk(normal_file, 0, 4) as chunk:
-            self.assertIn(normal_file, mod.OpenFileChunk.file_locks)
-            # chunk seek/tell/read should not affect normal_file
-            chunk.seek(to)
-            self.assertEqual(chunk.tell(), expect_tell)
-            self.assertEqual(chunk.read(), expect_read)
-
-        self.assertEqual(normal_file.tell(), expect_tell)
-        self.assertEqual(normal_file.read(), expect_read)
-        self.assertNotIn(normal_file, mod.OpenFileChunk.file_locks)
-
-
-@generate_cases([
-    (0, (0, b"data"), (0, b"at")),
-    (1, (1, b"ata"), (1, b"t")),
-    (2, (2, b"ta",), (2, b"")),
-    (3, (3, b"a"), (3, b"")),
-    (4, (4, b""), (4, b"")),
-    (5, (5, b""), (5, b"")),
-], TestOpenFileChunk)
-def test_seek_tell_read_in_sub_chunk(self, to, expect_norm, expect_chunk):
-    with open(self.filepath, "rb") as normal_file:
-        normal_file.seek(to)
-
-        with self.get_chunk(normal_file, 1, 2) as chunk:
-            # chunk seek/tell/read should not affect normal_file
-            chunk.seek(to)
-            self.assertEqual((chunk.tell(), chunk.read()), expect_chunk)
-
-        self.assertEqual((normal_file.tell(), normal_file.read()), expect_norm)
-
-
-class TestReadOpenFileChunk(TestOpenFileChunk):
-
-    def get_chunk(self, normal_file, start, length):
-        return mod.ReadOpenFileChunk(normal_file, start, length, 4)
2 changes: 1 addition & 1 deletion corehq/blobs/util.py
@@ -15,7 +15,7 @@ def __iter__(self):
         return iter(self._obj)

     def __enter__(self):
-        return self._obj
+        return self

     def __exit__(self, *args):
         self._obj.close()
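This one-line change appears to be what keeps the internal-copy path reachable: with __enter__ returning the proxy itself rather than the wrapped object, the BlobStream returned by get() keeps its blob_db and blob_path attributes inside a with block, so put() can detect it. Roughly, reusing the names from the new test (a sketch, not code from this PR):

    # Before: `fh` was the raw wrapped stream, so isinstance(fh, BlobStream)
    # in put() could never be True. After: `fh` is the BlobStream itself.
    with db.get(old.identifier, "old_bucket") as fh:
        assert isinstance(fh, BlobStream)  # holds with this change
        new = db.put(fh, "form.xml", "new_bucket")  # takes the copy path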
2 changes: 1 addition & 1 deletion requirements/requirements.txt
@@ -84,7 +84,7 @@ git+git://github.com/dimagi/pyzxcvbn.git#egg=pyzxcvbn
 django-statici18n==1.1.5
 django-simple-captcha==0.5.1
 httpagentparser==1.7.8
-boto3==1.2.3
+boto3==1.4.0
 simpleeval==0.8.7
 laboratory==0.2.0
 ConcurrentLogHandler==0.9.1