Skip to content

Commit

Permalink
Merge pull request #1777 from davidmarin/google-service-account
Browse files Browse the repository at this point in the history
Added service_account and service_account_scopes opts. Fixes #1682.
  • Loading branch information
David Marin committed May 18, 2018
2 parents d83c6cf + 5a91a01 commit e258d37
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 5 deletions.
35 changes: 35 additions & 0 deletions docs/guides/dataproc-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ Cluster creation and configuration

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: service_account
:switch: --service-account
:set: dataproc
:default: ``None``

Optional service account to use when creating a cluster. For more
information see `Service Accounts <https://cloud.google.com/compute/docs/access/service-accounts#custom_service_accounts>`__.

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: service_account_scopes
:switch: --service-account-scope
:set: dataproc
:default: (automatic)

Service account scopes to use when creating a cluster. By default,
Dataproc uses these scopes::

https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/bigtable.admin.table
https://www.googleapis.com/auth/bigtable.data
https://www.googleapis.com/auth/cloud-platform
https://www.googleapis.com/auth/cloud.useraccounts.readonly
https://www.googleapis.com/auth/devstorage.full_control
https://www.googleapis.com/auth/devstorage.read_write
https://www.googleapis.com/auth/logging.write

``--service-account-scope`` can only be used to add additional scopes.
If you wish to exclude some of these scopes, you can use ``!clear`` in
your config file (see :ref:`clearing-configs`).

.. versionadded:: 0.6.3

.. mrjob-opt::
:config: task_instance_config
:switch: --task-instance-config
Expand Down
12 changes: 10 additions & 2 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
'gcloud_bin',
'master_instance_config',
'project_id',
'service_account',
'service_account_scopes',
'task_instance_config',
}

Expand Down Expand Up @@ -348,6 +350,8 @@ def _default_opts(self):
master_instance_type=_DEFAULT_INSTANCE_TYPE,
num_core_instances=_DATAPROC_MIN_WORKERS,
num_task_instances=0,
service_account_scopes=list(
_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES),
sh_bin=['/bin/sh', '-ex'],
)
)
Expand Down Expand Up @@ -1144,10 +1148,14 @@ def _cluster_create_kwargs(self):
self._opts['max_mins_idle'] * 60))

gce_cluster_config = dict(
service_account_scopes=_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES,
metadata=cluster_metadata
metadata=cluster_metadata,
service_account_scopes=self._opts['service_account_scopes'],
)

if self._opts['service_account']:
gce_cluster_config['service_account'] = (
self._opts['service_account'])

