Merge pull request #1779 from davidmarin/google-cluster-properties
cluster_properties opt (fixes #1680)
David Marin committed May 23, 2018
2 parents e258d37 + 726938d commit d342bd1
Showing 5 changed files with 135 additions and 2 deletions.
11 changes: 11 additions & 0 deletions docs/guides/dataproc-opts.rst
@@ -182,6 +182,17 @@ Cluster creation and configuration

.. versionadded:: 0.6.3

.. mrjob-opt::
    :config: cluster_properties
    :switch: --cluster-property
    :set: dataproc
    :default: ``None``

    A dictionary of properties to set in the cluster's config files
    (e.g. ``mapred-site.xml``). For details, see
    `Cluster properties <https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/cluster-properties>`__.

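For example, ``cluster_properties`` might be set in ``mrjob.conf`` like this (a sketch; the property names and values are illustrative, and non-string values such as booleans and integers are JSON-encoded to strings before being passed to Dataproc):

    runners:
      dataproc:
        cluster_properties:
          'dataproc:dataproc.allow.zero.workers': true
          'mapred:mapreduce.map.memory.mb': 1024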

Bootstrapping
-------------

32 changes: 30 additions & 2 deletions mrjob/dataproc.py
@@ -14,6 +14,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import re
@@ -48,6 +49,7 @@
from mrjob.logs.task import _parse_task_syslog_records
from mrjob.logs.step import _interpret_new_dataproc_step_stderr
from mrjob.py2 import PY2
from mrjob.py2 import string_types
from mrjob.py2 import to_unicode
from mrjob.setup import UploadDirManager
from mrjob.step import StepFailedException
@@ -226,6 +228,7 @@ class DataprocJobRunner(HadoopInTheCloudJobRunner, LogInterpretationMixin):
    alias = 'dataproc'

    OPT_NAMES = HadoopInTheCloudJobRunner.OPT_NAMES | {
        'cluster_properties',
        'core_instance_config',
        'gcloud_bin',
        'master_instance_config',
@@ -1200,10 +1203,18 @@ def _cluster_create_kwargs(self):
        if secondary_worker_conf.get('num_instances'):
            cluster_config['secondary_worker_config'] = secondary_worker_conf

        software_config = {}

        if self._opts['cluster_properties']:
            software_config['properties'] = _values_to_text(
                self._opts['cluster_properties'])

        # See - https://cloud.google.com/dataproc/dataproc-versions
        if self._opts['image_version']:
-            cluster_config['software_config'] = dict(
-                image_version=self._opts['image_version'])
+            software_config['image_version'] = self._opts['image_version']

        if software_config:
            cluster_config['software_config'] = software_config

        # in Python 2, dict keys loaded from JSON will be unicode, which
        # the Google protobuf objects don't like
@@ -1416,3 +1427,20 @@ def _clean_json_dict_keys(x):
        return [_clean_json_dict_keys(item) for item in x]
    else:
        return x


def _values_to_text(d):
    """Return a dictionary with the same keys as *d*, but where the
    non-string, non-bytes values have been JSON-encoded.
    Used to encode cluster properties.
    """
    result = {}

    for k, v in d.items():
        if not isinstance(v, (string_types, bytes)):
            v = json.dumps(v)

        result[k] = v

    return result
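
A rough standalone sketch of what this encoding produces (mirroring _values_to_text() above; the property names and values are illustrative): non-string values such as booleans and integers come out as their JSON text, which is what ends up in software_config['properties'].

    import json

    def values_to_text(d):
        # standalone mirror of _values_to_text() above: JSON-encode any value
        # that isn't already a string or bytes, and leave strings untouched
        return {k: (v if isinstance(v, (str, bytes)) else json.dumps(v))
                for k, v in d.items()}

    cluster_properties = {
        'dataproc:dataproc.allow.zero.workers': True,   # bool -> 'true'
        'hdfs:dfs.namenode.handler.count': 40,          # int  -> '40'
        'mapred:mapreduce.map.java.opts': '-Xmx2457m',  # already a string
    }

    software_config = {'properties': values_to_text(cluster_properties)}
    print(software_config['properties'])
    # {'dataproc:dataproc.allow.zero.workers': 'true',
    #  'hdfs:dfs.namenode.handler.count': '40',
    #  'mapred:mapreduce.map.java.opts': '-Xmx2457m'}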
14 changes: 14 additions & 0 deletions mrjob/options.py
@@ -484,6 +484,20 @@ def __call__(self, parser, namespace, value, option_string=None):
            )),
        ],
    ),
    cluster_properties=dict(
        combiner=combine_dicts,
        switches=[
            (['--cluster-property'], dict(
                action=_KeyValueAction,
                help=('Properties to set in Hadoop config files on Dataproc.'
                      ' Args take the form file_prefix:property=value.'
                      ' You can use --cluster-property multiple times.'
                      ' For more info, see'
                      ' https://cloud.google.com/dataproc/docs/concepts'
                      '/configuring-clusters/cluster-properties'),
            )),
        ],
    ),
    cmdenv=dict(
        combiner=combine_envs,
        switches=[
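On the command line the new switch can be repeated, one file_prefix:property=value pair per use, and the values are merged into a single dict via combine_dicts. A rough sketch of an invocation (the job script and input file names are hypothetical; the properties mirror the test case below):

    python your_mr_job.py -r dataproc \
        --cluster-property dataproc:dataproc.allow.zero.workers=true \
        --cluster-property mapred:mapreduce.map.memory.mb=1024 \
        input.txt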
39 changes: 39 additions & 0 deletions tests/mock_google/dataproc.py
@@ -51,6 +51,38 @@
    'https://www.googleapis.com/auth/devstorage.full_control',
}

# actual properties taken from Dataproc
_DEFAULT_CLUSTER_PROPERTIES = {
    'distcp:mapreduce.map.java.opts': '-Xmx2457m',
    'distcp:mapreduce.map.memory.mb': '3072',
    'distcp:mapreduce.reduce.java.opts': '-Xmx2457m',
    'distcp:mapreduce.reduce.memory.mb': '3072',
    'hdfs:dfs.namenode.handler.count': '20',
    'hdfs:dfs.namenode.service.handler.count': '10',
    'mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE': '1000',
    'mapred:mapreduce.map.cpu.vcores': '1',
    'mapred:mapreduce.map.java.opts': '-Xmx2457m',
    'mapred:mapreduce.map.memory.mb': '3072',
    'mapred:mapreduce.reduce.cpu.vcores': '1',
    'mapred:mapreduce.reduce.java.opts': '-Xmx2457m',
    'mapred:mapreduce.reduce.memory.mb': '3072',
    'mapred:yarn.app.mapreduce.am.command-opts': '-Xmx2457m',
    'mapred:yarn.app.mapreduce.am.resource.cpu-vcores': '1',
    'mapred:yarn.app.mapreduce.am.resource.mb': '3072',
    'spark-env:SPARK_DAEMON_MEMORY': '1000m',
    'spark:spark.driver.maxResultSize': '480m',
    'spark:spark.driver.memory': '960m',
    'spark:spark.executor.cores': '1',
    'spark:spark.executor.memory': '1152m',
    'spark:spark.yarn.am.memory': '1152m',
    'spark:spark.yarn.am.memoryOverhead': '384',
    'spark:spark.yarn.executor.memoryOverhead': '384',
    'yarn-env:YARN_TIMELINESERVER_HEAPSIZE': '1000',
    'yarn:yarn.nodemanager.resource.memory-mb': '3072',
    'yarn:yarn.scheduler.maximum-allocation-mb': '3072',
    'yarn:yarn.scheduler.minimum-allocation-mb': '256',
}

# convert strings (e.g. 'RUNNING') to enum values

def _cluster_state_value(state_name):
@@ -146,6 +178,13 @@ def create_cluster(self, project_id, region, cluster):

        gce_config.service_account_scopes[:] = sorted(scopes)

        # add in default cluster properties
        props = cluster.config.software_config.properties

        for k, v in _DEFAULT_CLUSTER_PROPERTIES.items():
            if k not in props:
                props[k] = v

        # initialize cluster status
        cluster.status.state = _cluster_state_value('CREATING')

41 changes: 41 additions & 0 deletions tests/test_dataproc.py
@@ -499,6 +499,47 @@ def test_clear_service_account_scopes(self):
            set(_DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES))


class ClusterPropertiesTestCase(MockGoogleTestCase):

    def _get_cluster_properties(self, *args):
        job = MRWordCount(['-r', 'dataproc'] + list(args))
        job.sandbox()

        with job.make_runner() as runner:
            runner._launch()
            return runner._get_cluster(
                runner._cluster_id).config.software_config.properties

    def test_default(self):
        props = self._get_cluster_properties()

        self.assertNotIn('foo:bar', props)

    def test_command_line(self):
        props = self._get_cluster_properties(
            '--cluster-property',
            'dataproc:dataproc.allow.zero.workers=true',
            '--cluster-property',
            'mapred:mapreduce.map.memory.mb=1024',
        )

        self.assertEqual(props['dataproc:dataproc.allow.zero.workers'], 'true')
        self.assertEqual(props['mapred:mapreduce.map.memory.mb'], '1024')

    def test_convert_conf_values_to_strings(self):
        conf_path = self.makefile(
            'mrjob.conf',
            b'runners:\n  dataproc:\n    cluster_properties:\n'
            b"      'dataproc:dataproc.allow.zero.workers': true\n"
            b"      'hdfs:dfs.namenode.handler.count': 40\n")

        self.mrjob_conf_patcher.stop()
        props = self._get_cluster_properties('-c', conf_path)
        self.mrjob_conf_patcher.start()

        self.assertEqual(props['dataproc:dataproc.allow.zero.workers'], 'true')
        self.assertEqual(props['hdfs:dfs.namenode.handler.count'], '40')


class ProjectIDTestCase(MockGoogleTestCase):
