Skip to content

Commit

Permalink
Merge pull request #1794 from davidmarin/google-scopes
Browse files Browse the repository at this point in the history
revise google_account_scopes to let the API set defaults. use single "master" scope for credentials
  • Loading branch information
David Marin committed May 31, 2018
2 parents 3194d69 + db92ea3 commit a0618c5
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 64 deletions.
21 changes: 5 additions & 16 deletions docs/guides/dataproc-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,14 @@ Cluster creation and configuration

.. mrjob-opt::
:config: service_account_scopes
:switch: --service-account-scope
:switch: --service-account-scopes
:set: dataproc
:default: (automatic)

Service account scopes to use when creating a cluster. By default,
Dataproc uses these scopes::

https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/bigtable.admin.table
https://www.googleapis.com/auth/bigtable.data
https://www.googleapis.com/auth/cloud-platform
https://www.googleapis.com/auth/cloud.useraccounts.readonly
https://www.googleapis.com/auth/devstorage.full_control
https://www.googleapis.com/auth/devstorage.read_write
https://www.googleapis.com/auth/logging.write

``--service-account-scope`` can only be used to add additional scopes.
If you wish to exclude some of these scopes, you can use ``!clear`` in
your config file (see :ref:`clearing-configs`).
Optional service account scopes to pass to the API when creating a cluster.

Generally it's suggested that you instead create a
:mrjob-opt:`service_account` with the scopes you want.

.. versionadded:: 0.6.3

Expand Down
39 changes: 22 additions & 17 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from mrjob.logs.task import _parse_task_stderr
from mrjob.logs.task import _parse_task_syslog_records
from mrjob.logs.step import _interpret_new_dataproc_step_stderr
from mrjob.parse import is_uri
from mrjob.py2 import PY2
from mrjob.py2 import string_types
from mrjob.py2 import to_unicode
Expand All @@ -74,19 +75,6 @@
_DEFAULT_CLOUD_FS_SYNC_SECS = 5.0
_DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS = 90

# https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.clusters#GceClusterConfig # noqa
# NOTE - added cloud-platform so we can invoke gcloud commands from the cluster master (used for auto termination script) # noqa
_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES = [
'https://www.googleapis.com/auth/cloud.useraccounts.readonly',
'https://www.googleapis.com/auth/devstorage.read_write',
'https://www.googleapis.com/auth/logging.write',
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/bigtable.admin.table',
'https://www.googleapis.com/auth/bigtable.data',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/cloud-platform',
]

# job state matcher enum
# use this to only find active jobs. (2 for NON_ACTIVE, but we don't use that)
_STATE_MATCHER_ACTIVE = 1
Expand Down Expand Up @@ -121,7 +109,6 @@
port=8088,
)


# used to match log entries that tell us if a container exited
_CONTAINER_EXECUTOR_CLASS_NAME = (
'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
Expand All @@ -138,6 +125,9 @@
r'|Please initialize the log4j system'
r'|See http://logging.apache.org/log4j)')

# the cloud-platform scope grants full access to all Cloud APIs
_FULL_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'

# convert enum values to strings (e.g. 'RUNNING')

def _cluster_state_name(state_value):
Expand Down Expand Up @@ -265,7 +255,7 @@ def __init__(self, **kwargs):

# load credentials and project ID
self._credentials, auth_project_id = google.auth.default(
scopes=_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES)
scopes=[_FULL_SCOPE]) # needed for $GOOGLE_APPLICATION_CREDENTIALS

self._project_id = self._opts['project_id'] or auth_project_id

Expand All @@ -276,6 +266,12 @@ def __init__(self, **kwargs):

self._fix_zone_and_region_opts()

if self._opts['service_account_scopes']:
self._opts['service_account_scopes'] = [
_fully_qualify_scope_uri(s)
for s in self._opts['service_account_scopes']
]

# cluster_id can be None here
self._cluster_id = self._opts['cluster_id']

Expand Down Expand Up @@ -350,8 +346,6 @@ def _default_opts(self):
master_instance_type=_DEFAULT_INSTANCE_TYPE,
num_core_instances=_DATAPROC_MIN_WORKERS,
num_task_instances=0,
service_account_scopes=list(
_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES),
sh_bin=['/bin/sh', '-ex'],
)
)
Expand Down Expand Up @@ -1149,6 +1143,10 @@ def _cluster_create_kwargs(self):
gce_cluster_config['service_account'] = (
self._opts['service_account'])

if self._opts['service_account_scopes']:
gce_cluster_config['service_account_scopes'] = (
self._opts['service_account_scopes'])

if self._opts['zone']:
gce_cluster_config['zone_uri'] = _gcp_zone_uri(
project=self._project_id, zone=self._opts['zone'])
Expand Down Expand Up @@ -1438,3 +1436,10 @@ def _values_to_text(d):
result[k] = v

return result


def _fully_qualify_scope_uri(uri):
    """Convert a bare scope name (e.g. ``'logging.write'``) into a full
    Google auth scope URI; values that are already URIs pass through
    unchanged.
    """
    if not is_uri(uri):
        return 'https://www.googleapis.com/auth/%s' % uri
    return uri
19 changes: 10 additions & 9 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ def __call__(self, parser, namespace, value, option_string=None):
setattr(namespace, self.dest, result)


class _SubnetsAction(Action):
class _CommaSeparatedListAction(Action):
"""Action to parse a comma-separated list of values (e.g. subnets or
service account scopes), stripping surrounding whitespace from each item.
"""
def __call__(self, parser, namespace, value, option_string=None):
subnets = [s.strip() for s in value.split(',') if s]
items = [s.strip() for s in value.split(',') if s]