if self._opts['zone']:
gce_cluster_config['zone_uri'] = _gcp_zone_uri(
project=self._project_id, zone=self._opts['zone'])
Expand Down
22 changes: 22 additions & 0 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,28 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
service_account=dict(
cloud_role='launch',
switches=[
(['--service-account'], dict(
help=('Service account to use when creating a Dataproc'
' cluster. Usually takes the form'
' [account_id]@[project_id].iam.gserviceaccount.com.'
' Set to "" to use the default.'),
)),
],
),
service_account_scopes=dict(
cloud_role='launch',
combiner=combine_lists,
switches=[
(['--service-account-scope'], dict(
action='append',
help=('Additional service account scope to use when creating'
' a Dataproc cluster.'),
)),
],
),
setup=dict(
combiner=combine_lists,
switches=[
Expand Down
29 changes: 29 additions & 0 deletions tests/mock_google/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@
# default boot disk size set by the API
_DEFAULT_DISK_SIZE_GB = 500

# account scopes that the (mock) Dataproc API attaches to every cluster,
# whether or not the create-cluster request asked for them.
# for more info, see:
# https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#GceClusterConfig # noqa
_MANDATORY_SCOPES = {
    'https://www.googleapis.com/auth/cloud.useraccounts.readonly',
    'https://www.googleapis.com/auth/devstorage.read_write',
    'https://www.googleapis.com/auth/logging.write',
}

# account scopes that the API fills in only when the request specifies
# no scopes at all (added on top of _MANDATORY_SCOPES)
_DEFAULT_SCOPES = {
    'https://www.googleapis.com/auth/bigquery',
    'https://www.googleapis.com/auth/bigtable.admin.table',
    'https://www.googleapis.com/auth/bigtable.data',
    'https://www.googleapis.com/auth/devstorage.full_control',
}

# convert strings (e.g. 'RUNNING') to enum values

def _cluster_state_value(state_name):
Expand Down Expand Up @@ -117,6 +134,18 @@ def create_cluster(self, project_id, region, cluster):
if not conf.disk_config.boot_disk_size_gb:
conf.disk_config.boot_disk_size_gb = _DEFAULT_DISK_SIZE_GB

# update gce_cluster_config
gce_config = cluster.config.gce_cluster_config

# add in default scopes and sort
scopes = set(gce_config.service_account_scopes)

if not scopes:
scopes.update(_DEFAULT_SCOPES)
scopes.update(_MANDATORY_SCOPES)

gce_config.service_account_scopes[:] = sorted(scopes)

# initialize cluster status
cluster.status.state = _cluster_state_value('CREATING')

Expand Down
7 changes: 4 additions & 3 deletions tests/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ def setUp(self):
add_null_handler_to_root_logger()

if self.MRJOB_CONF_CONTENTS is not None:
patcher = mrjob_conf_patcher(self.MRJOB_CONF_CONTENTS)
patcher.start()
self.addCleanup(patcher.stop)
self.mrjob_conf_patcher = mrjob_conf_patcher(
self.MRJOB_CONF_CONTENTS)
self.mrjob_conf_patcher.start()
self.addCleanup(self.mrjob_conf_patcher.stop)


class SandboxedTestCase(EmptyMrjobConfTestCase):
Expand Down
62 changes: 62 additions & 0 deletions tests/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from mrjob.dataproc import _CONTAINER_EXECUTOR_CLASS_NAME
from mrjob.dataproc import _DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS
from mrjob.dataproc import _DEFAULT_GCE_REGION
from mrjob.dataproc import _DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES
from mrjob.dataproc import _DEFAULT_IMAGE_VERSION
from mrjob.dataproc import _HADOOP_STREAMING_JAR_URI
from mrjob.dataproc import _MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH
Expand Down Expand Up @@ -441,6 +442,64 @@ def test_availability_zone_config(self):
cluster.config.worker_config.machine_type_uri)


class GCEClusterConfigTestCase(MockGoogleTestCase):
    """Tests for the *service_account* and *service_account_scopes* opts,
    which are passed through to the cluster's gce_cluster_config."""

    def _get_gce_cluster_config(self, *args):
        """Launch a word-count job on the mock Dataproc runner with the
        given extra command-line switches, and return the resulting
        cluster's ``gce_cluster_config``."""
        job = MRWordCount(['-r', 'dataproc'] + list(args))
        job.sandbox()

        with job.make_runner() as runner:
            runner._launch()
            return runner._get_cluster(
                runner._cluster_id).config.gce_cluster_config

    def test_default(self):
        gcc = self._get_gce_cluster_config()

        # no service account by default, and exactly the default scopes
        self.assertFalse(gcc.service_account)
        self.assertEqual(set(gcc.service_account_scopes),
                         set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))

    def test_service_account(self):
        account = '12345678901-compute@developer.gserviceaccount.com'

        gcc = self._get_gce_cluster_config(
            '--service-account', account)

        self.assertEqual(gcc.service_account, account)

    def test_service_account_scopes(self):
        # --service-account-scope adds to the default scopes rather than
        # replacing them
        scope1 = 'https://www.googleapis.com/auth/scope1'
        scope2 = 'https://www.googleapis.com/auth/scope2'

        gcc = self._get_gce_cluster_config(
            '--service-account-scope', scope1,
            '--service-account-scope', scope2)

        self.assertGreater(
            set(gcc.service_account_scopes),
            set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))
        self.assertIn(scope1, set(gcc.service_account_scopes))
        self.assertIn(scope2, set(gcc.service_account_scopes))

    def test_clear_service_account_scopes(self):
        # it's possible to request fewer service account scopes than the
        # default, just not very wise
        conf_path = self.makefile(
            'mrjob.conf',
            b'runners:\n dataproc:\n service_account_scopes: !clear')

        # temporarily disable the sandbox's empty-conf patch so that
        # -c actually takes effect; restart it even on failure, since
        # addCleanup() will call stop() again at teardown
        self.mrjob_conf_patcher.stop()
        try:
            gcc = self._get_gce_cluster_config('-c', conf_path)
        finally:
            self.mrjob_conf_patcher.start()

        self.assertLess(
            set(gcc.service_account_scopes),
            set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))



class ProjectIDTestCase(MockGoogleTestCase):

def test_default(self):
Expand Down Expand Up @@ -884,6 +943,9 @@ def test_preemtible_task_instances(self):
conf.secondary_worker_config.is_preemptible)





class MasterBootstrapScriptTestCase(MockGoogleTestCase):

def test_usr_bin_env(self):
Expand Down

0 comments on commit e258d37

Please sign in to comment.