Merge pull request #986 from davidmarin/multipart-upload-to-s3
Multipart upload to S3
David Marin committed Apr 2, 2015
2 parents ab89658 + 688a1ac commit efe6f47
Showing 8 changed files with 248 additions and 1 deletion.
22 changes: 22 additions & 0 deletions docs/guides/emr-opts.rst
@@ -586,6 +586,28 @@ which is set to True if boto.Version >= '2.25.0'
How long to wait for S3 to reach eventual consistency. This is typically
less than a second (zero in U.S. West), but the default is 5.0 to be safe.

.. mrjob-opt::
    :config: s3_upload_part_size
    :switch: --s3-upload-part-size
    :type: integer
    :set: emr
    :default: 100

    Upload files to S3 in parts no bigger than this many megabytes
    (technically, `mebibytes`_). Default is 100 MiB, as
    `recommended by Amazon`_. Set to 0 to disable multipart uploading
    entirely.

    Currently, Amazon `requires parts to be between 5 MiB and 5 GiB`_.
    mrjob does not enforce these limits.

    .. _`mebibytes`:
        http://en.wikipedia.org/wiki/Mebibyte
    .. _`recommended by Amazon`:
        http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
    .. _`requires parts to be between 5 MiB and 5 GiB`:
        http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
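
For illustration, here is a minimal sketch of setting this option programmatically
when constructing a runner (the same keyword the test suite in this pull request
uses). Values are in MiB, and fractional values are accepted since the switch is
parsed as a float; AWS credentials are assumed to come from mrjob.conf or the
environment, as usual:

    from mrjob.emr import EMRJobRunner

    # upload in 50 MiB parts instead of the default 100 MiB
    runner = EMRJobRunner(s3_upload_part_size=50)

    # 0 disables multipart uploading entirely
    single_put_runner = EMRJobRunner(s3_upload_part_size=0)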

SSH access and tunneling
------------------------

5 changes: 5 additions & 0 deletions docs/whats-new.rst
@@ -20,6 +20,11 @@ this job flow.
You can now use ``--check-input-paths`` and ``--no-check-input-paths`` on EMR
as well as Hadoop.

Files larger than 100MB will be uploaded to S3 using multipart upload if you
have the `filechunkio` module installed. You can change the limit/part size
with the ``--s3-upload-part-size`` option, or disable multipart upload by
setting this option to 0.
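
Under the hood, the new code uses boto's multipart-upload API together with
`filechunkio` to upload the file one chunk at a time. Roughly, the pattern looks
like the sketch below; the bucket name, key name, and local path are hypothetical,
and credentials are assumed to come from the environment:

    import os

    import boto
    from filechunkio import FileChunkIO

    path = 'data.dat'              # hypothetical local file
    part_size = 100 * 1024 * 1024  # 100 MiB parts
    fsize = os.stat(path).st_size

    bucket = boto.connect_s3().get_bucket('my-bucket')  # hypothetical bucket
    mpu = bucket.initiate_multipart_upload('data.dat')
    try:
        for i, offset in enumerate(xrange(0, fsize, part_size)):
            chunk_bytes = min(part_size, fsize - offset)
            # FileChunkIO exposes one slice of the file as a file-like object
            with FileChunkIO(path, 'r', offset=offset, bytes=chunk_bytes) as fp:
                mpu.upload_part_from_file(fp, i + 1)
    except:
        # cancel on failure so S3 doesn't keep the partial upload around
        mpu.cancel_upload()
        raise
    mpu.complete_upload()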

You can now require protocols to be strict from :ref:`mrjob.conf <mrjob.conf>`;
this means unencodable input/output will result in an exception rather
than the job quietly incrementing a counter. It is recommended you set this
68 changes: 67 additions & 1 deletion mrjob/emr.py
@@ -57,6 +57,12 @@
# inside hadoop streaming
boto = None

try:
    import filechunkio
except ImportError:
    # that's cool; filechunkio is only for multipart uploading
    filechunkio = None

# need this to retry on SSL errors (see Issue #621)
try:
from boto.https_connection import InvalidCertificateException
@@ -355,6 +361,7 @@ class EMRRunnerOptionStore(RunnerOptionStore):
's3_log_uri',
's3_scratch_uri',
's3_sync_wait_time',
's3_upload_part_size',
'ssh_bin',
'ssh_bind_ports',
'ssh_tunnel_is_open',
@@ -398,6 +405,7 @@ def default_options(self):
'num_ec2_instances': 1,
'num_ec2_task_instances': 0,
's3_sync_wait_time': 5.0,
's3_upload_part_size': 100, # 100 MB
'sh_bin': ['/bin/sh', '-ex'],
'ssh_bin': ['ssh'],
'ssh_bind_ports': range(40001, 40841),
@@ -889,9 +897,67 @@ def _upload_local_files_to_s3(self):

        for path, s3_uri in self._upload_mgr.path_to_uri().iteritems():
            log.debug('uploading %s -> %s' % (path, s3_uri))
            s3_key = self.make_s3_key(s3_uri, s3_conn)
            self._upload_contents(s3_uri, s3_conn, path)

    def _upload_contents(self, s3_uri, s3_conn, path):
        """Uploads the file at the given path to S3, possibly using
        multipart upload."""
        fsize = os.stat(path).st_size
        part_size = self._get_upload_part_size()

        s3_key = self.make_s3_key(s3_uri, s3_conn)

        if self._should_use_multipart_upload(fsize, part_size, path):
            log.debug("Starting multipart upload of %s" % (path,))
            mpul = s3_key.bucket.initiate_multipart_upload(s3_key.name)

            try:
                self._upload_parts(mpul, path, fsize, part_size)
            except:
                mpul.cancel_upload()
                raise

            mpul.complete_upload()
            log.debug("Completed multipart upload of %s to %s" % (
                path, s3_key.name))
        else:
            s3_key.set_contents_from_filename(path)

    def _upload_parts(self, mpul, path, fsize, part_size):
        offsets = xrange(0, fsize, part_size)

        for i, offset in enumerate(offsets):
            part_num = i + 1

            log.debug("uploading %d/%d of %s" % (
                part_num, len(offsets), path))
            chunk_bytes = min(part_size, fsize - offset)

            with filechunkio.FileChunkIO(
                    path, 'r', offset=offset, bytes=chunk_bytes) as fp:
                mpul.upload_part_from_file(fp, part_num)

    def _get_upload_part_size(self):
        # part size is in MB, as the minimum is 5 MB
        return int((self._opts['s3_upload_part_size'] or 0) * 1024 * 1024)

    def _should_use_multipart_upload(self, fsize, part_size, path):
        """Decide if we want to use multipart uploading.
        path is only used to log warnings."""
        if not part_size:  # disabled
            return False

        if fsize <= part_size:
            return False

        if filechunkio is None:
            log.warning("Can't use S3 multipart upload for %s because"
                        " filechunkio is not installed" % path)
            return False

        return True
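
    # Worked example (illustration only, not part of this change): with the
    # default 100 MiB part size, a hypothetical 250 MiB file is split into
    # three parts of 100 MiB, 100 MiB, and 50 MiB, mirroring the offset
    # arithmetic in _upload_parts() above:
    #
    #   part_size = 100 * 1024 * 1024   # 104857600
    #   fsize = 250 * 1024 * 1024       # 262144000
    #   [min(part_size, fsize - offset)
    #    for offset in xrange(0, fsize, part_size)]
    #   # -> [104857600, 104857600, 52428800]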

def setup_ssh_tunnel_to_job_tracker(self, host):
"""setup the ssh tunnel to the job tracker, if it's not currently
running.
7 changes: 7 additions & 0 deletions mrjob/options.py
@@ -528,6 +528,13 @@ def add_emr_opts(opt_group):
' is typically less than a second (zero in us-west) but the'
' default is 5.0 to be safe.')),

    opt_group.add_option(
        '--s3-upload-part-size', dest='s3_upload_part_size', default=None,
        type='float',
        help=('Upload files to S3 in parts no bigger than this many'
              ' megabytes. Default is 100 MiB. Set to 0 to disable'
              ' multipart uploading entirely.')),

    opt_group.add_option(
        '--ssh-bin', dest='ssh_bin', default=None,
        help=("Name/path of ssh binary. Arguments are allowed (e.g."
1 change: 1 addition & 0 deletions setup.py
@@ -24,6 +24,7 @@
setuptools_kwargs = {
    'install_requires': [
        'boto>=2.2.0',
        'filechunkio',
        'PyYAML',
        'simplejson>=2.0.9',
    ],
54 changes: 54 additions & 0 deletions tests/mockboto.py
@@ -34,6 +34,8 @@
except ImportError:
    boto = None

from mock import Mock

from mrjob.conf import combine_values
from mrjob.parse import is_s3_uri
from mrjob.parse import parse_s3_uri
@@ -168,6 +170,12 @@ def list(self, prefix=''):
            yield MockKey(bucket=self, name=key_name,
                          date_to_str=to_iso8601)

    def initiate_multipart_upload(self, key_name):
        key = self.new_key(key_name)
        return MockMultiPartUpload(key)

class MockKey(object):
"""Mock out boto.s3.Key"""
@@ -189,6 +197,9 @@ def read_mock_data(self):
else:
raise boto.exception.S3ResponseError(404, 'Not Found')

    def mock_multipart_upload_was_cancelled(self):
        return isinstance(self.read_mock_data(), MultiPartUploadCancelled)

def write_mock_data(self, data):
if self.name in self.bucket.mock_state():
self.bucket.mock_state()[self.name] = (data, datetime.utcnow())
@@ -265,6 +276,49 @@ def size(self):
return len(self.get_contents_as_string())


class MultiPartUploadCancelled(str):
    pass


class MockMultiPartUpload(object):

    def __init__(self, key):
        """Mock out boto.s3.MultiPartUpload
        Note that real MultiPartUpload objects don't actually know which key
        they're associated with. It's just simpler this way.
        """
        self.key = key
        self.parts = {}

    def upload_part_from_file(self, fp, part_num):
        part_num = int(part_num)  # boto leaves this to a format string

        # this check is actually in boto
        if part_num < 1:
            raise ValueError('Part numbers must be greater than zero')

        self.parts[part_num] = fp.read()

    def complete_upload(self):
        data = ''

        if self.parts:
            num_parts = max(self.parts)
            for part_num in xrange(1, num_parts + 1):
                # S3 might be more graceful about missing parts. But we
                # certainly don't want this to slip past testing
                data += self.parts[part_num]

        self.key.set_contents_from_string(data)

    def cancel_upload(self):
        self.parts = None  # should break any further calls

        # record that multipart upload was cancelled
        cancelled = MultiPartUploadCancelled(self.key.get_contents_as_string())
        self.key.set_contents_from_string(cancelled)


### EMR ###

def to_iso8601(when):
91 changes: 91 additions & 0 deletions tests/test_emr.py
@@ -44,6 +44,7 @@
from mrjob.emr import EMRJobRunner
from mrjob.emr import attempt_to_acquire_lock
from mrjob.emr import describe_all_job_flows
from mrjob.emr import filechunkio
from mrjob.emr import _MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH
from mrjob.emr import _lock_acquire_step_1
from mrjob.emr import _lock_acquire_step_2
@@ -57,8 +58,10 @@
from mrjob.util import tar_and_gzip

from tests.mockboto import DEFAULT_MAX_JOB_FLOWS_RETURNED
from tests.mockboto import MockBucket
from tests.mockboto import MockEmrConnection
from tests.mockboto import MockEmrObject
from tests.mockboto import MockMultiPartUpload
from tests.mockboto import MockS3Connection
from tests.mockboto import add_mock_s3_data
from tests.mockboto import to_iso8601
@@ -3479,3 +3482,91 @@ def test_switch(self):

        with mr_job.make_runner() as runner:
            self.assertEqual(runner._action_on_failure, 'CONTINUE')


class MultiPartUploadTestCase(MockEMRAndS3TestCase):

    PART_SIZE_IN_MB = 50.0 / 1024 / 1024
    TEST_BUCKET = 'walrus'
    TEST_FILENAME = 'data.dat'
    TEST_S3_URI = 's3://%s/%s' % (TEST_BUCKET, TEST_FILENAME)

    def setUp(self):
        super(MultiPartUploadTestCase, self).setUp()
        # create the walrus bucket
        self.add_mock_s3_data({self.TEST_BUCKET: {}})

    def upload_data(self, runner, data):
        """Upload some bytes to S3"""
        data_path = os.path.join(self.tmp_dir, self.TEST_FILENAME)
        with open(data_path, 'w') as fp:
            fp.write(data)

        s3_conn = runner.make_s3_conn()

        runner._upload_contents(self.TEST_S3_URI, s3_conn, data_path)

    def assert_upload_succeeds(self, runner, data, expect_multipart):
        """Write the data to a temp file, and then upload it to (mock) S3,
        checking that the data successfully uploaded."""
        with patch.object(runner, '_upload_parts', wraps=runner._upload_parts):
            self.upload_data(runner, data)

            s3_key = runner.get_s3_key(self.TEST_S3_URI)
            self.assertEqual(s3_key.get_contents_as_string(), data)
            self.assertEqual(runner._upload_parts.called, expect_multipart)

    def test_small_file(self):
        runner = EMRJobRunner()
        data = 'beavers mate for life'

        self.assert_upload_succeeds(runner, data, expect_multipart=False)

    @unittest.skipIf(filechunkio is None, 'need filechunkio')
    def test_large_file(self):
        # Real S3 has a minimum chunk size of 5MB, but I'd rather not
        # store that in memory (in our mock S3 filesystem)
        runner = EMRJobRunner(s3_upload_part_size=self.PART_SIZE_IN_MB)
        self.assertEqual(runner._get_upload_part_size(), 50)

        data = 'Mew' * 20
        self.assert_upload_succeeds(runner, data, expect_multipart=True)

    def test_file_size_equals_part_size(self):
        runner = EMRJobRunner(s3_upload_part_size=self.PART_SIZE_IN_MB)
        self.assertEqual(runner._get_upload_part_size(), 50)

        data = 'o' * 50
        self.assert_upload_succeeds(runner, data, expect_multipart=False)

    def test_disable_multipart(self):
        runner = EMRJobRunner(s3_upload_part_size=0)
        self.assertEqual(runner._get_upload_part_size(), 0)

        data = 'Mew' * 20
        self.assert_upload_succeeds(runner, data, expect_multipart=False)

    def test_no_filechunkio(self):
        with patch.object(mrjob.emr, 'filechunkio', None):
            runner = EMRJobRunner(s3_upload_part_size=self.PART_SIZE_IN_MB)
            self.assertEqual(runner._get_upload_part_size(), 50)

            data = 'Mew' * 20
            with logger_disabled('mrjob.emr'):
                self.assert_upload_succeeds(runner, data,
                                            expect_multipart=False)

    def test_exception_while_uploading_large_file(self):

        runner = EMRJobRunner(s3_upload_part_size=self.PART_SIZE_IN_MB)
        self.assertEqual(runner._get_upload_part_size(), 50)

        data = 'Mew' * 20

        saved_mpul = []

        with patch.object(runner, '_upload_parts', side_effect=IOError):
            self.assertRaises(IOError, self.upload_data, runner, data)

        s3_key = runner.get_s3_key(self.TEST_S3_URI)
        self.assertTrue(s3_key.mock_multipart_upload_was_cancelled())

1 change: 1 addition & 0 deletions tox.ini
@@ -8,6 +8,7 @@ envlist = py26, py27, pypy

[testenv]
deps =
    filechunkio
    mock
    pytest
    pytest-sugar
