Merge pull request #1782 from davidmarin/google-network-uri
Dataproc network and subnet opts (fixes #1683)
David Marin committed May 23, 2018
2 parents d342bd1 + 56a87ac commit ccdafb9
Showing 7 changed files with 311 additions and 89 deletions.
145 changes: 86 additions & 59 deletions docs/guides/dataproc-opts.rst
@@ -75,49 +75,26 @@ Cluster creation and configuration
https://cloud.google.com/dataproc/dataproc-versions

.. mrjob-opt::
:config: network
:switch: --network
:type: :ref:`string <data-type-string>`
:set: dataproc
:default: ``None``

Name or URI of the Google Compute Engine network to launch the Dataproc
cluster in. Cannot be used with :mrjob-opt:`subnet`.

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: subnet
:switch: --subnet
:type: :ref:`string <data-type-string>`
:set: dataproc
:default: ``None``

Name or URI of the Google Compute Engine subnetwork to launch the Dataproc
cluster in. Cannot be used with :mrjob-opt:`network`.

.. versionadded:: 0.6.3
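
For example (a sketch; the project and subnetwork names below are made up),
to always launch Dataproc clusters in a particular subnetwork, you could add
this to your config file:

.. code-block:: yaml

    runners:
      dataproc:
        subnet: projects/my-project/regions/us-west1/subnetworks/my-subnet

Since the option accepts a name or a URI, a bare subnetwork name works as
well.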

@@ -156,32 +133,6 @@ Cluster creation and configuration

.. versionadded:: 0.6.3


.. mrjob-opt::
:config: cluster_properties
:switch: --cluster-property
@@ -323,6 +274,82 @@ Number and type of instances
run task instances without core instances (because there's nowhere to host
HDFS).


.. mrjob-opt::
:config: core_instance_config
:switch: --core-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as ``config.worker_config``
when creating the cluster. Follows the format of
`InstanceGroupConfig <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#InstanceGroupConfig>`__ except that it uses
`snake_case` instead of `camel_case`.

For example, to specify 100GB of disk space on core instances, add this to
your config file:

.. code-block:: yaml

    runners:
      dataproc:
        core_instance_config:
          disk_config:
            boot_disk_size_gb: 100

To set this option on the command line, pass in JSON:

.. code-block:: sh

    --core-instance-config '{"disk_config": {"boot_disk_size_gb": 100}}'

This option *can* be used to set number of core instances
(``num_instances``) or instance type (``machine_type_uri``), but usually
you'll want to use :mrjob-opt:`num_core_instances` and
:mrjob-opt:`core_instance_type` along with this option.

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: master_instance_config
:switch: --master-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as ``config.master_config``
when creating the cluster. See :mrjob-opt:`core_instance_config` for
more details.

.. versionadded:: 0.6.3


.. mrjob-opt::
:config: task_instance_config
:switch: --task-instance-config
:set: dataproc
:default: ``None``

A dictionary of additional parameters to pass as
``config.secondary_worker_config``
when creating the cluster. See :mrjob-opt:`core_instance_config` for
more details.

To make task instances preemptible, add this to your config file:

.. code-block:: yaml

    runners:
      dataproc:
        task_instance_config:
          is_preemptible: true

Note that this config won't be applied unless you specify at least one
task instance (either through :mrjob-opt:`num_task_instances` or
by passing ``num_instances`` to this option).

.. versionadded:: 0.6.3


FS paths and options
--------------------
mrjob uses google-api-python-client to access and manipulate the filesystem.
27 changes: 13 additions & 14 deletions mrjob/dataproc.py
@@ -51,6 +51,7 @@
from mrjob.py2 import PY2
from mrjob.py2 import string_types
from mrjob.py2 import to_unicode
from mrjob.runner import _blank_out_conflicting_opts
from mrjob.setup import UploadDirManager
from mrjob.step import StepFailedException
from mrjob.util import random_identifier
@@ -232,9 +233,11 @@ class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
'core_instance_config',
'gcloud_bin',
'master_instance_config',
'network',
'project_id',
'service_account',
'service_account_scopes',
'subnet',
'task_instance_config',
}

@@ -360,20 +363,10 @@ def _default_opts(self):
)

def _combine_opts(self, opt_list):
    """Blank out conflicts between *network*/*subnet* and
    *region*/*zone*."""
    opt_list = _blank_out_conflicting_opts(opt_list, ['region', 'zone'])
    opt_list = _blank_out_conflicting_opts(opt_list, ['network', 'subnet'])

    # now combine opts, with conflicting opts blanked out
return super(DataprocJobRunner, self)._combine_opts(opt_list)
@@ -1155,6 +1148,12 @@ def _cluster_create_kwargs(self):
service_account_scopes=self._opts['service_account_scopes'],
)

if self._opts['network']:
    gce_cluster_config['network_uri'] = self._opts['network']

if self._opts['subnet']:
    gce_cluster_config['subnetwork_uri'] = self._opts['subnet']

if self._opts['service_account']:
gce_cluster_config['service_account'] = (
self._opts['service_account'])
16 changes: 5 additions & 11 deletions mrjob/emr.py
@@ -84,6 +84,7 @@
from mrjob.py2 import PY2
from mrjob.py2 import string_types
from mrjob.py2 import urlopen
from mrjob.runner import _blank_out_conflicting_opts
from mrjob.setup import UploadDirManager
from mrjob.setup import WorkingDirManager
from mrjob.step import StepFailedException
@@ -457,19 +458,12 @@ def _combine_opts(self, opt_list):
"""Blank out overridden *instance_fleets* and *instance_groups*.
Convert image_version of 4.x and later to release_label."""
opt_list = _blank_out_conflicting_opts(
    opt_list,
    ['instance_fleets', 'instance_groups'],
    self._INSTANCE_OPT_NAMES)

# now combine opts, with instance_groups/fleets blanked out
opts = super(EMRJobRunner, self)._combine_opts(opt_list)
18 changes: 13 additions & 5 deletions mrjob/options.py
@@ -867,6 +867,15 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
network=dict(
    cloud_role='launch',
    switches=[
        (['--network'], dict(
            help=('URI of Google Compute Engine network to launch cluster'
                  " in. Can't be used with --subnet."),
        )),
    ],
),
num_core_instances=dict(
cloud_role='launch',
switches=[
@@ -1142,15 +1151,14 @@ def __call__(self, parser, namespace, value, option_string=None):
cloud_role='launch',
switches=[
    (['--subnet'], dict(
        help=('ID of Amazon VPC subnet/URI of Google Compute Engine'
              ' subnetwork to launch cluster in.'),
    )),
    (['--subnets'], dict(
        action=_SubnetsAction,
        help=('Like --subnet, but with a comma-separated list, to'
              ' specify multiple subnets in conjunction with'
              ' --instance-fleets (EMR only)'),
    )),
],
),
25 changes: 25 additions & 0 deletions mrjob/runner.py
@@ -1207,3 +1207,28 @@ def _to_str(s):
return s

return dict((_to_str(k), _to_str(v)) for k, v in env.items())


def _blank_out_conflicting_opts(opt_list, opt_names, conflicting_opts=None):
    """Utility for :py:meth:`MRJobRunner._combine_opts()`: if multiple
    configs specify conflicting opts, blank them out in all but the
    last config (so, for example, the command line beats the config file).

    This returns a copy of *opt_list*.
    """
    conflicting_opts = set(conflicting_opts or ()) | set(opt_names)

    # copy opt_list so we can modify it
    opt_list = [dict(opts) for opts in opt_list]

    # blank out opts before the last config where a conflicting opt is set
    blank_out = False
    for opts in reversed(opt_list):
        if blank_out:
            for opt_name in opt_names:
                opts[opt_name] = None
        elif any(opts.get(opt_name) is not None
                 for opt_name in conflicting_opts):
            blank_out = True

    return opt_list
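
As a quick illustration of the precedence this helper implements (later
configs win; conflicting opts in earlier configs are blanked out), here is a
self-contained sketch with made-up opt values — the helper body is copied
from above so the snippet runs on its own:

```python
def _blank_out_conflicting_opts(opt_list, opt_names, conflicting_opts=None):
    # copy of the helper above, inlined so this sketch is self-contained
    conflicting_opts = set(conflicting_opts or ()) | set(opt_names)
    opt_list = [dict(opts) for opts in opt_list]  # don't mutate caller's dicts
    blank_out = False
    for opts in reversed(opt_list):
        if blank_out:
            # a later config set a conflicting opt, so clear these
            for opt_name in opt_names:
                opts[opt_name] = None
        elif any(opts.get(opt_name) is not None
                 for opt_name in conflicting_opts):
            blank_out = True
    return opt_list

# e.g. mrjob.conf sets region, but the command line sets zone, which conflicts
conf_file_opts = {'region': 'us-west1', 'zone': None}
cmd_line_opts = {'region': None, 'zone': 'us-east1-b'}

combined = _blank_out_conflicting_opts(
    [conf_file_opts, cmd_line_opts], ['region', 'zone'])

# the earlier region is blanked out; the later zone survives
print(combined)
# [{'region': None, 'zone': None}, {'region': None, 'zone': 'us-east1-b'}]
```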
