Merge pull request #1785 from davidmarin/cloud-part-size-mb
rename cloud_upload_part_size -> cloud_part_size_mb (fixes #1774)
David Marin committed May 24, 2018
2 parents 7b9718e + 09d40bc commit 250dad2
Showing 10 changed files with 44 additions and 37 deletions.
10 changes: 7 additions & 3 deletions docs/guides/emr-opts.rst
@@ -944,13 +944,13 @@ MRJob uses boto3 to manipulate/access S3.
This used to be called *s3_sync_wait_time*

.. mrjob-opt::
:config: cloud_upload_part_size
:switch: --cloud-upload-part-size
:config: cloud_part_size_mb
:switch: --cloud-part-size-mb
:type: integer
:set: emr
:default: 100

Upload files to S3 in parts no bigger than this many megabytes
Upload files to cloud filesystem in parts no bigger than this many megabytes
(technically, `mebibytes`_). Default is 100 MiB, as
`recommended by Amazon`_. Set to 0 to disable multipart uploading
entirely.
@@ -965,6 +965,10 @@ MRJob uses boto3 to manipulate/access S3.
.. _`requires parts to be between 5 MiB and 5 GiB`:
http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html

.. versionchanged:: 0.6.3

This used to be called *cloud_upload_part_size*.

.. versionchanged:: 0.5.4

This used to be called *s3_upload_part_size*.
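For a concrete sense of the renamed option, here is a minimal sketch of passing it directly to a runner, mirroring what the updated tests in this commit do (constructing a runner like this assumes a sandboxed/mocked environment, as in the test suite):

    from mrjob.emr import EMRJobRunner

    # parts no bigger than 5 MiB (the smallest part size S3 allows)
    runner = EMRJobRunner(cloud_part_size_mb=5)

    # 0 disables multipart uploading entirely
    runner_no_multipart = EMRJobRunner(cloud_part_size_mb=0)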
2 changes: 1 addition & 1 deletion docs/whats-new.rst
@@ -547,7 +547,7 @@ made more generic, to make it easier to share code with the
*s3_log_uri* :mrjob-opt:`cloud_log_dir`
*s3_sync_wait_time* :mrjob-opt:`cloud_fs_sync_secs`
*s3_tmp_dir* :mrjob-opt:`cloud_tmp_dir`
*s3_upload_part_size* :mrjob-opt:`cloud_upload_part_size`
*s3_upload_part_size* *cloud_upload_part_size*
=============================== ======================================

The old option names and command-line switches are now deprecated but will
12 changes: 6 additions & 6 deletions mrjob/cloud.py
@@ -61,8 +61,8 @@ class HadoopInTheCloudJobRunner(MRJobBinRunner):
'bootstrap_python',
'check_cluster_every',
'cloud_fs_sync_secs',
'cloud_part_size_mb',
'cloud_tmp_dir',
'cloud_upload_part_size',
'cluster_id',
'core_instance_type',
'extra_cluster_params',
@@ -128,7 +128,7 @@ def _default_opts(self):
return combine_dicts(
super(HadoopInTheCloudJobRunner, self)._default_opts(),
dict(
cloud_upload_part_size=100, # 100 MB
cloud_part_size_mb=100, # 100 MB
max_mins_idle=_DEFAULT_MAX_MINS_IDLE,
# don't use a list because it makes it hard to read option
# values when running in verbose mode. See #1284
@@ -144,11 +144,11 @@ def _fix_opts(self, opts, source=None):
opts = super(HadoopInTheCloudJobRunner, self)._fix_opts(
opts, source=source)

# cloud_upload_part_size should be a number
if opts.get('cloud_upload_part_size') is not None:
if not isinstance(opts['cloud_upload_part_size'],
# cloud_part_size_mb should be a number
if opts.get('cloud_part_size_mb') is not None:
if not isinstance(opts['cloud_part_size_mb'],
(integer_types, float)):
raise TypeError('cloud_upload_part_size must be a number')
raise TypeError('cloud_part_size_mb must be a number')

# patch max_hours_idle into max_mins_idle (see #1663)
if opts.get('max_hours_idle') is not None:
4 changes: 2 additions & 2 deletions mrjob/dataproc.py
@@ -421,8 +421,8 @@ def fs(self):
def _fs_chunk_size(self):
"""Chunk size for cloud storage Blob objects. Currently
only used for uploading."""
if self._opts['cloud_upload_part_size']:
return int(self._opts['cloud_upload_part_size'] * 1024 * 1024)
if self._opts['cloud_part_size_mb']:
return int(self._opts['cloud_part_size_mb'] * 1024 * 1024)
else:
return None
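The conversion above is plain MiB-to-byte arithmetic. As a standalone sanity check, the helper below restates the _fs_chunk_size() logic outside the runner, using the values the Dataproc tests further down exercise:

    def part_size_bytes(part_size_mb):
        # falsy values (0 or None) disable multipart uploading
        if part_size_mb:
            return int(part_size_mb * 1024 * 1024)
        return None

    assert part_size_bytes(100) == 100 * 1024 * 1024  # default: 100 MiB
    assert part_size_bytes(0.25) == 256 * 1024        # fractional MiB are allowed
    assert part_size_bytes(0) is None                 # multipart disabled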

2 changes: 1 addition & 1 deletion mrjob/emr.py
@@ -828,7 +828,7 @@ def _upload_contents(self, s3_uri, path):

def _get_upload_part_size(self):
# part size is in MB, as the minimum is 5 MB
return int((self._opts['cloud_upload_part_size'] or 0) * 1024 * 1024)
return int((self._opts['cloud_part_size_mb'] or 0) * 1024 * 1024)

def _ssh_tunnel_config(self):
"""Look up AMI version, and return a dict with the following keys:
12 changes: 7 additions & 5 deletions mrjob/options.py
@@ -466,13 +466,15 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
cloud_upload_part_size=dict(
cloud_part_size_mb=dict(
deprecated_aliases=['cloud_upload_part_size'],
cloud_role='launch',
switches=[
(['--cloud-upload-part-size'], dict(
help=('Upload files to S3 in parts no bigger than this many'
' megabytes. Default is 100 MiB. Set to 0 to disable'
' multipart uploading entirely.'),
(['--cloud-part-size-mb'], dict(
deprecated_aliases=['--cloud-upload-part-size'],
help=('Upload files to cloud FS in parts no bigger than this'
' many megabytes. Default is 100 MiB. Set to 0 to'
' disable multipart uploading entirely.'),
type=float,
)),
],
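As a rough illustration of how the rename surfaces on the command line, here is a hypothetical do-nothing job; only the switches themselves come from this commit:

    from mrjob.job import MRJob

    class MRNullJob(MRJob):
        """Hypothetical job used only to demonstrate the switches."""
        def mapper(self, _, line):
            yield line, 1

    if __name__ == '__main__':
        # new switch:
        #   python mr_null_job.py -r emr --cloud-part-size-mb 50 input.txt
        # deprecated alias, still accepted for now (with a deprecation warning):
        #   python mr_null_job.py -r emr --cloud-upload-part-size 50 input.txt
        MRNullJob.run()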
23 changes: 12 additions & 11 deletions mrjob/tools/emr/create_cluster.py
@@ -60,12 +60,14 @@
the default is 5.0 to be safe.
--cloud-log-dir CLOUD_LOG_DIR
URI on remote FS to write logs into
--cloud-part-size-mb CLOUD_PART_SIZE_MB
Upload files to cloud FS in parts no bigger than this
many megabytes. Default is 100 MiB. Set to 0 to
disable multipart uploading entirely.
--cloud-upload-part-size CLOUD_PART_SIZE_MB
Deprecated alias for --cloud-part-size-mb
--cloud-tmp-dir CLOUD_TMP_DIR
URI on remote FS to use as our temp directory.
--cloud-upload-part-size CLOUD_UPLOAD_PART_SIZE
Upload files to S3 in parts no bigger than this many
megabytes. Default is 100 MiB. Set to 0 to disable
multipart uploading entirely.
-c CONF_PATHS, --conf-path CONF_PATHS
Path to alternate mrjob.conf file to read from
--no-conf Don't load mrjob.conf even if it's available
@@ -119,8 +121,8 @@
http://docs.aws.amazon.com/cli/latest/reference/emr
/create-cluster.html
--instance-groups INSTANCE_GROUPS
detailed JSON list of instance configs, including EBS
configuration. See docs for --instance-groups at
detailed JSON list of EMR instance configs, including
EBS configuration. See docs for --instance-groups at
http://docs.aws.amazon.com/cli/latest/reference/emr
/create-cluster.html
--instance-type INSTANCE_TYPE
@@ -166,12 +168,11 @@
-us-west-1.amazonaws.com). You usually shouldn't set
this; by default mrjob will choose the correct
endpoint for each S3 bucket based on its location.
--subnet SUBNET ID of Amazon VPC subnet to launch cluster in. If not
set or empty string, cluster is launched in the normal
AWS cloud.
--subnets SUBNET Like --subnets, but with a comma-separated list, to
--subnet SUBNET ID of Amazon VPC subnet/URI of Google Compute Engine
subnetwork to launch cluster in.
--subnets SUBNET Like --subnet, but with a comma-separated list, to
specify multiple subnets in conjunction with
--instance-fleets
--instance-fleets (EMR only)
--tag TAGS Metadata tags to apply to the EMR cluster; should take
the form KEY=VALUE. You can use --tag multiple times
--task-instance-bid-price TASK_INSTANCE_BID_PRICE
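The help output above comes from the cluster-creation tool whose diff is shown here. A sketch of driving it with the renamed switch (assumes AWS credentials and mrjob.conf are already set up; the module path is taken from the file path above, but the exact invocation may vary by mrjob version):

    import subprocess

    # launch a cluster with 100 MiB upload parts
    subprocess.run(
        ['python', '-m', 'mrjob.tools.emr.create_cluster',
         '--cloud-part-size-mb', '100'],
        check=True,
    )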
10 changes: 5 additions & 5 deletions tests/test_dataproc.py
@@ -2009,10 +2009,10 @@ def test_end_to_end(self):
self.assertEqual(error['task_error']['message'], TRACEBACK)


class CloudUploadPartSizeTestCase(MockGoogleTestCase):
class CloudPartSizeTestCase(MockGoogleTestCase):

def setUp(self):
super(CloudUploadPartSizeTestCase, self).setUp()
super(CloudPartSizeTestCase, self).setUp()

self.upload_from_string = self.start(patch(
'tests.mock_google.storage.MockGoogleStorageBlob'
@@ -2026,18 +2026,18 @@ def test_default(self):
self.assertEqual(runner._fs_chunk_size(), 100 * 1024 * 1024)

def test_float(self):
runner = DataprocJobRunner(cloud_upload_part_size=0.25)
runner = DataprocJobRunner(cloud_part_size_mb=0.25)

self.assertEqual(runner._fs_chunk_size(), 256 * 1024)

def test_zero(self):
runner = DataprocJobRunner(cloud_upload_part_size=0)
runner = DataprocJobRunner(cloud_part_size_mb=0)

self.assertEqual(runner._fs_chunk_size(), None)

def test_multipart_upload(self):
job = MRWordCount(
['-r', 'dataproc', '--cloud-upload-part-size', '2'])
['-r', 'dataproc', '--cloud-part-size-mb', '2'])
job.sandbox()

with job.make_runner() as runner:
4 changes: 2 additions & 2 deletions tests/test_emr.py
@@ -2882,13 +2882,13 @@ def test_default_part_size(self):
def test_custom_part_size(self):
# this test used to simulate multipart upload, but now we leave
# that to boto3
runner = EMRJobRunner(cloud_upload_part_size=50.0 / 1024 / 1024)
runner = EMRJobRunner(cloud_part_size_mb=50.0 / 1024 / 1024)

data = b'Mew' * 20
self.assert_upload_succeeds(runner, data, 50)

def test_disable_multipart(self):
runner = EMRJobRunner(cloud_upload_part_size=0)
runner = EMRJobRunner(cloud_part_size_mb=0)

data = b'Mew' * 20
self.assert_upload_succeeds(runner, data, _HUGE_PART_THRESHOLD)
2 changes: 1 addition & 1 deletion tests/tools/emr/test_create_cluster.py
@@ -41,8 +41,8 @@ def test_runner_kwargs(self):
'bootstrap_spark': None,
'cloud_fs_sync_secs': None,
'cloud_log_dir': None,
'cloud_part_size_mb': None,
'cloud_tmp_dir': None,
'cloud_upload_part_size': None,
'conf_paths': None,
'core_instance_bid_price': None,
'core_instance_type': None,
