
Merge pull request #1776 from davidmarin/google-instance-configs
Dataproc instance config opts (fixes #1681)
David Marin committed May 18, 2018
2 parents 1368fae + 998458f commit d83c6cf
Showing 6 changed files with 239 additions and 5 deletions.
73 changes: 73 additions & 0 deletions docs/guides/dataproc-opts.rst
@@ -74,6 +74,79 @@ Cluster creation and configuration
.. _`the Dataproc docs on specifying the Dataproc version`:
https://cloud.google.com/dataproc/dataproc-versions

.. mrjob-opt::
:config: core_instance_config
:switch: --core-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as ``config.worker_config``
when creating the cluster. Follows the format of
`InstanceGroupConfig <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#InstanceGroupConfig>`__, except that it uses
``snake_case`` rather than ``camelCase``.

For example, to specify 100GB of disk space on core instances, add this to
your config file:

.. code-block:: yaml

    runners:
      dataproc:
        core_instance_config:
          disk_config:
            boot_disk_size_gb: 100

To set this option on the command line, pass in JSON:

.. code-block:: sh

    --core-instance-config '{"disk_config": {"boot_disk_size_gb": 100}}'

This option *can* be used to set the number of core instances
(``num_instances``) or the instance type (``machine_type_uri``), but usually
you'll want to use :mrjob-opt:`num_core_instances` and
:mrjob-opt:`core_instance_type` along with this option.
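
For instance, you might set the instance count and type through the
dedicated options and reserve ``core_instance_config`` for disk settings
(the count and instance type below are illustrative):

.. code-block:: yaml

    runners:
      dataproc:
        num_core_instances: 4
        core_instance_type: n1-standard-4
        core_instance_config:
          disk_config:
            boot_disk_size_gb: 100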

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: master_instance_config
:switch: --master-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as ``config.master_config``
when creating the cluster. See :mrjob-opt:`core_instance_config` for
more details.
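
For example, to give the master instance a 100 GB boot disk from the
command line (the same disk settings shown in the example above):

.. code-block:: sh

    --master-instance-config '{"disk_config": {"boot_disk_size_gb": 100}}'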

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: task_instance_config
:switch: --task-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as
``config.secondary_worker_config``
when creating the cluster. See :mrjob-opt:`core_instance_config` for
more details.

To make task instances preemptible, add this to your config file:

.. code-block:: yaml

    runners:
      dataproc:
        task_instance_config:
          is_preemptible: true

Note that this config won't be applied unless you specify at least one
task instance (either through :mrjob-opt:`num_task_instances` or
by passing ``num_instances`` to this option).
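
On the command line, the equivalent (matching this commit's tests) would be:

.. code-block:: sh

    --num-task-instances 3 --task-instance-config '{"is_preemptible": true}'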

.. versionadded:: 0.6.3

Bootstrapping
-------------

28 changes: 27 additions & 1 deletion mrjob/dataproc.py
@@ -226,8 +226,11 @@ class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
alias = 'dataproc'

OPT_NAMES = HadoopInTheCloudJobRunner.OPT_NAMES | {
'core_instance_config',
'gcloud_bin',
'master_instance_config',
'project_id',
'task_instance_config',
}

def __init__(self, **kwargs):
@@ -1162,13 +1165,17 @@ def _cluster_create_kwargs(self):
project=self._project_id, zone=self._opts['zone'],
count=1, instance_type=self._opts['master_instance_type'],
)
if self._opts['master_instance_config']:
master_conf.update(self._opts['master_instance_config'])

# Compute + storage
worker_conf = _gcp_instance_group_config(
project=self._project_id, zone=self._opts['zone'],
count=self._opts['num_core_instances'],
instance_type=self._opts['core_instance_type']
)
if self._opts['core_instance_config']:
worker_conf.update(self._opts['core_instance_config'])

# Compute ONLY
secondary_worker_conf = _gcp_instance_group_config(
@@ -1177,17 +1184,24 @@
instance_type=self._opts['task_instance_type'],
is_preemptible=True
)
if self._opts['task_instance_config']:
secondary_worker_conf.update(self._opts['task_instance_config'])

cluster_config['master_config'] = master_conf
cluster_config['worker_config'] = worker_conf
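# attach the secondary worker config only when it actually requests
# instances; num_instances may come from num_task_instances or be set
# directly in task_instance_config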
if self._opts['num_task_instances']:
if secondary_worker_conf.get('num_instances'):
cluster_config['secondary_worker_config'] = secondary_worker_conf

# See - https://cloud.google.com/dataproc/dataproc-versions
if self._opts['image_version']:
cluster_config['software_config'] = dict(
image_version=self._opts['image_version'])

# in Python 2, dict keys loaded from JSON will be unicode, which
# the Google protobuf objects don't like
if PY2:
cluster_config = _clean_json_dict_keys(cluster_config)

kwargs = dict(project_id=self._project_id,
cluster_name=self._cluster_id,
config=cluster_config)
@@ -1382,3 +1396,15 @@ def _fix_traceback(s):
s = _TRACEBACK_EXCEPTION_RE.sub(lambda m: '\n' + m.group(0), s)

return s


def _clean_json_dict_keys(x):
"""Cast any dictionary keys in the given JSON object to str.
We can assume that x isn't a recursive data structure, and that
this is only called in Python 2."""
if isinstance(x, dict):
return {str(k): _clean_json_dict_keys(v) for k, v in x.items()}
elif isinstance(x, list):
return [_clean_json_dict_keys(item) for item in x]
else:
return x
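
A quick illustrative sketch (not part of the commit) of why this helper
exists: under Python 2, json.loads() returns dicts with unicode keys, which
the protobuf constructors reject, so the keys are recast to str before the
cluster config is built:

import json

raw = json.loads('{"disk_config": {"boot_disk_size_gb": 100}}')
# under Python 2, these keys come back as unicode, e.g. [u'disk_config']
cleaned = _clean_json_dict_keys(raw)
# keys at every level are recast to str; spot-check the top level
assert all(isinstance(k, str) for k in cleaned)
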
48 changes: 46 additions & 2 deletions mrjob/options.py
@@ -495,6 +495,21 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
core_instance_config=dict(
cloud_role='launch',
switches=[
(['--core-instance-config'], dict(
action=_JSONAction,
help=('detailed JSON dict of configs for the core'
' (worker) instances'
' on Dataproc, including disk config. For format, see'
' https://cloud.google.com/dataproc/docs/reference/rest'
'/v1/projects.regions.clusters#InstanceGroupConfig'
' (except that fields in your JSON should use'
' snake_case, not camelCase).')
)),
],
),
core_instance_bid_price=dict(
cloud_role='launch',
switches=[
@@ -699,7 +714,7 @@ def __call__(self, parser, namespace, value, option_string=None):
switches=[
(['--instance-groups'], dict(
action=_JSONAction,
help=('detailed JSON list of instance configs, including'
help=('detailed JSON list of EMR instance configs, including'
' EBS configuration. See docs for --instance-groups'
' at http://docs.aws.amazon.com/cli/latest/reference'
'/emr/create-cluster.html'),
@@ -783,6 +798,20 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
master_instance_config=dict(
cloud_role='launch',
switches=[
(['--master-instance-config'], dict(
action=_JSONAction,
help=('detailed JSON dict of configs for the master instance'
' on Dataproc including disk config. For format, see'
' https://cloud.google.com/dataproc/docs/reference/rest'
'/v1/projects.regions.clusters#InstanceGroupConfig'
' (except that fields in your JSON should use'
' snake_case, not camelCase).')
)),
],
),
master_instance_type=dict(
cloud_role='launch',
switches=[
@@ -1110,6 +1139,21 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
task_instance_config=dict(
cloud_role='launch',
switches=[
(['--task-instance-config'], dict(
action=_JSONAction,
help=('detailed JSON dict of configs for the task'
' (secondary worker) instances'
' on Dataproc including disk config. For format, see'
' https://cloud.google.com/dataproc/docs/reference/rest'
'/v1/projects.regions.clusters#InstanceGroupConfig'
' (except that fields in your JSON should use'
' snake_case, not camelCase).')
)),
],
),
task_instance_type=dict(
cloud_role='launch',
switches=[
@@ -1214,7 +1258,7 @@ def _filter_by_role(opt_names, *cloud_roles):
return {
opt_name
for opt_name, conf in _RUNNER_OPTS.items()
if conf.get('cloud_role') in cloud_roles
if opt_name in opt_names and conf.get('cloud_role') in cloud_roles
}


13 changes: 13 additions & 0 deletions tests/mock_google/dataproc.py
@@ -22,6 +22,7 @@
from google.api_core.exceptions import NotFound
from google.cloud.dataproc_v1.types import Cluster
from google.cloud.dataproc_v1.types import ClusterStatus
from google.cloud.dataproc_v1.types import DiskConfig
from google.cloud.dataproc_v1.types import Job
from google.cloud.dataproc_v1.types import JobStatus

@@ -30,6 +31,8 @@
from mrjob.dataproc import _job_state_name
from mrjob.util import random_identifier

# default boot disk size set by the API
_DEFAULT_DISK_SIZE_GB = 500

# convert strings (e.g. 'RUNNING') to enum values

@@ -104,6 +107,16 @@ def create_cluster(self, project_id, region, cluster):
if not cluster.cluster_name:
raise InvalidArgument('Cluster name is required')

# add in default disk config
for x in ('master', 'worker', 'secondary_worker'):
field = x + '_config'
conf = getattr(cluster.config, field, None)
if conf and str(conf): # empty DiskConfigs are still true-ish
if not conf.disk_config:
conf.disk_config = DiskConfig()
if not conf.disk_config.boot_disk_size_gb:
conf.disk_config.boot_disk_size_gb = _DEFAULT_DISK_SIZE_GB

# initialize cluster status
cluster.status.state = _cluster_state_value('CREATING')

81 changes: 80 additions & 1 deletion tests/test_dataproc.py
@@ -621,7 +621,7 @@ def test_cross_region_explicit_tmp_uri(self):
self.assertEqual(runner._region(), US_EAST_GCE_REGION)


class GCEInstanceGroupTestCase(MockGoogleTestCase):
class InstanceTypeAndNumberTestCase(MockGoogleTestCase):

maxDiff = None

@@ -805,6 +805,85 @@ def test_task_type_defaults_to_core_type(self):
task=(20, HIGHCPU_GCE_INSTANCE))


class InstanceConfigTestCase(MockGoogleTestCase):
# test the *_instance_config options

def _get_cluster_config(self, *args):
job = MRWordCount(['-r', 'dataproc'] + list(args))
job.sandbox()

with job.make_runner() as runner:
runner._launch()
return runner._get_cluster(runner._cluster_id).config

def test_default(self):
conf = self._get_cluster_config()

self.assertEqual(
conf.master_config.disk_config.boot_disk_size_gb, 500)
self.assertEqual(
conf.worker_config.disk_config.boot_disk_size_gb, 500)

def test_set_disk_config(self):
conf = self._get_cluster_config(
'--master-instance-config',
'{"disk_config": {"boot_disk_size_gb": 100}}',
'--core-instance-config',
'{"disk_config": {"boot_disk_size_gb": 200, "num_local_ssds": 2}}')

self.assertEqual(
conf.master_config.disk_config.boot_disk_size_gb, 100)
self.assertFalse(
conf.master_config.disk_config.num_local_ssds)
self.assertEqual(
conf.worker_config.disk_config.boot_disk_size_gb, 200)
self.assertEqual(
conf.worker_config.disk_config.num_local_ssds, 2)

def test_can_override_num_instances(self):
conf = self._get_cluster_config(
'--core-instance-config', '{"num_instances": 10}')

self.assertEqual(
conf.worker_config.num_instances, 10)

def test_set_task_config(self):
conf = self._get_cluster_config(
'--num-task-instances', '3',
'--task-instance-config',
'{"disk_config": {"boot_disk_size_gb": 300}}')

self.assertEqual(
conf.secondary_worker_config.disk_config.boot_disk_size_gb, 300)

def test_dont_set_task_config_if_no_task_instances(self):
conf = self._get_cluster_config(
'--task-instance-config',
'{"disk_config": {"boot_disk_size_gb": 300}}')

self.assertFalse(
conf.secondary_worker_config.disk_config.boot_disk_size_gb)

def test_can_set_num_instances_through_task_config(self):
conf = self._get_cluster_config(
'--task-instance-config',
'{"disk_config": {"boot_disk_size_gb": 300}, "num_instances": 3}')

self.assertEqual(
conf.secondary_worker_config.num_instances, 3)
self.assertEqual(
conf.secondary_worker_config.disk_config.boot_disk_size_gb, 300)

def test_preemptible_task_instances(self):
conf = self._get_cluster_config(
'--num-task-instances', '3',
'--task-instance-config',
'{"is_preemptible": true}')

self.assertTrue(
conf.secondary_worker_config.is_preemptible)


class MasterBootstrapScriptTestCase(MockGoogleTestCase):

def test_usr_bin_env(self):
1 change: 0 additions & 1 deletion tests/tools/emr/test_create_cluster.py
@@ -44,7 +44,6 @@ def test_runner_kwargs(self):
'cloud_tmp_dir': None,
'cloud_upload_part_size': None,
'conf_paths': None,
'num_cores': None,
'core_instance_bid_price': None,
'core_instance_type': None,
'ec2_key_pair': None,
