Merge pull request #1761 from davidmarin/google-part-size
chunked uploading on dataproc runner (fixes #1404)
David Marin committed May 1, 2018
2 parents 8f34a7b + dd1301d commit f141795
Showing 6 changed files with 73 additions and 15 deletions.
11 changes: 9 additions & 2 deletions mrjob/cloud.py
@@ -26,6 +26,7 @@

from mrjob.bin import MRJobBinRunner
from mrjob.conf import combine_dicts
from mrjob.py2 import integer_types
from mrjob.py2 import xrange
from mrjob.setup import WorkingDirManager
from mrjob.setup import parse_setup_cmd
@@ -61,6 +62,7 @@ class HadoopInTheCloudJobRunner(MRJobBinRunner):
'check_cluster_every',
'cloud_fs_sync_secs',
'cloud_tmp_dir',
'cloud_upload_part_size',
'cluster_id',
'core_instance_type',
'extra_cluster_params',
@@ -119,14 +121,13 @@ def __init__(self, **kwargs):
# store the (tunneled) URL of the job tracker/resource manager
self._ssh_tunnel_url = None



### Options ###

def _default_opts(self):
return combine_dicts(
super(HadoopInTheCloudJobRunner, self)._default_opts(),
dict(
cloud_upload_part_size=100, # 100 MB
max_mins_idle=_DEFAULT_MAX_MINS_IDLE,
# don't use a list because it makes it hard to read option
# values when running in verbose mode. See #1284
@@ -142,6 +143,12 @@ def _fix_opts(self, opts, source=None):
opts = super(HadoopInTheCloudJobRunner, self)._fix_opts(
opts, source=source)

# cloud_upload_part_size should be a number
if opts.get('cloud_upload_part_size') is not None:
if not isinstance(opts['cloud_upload_part_size'],
(integer_types, float)):
raise TypeError('cloud_upload_part_size must be a number')

# patch max_hours_idle into max_mins_idle (see #1663)
if opts.get('max_hours_idle') is not None:
log.warning(
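For context, a minimal standalone sketch (not mrjob's actual option store) of how the option behaves after this hunk: the part size is a number of megabytes, the default is 100, and non-numeric values are rejected in _fix_opts():

    def check_part_size(opts):
        part_size = opts.get('cloud_upload_part_size', 100)  # default: 100 MB
        if part_size is not None and not isinstance(part_size, (int, float)):
            raise TypeError('cloud_upload_part_size must be a number')
        return part_size

    check_part_size({})                                 # -> 100 (the new default)
    check_part_size({'cloud_upload_part_size': 0.25})   # -> 0.25; fractional MB is fine
    # check_part_size({'cloud_upload_part_size': '2'})  # would raise TypeError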
11 changes: 9 additions & 2 deletions mrjob/dataproc.py
@@ -390,6 +390,14 @@ def fs(self):
self._fs = CompositeFilesystem(self._gcs_fs, LocalFilesystem())
return self._fs

def _fs_chunk_size(self):
"""Chunk size for cloud storage Blob objects. Currently
only used for uploading."""
if self._opts['cloud_upload_part_size']:
return int(self._opts['cloud_upload_part_size'] * 1024 * 1024)
else:
return None

def _get_tmpdir(self, given_tmpdir):
"""Helper for _fix_tmpdir"""
if given_tmpdir:
@@ -502,8 +510,7 @@ def _upload_local_files_to_fs(self):
for path, gcs_uri in self._upload_mgr.path_to_uri().items():
log.debug('uploading %s -> %s' % (path, gcs_uri))

# TODO - mtai @ davidmarin - Implement put function for other FSs
self.fs.put(path, gcs_uri)
self.fs.put(path, gcs_uri, chunk_size=self._fs_chunk_size())

self._wait_for_fs_sync()

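A hedged sketch of the MB-to-bytes conversion that _fs_chunk_size() performs above (standalone, not mrjob's code). Note that google-cloud-storage expects a Blob chunk size to be a multiple of 256 KB, which the sizes exercised in this commit (100, 0.25 and 2 MB) all are:

    def fs_chunk_size(part_size_mb):
        """Return an upload chunk size in bytes, or None for a single-request upload."""
        if part_size_mb:
            return int(part_size_mb * 1024 * 1024)
        return None

    assert fs_chunk_size(100) == 100 * 1024 * 1024   # default: 100 MB parts
    assert fs_chunk_size(0.25) == 256 * 1024         # fractional MB are allowed
    assert fs_chunk_size(0) is None                  # 0 disables chunked uploading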
2 changes: 0 additions & 2 deletions mrjob/emr.py
@@ -301,7 +301,6 @@ class EMRJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
'bootstrap_actions',
'bootstrap_spark',
'cloud_log_dir',
'cloud_upload_part_size',
'core_instance_bid_price',
'ec2_key_pair',
'ec2_key_pair_file',
@@ -442,7 +441,6 @@ def _default_opts(self):
check_cluster_every=30,
cleanup_on_failure=['JOB'],
cloud_fs_sync_secs=5.0,
cloud_upload_part_size=100, # 100 MB
image_version=_DEFAULT_IMAGE_VERSION,
num_core_instances=0,
num_task_instances=0,
17 changes: 10 additions & 7 deletions mrjob/fs/gcs.py
@@ -59,8 +59,8 @@ class GCSFilesystem(Filesystem):
:py:class:`~mrjob.fs.local.LocalFilesystem`.
"""
def __init__(self, local_tmp_dir=None, credentials=None, project_id=None):
self._local_tmp_dir = local_tmp_dir
self._credentials = credentials
self._local_tmp_dir = local_tmp_dir
self._project_id = project_id

@property
@@ -180,13 +180,14 @@ def touchz(self, dest_uri):

self._blob(dest_uri).upload_from_string(b'')

def put(self, src_path, dest_uri):
def put(self, src_path, dest_uri, chunk_size=None):
"""Uploads a local file to a specific destination."""
old_blob = self._get_blob(dest_uri)
if old_blob:
raise IOError('File already exists: %s' % dest_uri)

self._blob(dest_uri).upload_from_filename(src_path)
self._blob(dest_uri, chunk_size=chunk_size).upload_from_filename(
src_path)

def get_all_bucket_names(self, prefix=None):
"""Yield the names of all buckets associated with this client.
@@ -234,15 +235,17 @@ def delete_bucket(self, bucket):
'delete_bucket() was disabled in v0.6.2. Use'
'fs.bucket(name).delete()')

def _get_blob(self, uri):
def _get_blob(self, uri, chunk_size=None):
# NOTE: chunk_size seems not to work well with downloading
bucket_name, blob_name = parse_gcs_uri(uri)
bucket = self.client.get_bucket(bucket_name)
return bucket.get_blob(blob_name)
return bucket.get_blob(blob_name, chunk_size=chunk_size)

def _blob(self, uri):
def _blob(self, uri, chunk_size=None):
# NOTE: chunk_size seems not to work well with downloading
bucket_name, blob_name = parse_gcs_uri(uri)
bucket = self.client.get_bucket(bucket_name)
return bucket.blob(blob_name)
return bucket.blob(blob_name, chunk_size=chunk_size)


# The equivalent S3 methods are in parse.py but it's cleaner to keep them
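For reference, a hedged sketch of what the chunk_size plumbing above amounts to with the google-cloud-storage client (bucket and object names here are hypothetical): constructing the Blob with a chunk_size makes upload_from_filename() perform a resumable upload in parts of that size instead of a single request.

    from google.cloud import storage

    def put(src_path, bucket_name, blob_name, chunk_size=None):
        # chunk_size is in bytes and should be a multiple of 256 KB
        client = storage.Client()  # assumes default application credentials
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(blob_name, chunk_size=chunk_size)
        blob.upload_from_filename(src_path)

    # e.g. upload in 2 MB parts (hypothetical names):
    # put('/tmp/wordcount.py', 'my-tmp-bucket', 'tmp/wordcount.py',
    #     chunk_size=2 * 1024 * 1024)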
4 changes: 2 additions & 2 deletions tests/mock_google/storage.py
@@ -78,11 +78,11 @@ def create(self):
def exists(self):
return self.name in self.client.mock_gcs_fs

def get_blob(self, blob_name):
def get_blob(self, blob_name, chunk_size=None):
fs = self.client.mock_gcs_fs

if self.name in fs and blob_name in fs[self.name]['blobs']:
blob = self.blob(blob_name)
blob = self.blob(blob_name, chunk_size=chunk_size)
blob._set_md5_hash()
return blob

43 changes: 43 additions & 0 deletions tests/test_dataproc.py
@@ -26,6 +26,7 @@

import mrjob
import mrjob.dataproc
import mrjob.fs.gcs
from mrjob.dataproc import DataprocException
from mrjob.dataproc import DataprocJobRunner
from mrjob.dataproc import _DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS
@@ -43,6 +44,7 @@
from mrjob.util import save_current_environment

from tests.mock_google import MockGoogleTestCase
from tests.mock_google.storage import MockGoogleStorageBlob
from tests.mr_hadoop_format_job import MRHadoopFormatJob
from tests.mr_no_mapper import MRNoMapper
from tests.mr_two_step_job import MRTwoStepJob
@@ -1420,3 +1422,44 @@ def test_error_from_gcloud_bin(self):

self.assertGreater(self.mock_Popen.call_count, 1)
self.assertFalse(runner._give_up_on_ssh_tunnel)


class CloudUploadPartSizeTestCase(MockGoogleTestCase):

def setUp(self):
super(CloudUploadPartSizeTestCase, self).setUp()

self.upload_from_string = self.start(patch(
'tests.mock_google.storage.MockGoogleStorageBlob'
'.upload_from_string',
side_effect=MockGoogleStorageBlob.upload_from_string,
autospec=True))

def test_default(self):
runner = DataprocJobRunner()

self.assertEqual(runner._fs_chunk_size(), 100 * 1024 * 1024)

def test_float(self):
runner = DataprocJobRunner(cloud_upload_part_size=0.25)

self.assertEqual(runner._fs_chunk_size(), 256 * 1024)

def test_zero(self):
runner = DataprocJobRunner(cloud_upload_part_size=0)

self.assertEqual(runner._fs_chunk_size(), None)

def test_multipart_upload(self):
job = MRWordCount(
['-r', 'dataproc', '--cloud-upload-part-size', '2'])
job.sandbox()

with job.make_runner() as runner:
runner._prepare_for_launch()

# chunk size should be set on blob object used to upload
self.assertTrue(self.upload_from_string.called)
for call_args in self.upload_from_string.call_args_list:
blob = call_args[0][0]
self.assertEqual(blob.chunk_size, 2 * 1024 * 1024)
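End to end, the new switch is used like any other runner option; a hedged usage sketch (import path assumed, any MRJob script works the same way):

    from tests.mr_word_count import MRWordCount  # assumed path

    job = MRWordCount(
        ['-r', 'dataproc', '--cloud-upload-part-size', '32', 'input.txt'])

    with job.make_runner() as runner:
        runner.run()  # support files are uploaded to GCS in 32 MB parts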