setattr(namespace, self.dest, subnets)
setattr(namespace, self.dest, items)


class _AppendJSONAction(Action):
Expand Down Expand Up @@ -1014,12 +1014,13 @@ def __call__(self, parser, namespace, value, option_string=None):
),
service_account_scopes=dict(
cloud_role='launch',
combiner=combine_lists,
switches=[
(['--service-account-scope'], dict(
action='append',
help=('Additional service account scope to use when creating'
' a Dataproc cluster.'),
(['--service-account-scopes'], dict(
action=_CommaSeparatedListAction,
help=("A comma-separated list of service account scopes"
" on Dataproc, used to limit your cluster's access."
" For each scope, you can specify the"
" full URI or just the name (e.g. 'logging.write')"),
)),
],
),
Expand Down Expand Up @@ -1157,7 +1158,7 @@ def __call__(self, parser, namespace, value, option_string=None):
' subnetwork to launch cluster in.'),
)),
(['--subnets'], dict(
action=_SubnetsAction,
action=_CommaSeparatedListAction,
help=('Like --subnet, but with a comma-separated list, to'
' specify multiple subnets in conjunction with'
' --instance-fleets (EMR only)'),
Expand Down
5 changes: 3 additions & 2 deletions tests/mock_google/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
from mrjob.parse import is_uri
from mrjob.util import random_identifier

# default boot disk size set by the API
_DEFAULT_DISK_SIZE_GB = 500

# account scopes that are included whether you ask for them or not
# for more info, see:
Expand All @@ -53,6 +51,9 @@
'https://www.googleapis.com/auth/devstorage.full_control',
}

# default boot disk size set by the API
_DEFAULT_DISK_SIZE_GB = 500

# actual properties taken from Dataproc
_DEFAULT_CLUSTER_PROPERTIES = {
'distcp:mapreduce.map.java.opts': '-Xmx2457m',
Expand Down
48 changes: 28 additions & 20 deletions tests/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from mrjob.dataproc import _CONTAINER_EXECUTOR_CLASS_NAME
from mrjob.dataproc import _DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS
from mrjob.dataproc import _DEFAULT_GCE_REGION
from mrjob.dataproc import _DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES
from mrjob.dataproc import _DEFAULT_IMAGE_VERSION
from mrjob.dataproc import _HADOOP_STREAMING_JAR_URI
from mrjob.dataproc import _cluster_state_name
Expand All @@ -45,6 +44,7 @@
from mrjob.fs.gcs import GCSFilesystem
from mrjob.fs.gcs import parse_gcs_uri
from mrjob.logs.errors import _pick_error
from mrjob.parse import is_uri
from mrjob.py2 import PY2
from mrjob.py2 import StringIO
from mrjob.step import StepFailedException
Expand All @@ -53,6 +53,8 @@
from mrjob.util import save_current_environment

from tests.mock_google import MockGoogleTestCase
from tests.mock_google.dataproc import _DEFAULT_SCOPES
from tests.mock_google.dataproc import _MANDATORY_SCOPES
from tests.mock_google.storage import MockGoogleStorageBlob
from tests.mr_hadoop_format_job import MRHadoopFormatJob
from tests.mr_jar_and_streaming import MRJarAndStreaming
Expand Down Expand Up @@ -458,8 +460,18 @@ def test_default(self):
gcc = self._get_gce_cluster_config()

self.assertFalse(gcc.service_account)
self.assertEqual(set(gcc.service_account_scopes),
set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))
self.assertEqual(
set(gcc.service_account_scopes),
_MANDATORY_SCOPES | _DEFAULT_SCOPES
)

def test_blank_means_default(self):
    # an empty --service-account-scopes value should behave exactly
    # like not setting the option: the API picks its default scopes
    gcc = self._get_gce_cluster_config('--service-account-scopes', '')

    expected_scopes = _MANDATORY_SCOPES | _DEFAULT_SCOPES
    self.assertEqual(set(gcc.service_account_scopes), expected_scopes)

def test_service_account(self):
account = '12345678901-compute@developer.gserviceaccount.com'
Expand All @@ -474,29 +486,25 @@ def test_service_account_scopes(self):
scope2 = 'https://www.googleapis.com/auth/scope2'

gcc = self._get_gce_cluster_config(
'--service-account-scope', scope1,
'--service-account-scope', scope2)
'--service-account-scopes', '%s,%s' % (scope1, scope2))

self.assertGreater(
self.assertEqual(
set(gcc.service_account_scopes),
set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))
self.assertIn(scope1, set(gcc.service_account_scopes))
self.assertIn(scope2, set(gcc.service_account_scopes))
_MANDATORY_SCOPES | {scope1, scope2})

def test_clear_service_account_scopes(self):
# it's possible to use less service accounts than the default,
# just not very wise
conf_path = self.makefile(
'mrjob.conf',
b'runners:\n dataproc:\n service_account_scopes: !clear')
def test_set_scope_by_name(self):
scope_name = 'test.name'
scope_uri = 'https://www.googleapis.com/auth/test.name'

self.mrjob_conf_patcher.stop()
gcc = self._get_gce_cluster_config('-c', conf_path)
self.mrjob_conf_patcher.start()
gcc = self._get_gce_cluster_config(
'--service-account-scopes', scope_name)

self.assertLess(
self.assertEqual(
set(gcc.service_account_scopes),
set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))
_MANDATORY_SCOPES | {scope_uri})





class ClusterPropertiesTestCase(MockGoogleTestCase):
Expand Down

0 comments on commit a0618c5

Please sign in to comment.