Skip to content

Commit

Permalink
Merge pull request #1792 from davidmarin/google-idle-timeout
Browse files Browse the repository at this point in the history
Use Dataproc's built-in idle timeout feature (fixes #1705)
  • Loading branch information
David Marin committed May 31, 2018
2 parents 78d2438 + 65f5fc7 commit 3194d69
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 136 deletions.
4 changes: 4 additions & 0 deletions docs/guides/emr-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ Cluster creation and configuration
and when it may first terminate the cluster, to allow Hadoop to
accept your first job.
.. note::
The Google Dataproc won't allow an idle time of less than 10 minutes.
.. mrjob-opt::
:config: max_hours_idle
:switch: --max-hours-idle
Expand Down
88 changes: 0 additions & 88 deletions mrjob/bootstrap/terminate_idle_cluster_dataproc.sh

This file was deleted.

19 changes: 7 additions & 12 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,6 @@
'1.0': '2.7.2'
}

# bootstrap action which automatically terminates idle clusters
_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH = join(
dirname(mrjob.__file__),
'bootstrap',
'terminate_idle_cluster_dataproc.sh')

_HADOOP_STREAMING_JAR_URI = (
'file:///usr/lib/hadoop-mapreduce/hadoop-streaming.jar')

Expand Down Expand Up @@ -511,7 +505,6 @@ def _add_bootstrap_files_for_upload(self):
self._create_master_bootstrap_script_if_needed()
if self._master_bootstrap_script_path:
self._upload_mgr.add(self._master_bootstrap_script_path)
self._upload_mgr.add(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)

def _add_job_files_for_upload(self):
"""Add files needed for running the job (setup and input)
Expand Down Expand Up @@ -1130,16 +1123,14 @@ def _cluster_create_kwargs(self):
gcs_init_script_uris.append(
self._upload_mgr.uri(self._master_bootstrap_script_path))

# always add idle termination script
# add it last, so that we don't count bootstrapping as idle time
gcs_init_script_uris.append(
self._upload_mgr.uri(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH))

# NOTE - Cluster initialization_actions can only take scripts with no
# script args, so the auto-term script receives 'mrjob-max-secs-idle'
# via metadata instead of as an arg
cluster_metadata = dict()
cluster_metadata['mrjob-version'] = mrjob.__version__

# TODO: remove this once lifecycle_config is visible through
# gcloud and the Google Cloud Console
cluster_metadata['mrjob-max-secs-idle'] = str(int(
self._opts['max_mins_idle'] * 60))

Expand Down Expand Up @@ -1202,6 +1193,10 @@ def _cluster_create_kwargs(self):
if secondary_worker_conf.get('num_instances'):
cluster_config['secondary_worker_config'] = secondary_worker_conf

cluster_config['lifecycle_config'] = dict(
idle_delete_ttl=dict(
seconds=int(self._opts['max_mins_idle'] * 60)))

software_config = {}

if self._opts['cluster_properties']:
Expand Down
51 changes: 15 additions & 36 deletions tests/test_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from mrjob.dataproc import _DEFAULT_GCE_SERVICE_ACCOUNT_SCOPES
from mrjob.dataproc import _DEFAULT_IMAGE_VERSION
from mrjob.dataproc import _HADOOP_STREAMING_JAR_URI
from mrjob.dataproc import _MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH
from mrjob.dataproc import _cluster_state_name
from mrjob.dataproc import _fix_java_stack_trace
from mrjob.dataproc import _fix_traceback
Expand Down Expand Up @@ -301,13 +300,13 @@ def test_cleanup_remote(self):
self._test_cloud_tmp_cleanup('CLOUD_TMP', 0)

def test_cleanup_local(self):
self._test_cloud_tmp_cleanup('LOCAL_TMP', 5)
self._test_cloud_tmp_cleanup('LOCAL_TMP', 4)

def test_cleanup_logs(self):
self._test_cloud_tmp_cleanup('LOGS', 5)
self._test_cloud_tmp_cleanup('LOGS', 4)

def test_cleanup_none(self):
self._test_cloud_tmp_cleanup('NONE', 5)
self._test_cloud_tmp_cleanup('NONE', 4)

def test_cleanup_combine(self):
self._test_cloud_tmp_cleanup('LOGS,CLOUD_TMP', 0)
Expand Down Expand Up @@ -748,7 +747,6 @@ def _test_instance_groups(self, opts, **kwargs):
fake_bootstrap_script = 'gs://fake-bucket/fake-script.sh'
runner._master_bootstrap_script_path = fake_bootstrap_script
runner._upload_mgr.add(fake_bootstrap_script)
runner._upload_mgr.add(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)

cluster_id = runner._launch_cluster()

Expand Down Expand Up @@ -1176,50 +1174,31 @@ def test_no_mapper(self):

class MaxMinsIdleTestCase(MockGoogleTestCase):

def assertRanIdleTimeoutScriptWith(self, runner, expected_metadata):
cluster_metadata, last_init_exec = (
self._cluster_metadata_and_last_init_exec(runner))

# Verify args
for key in expected_metadata.keys():
self.assertEqual(cluster_metadata[key], expected_metadata[key])

expected_uri = runner._upload_mgr.uri(
_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)
self.assertEqual(last_init_exec, expected_uri)

def _cluster_metadata_and_last_init_exec(self, runner):
cluster = runner._get_cluster(runner.get_cluster_id())

# Verify last arg
last_init_action = cluster.config.initialization_actions[-1]
last_init_exec = last_init_action.executable_file

cluster_metadata = cluster.config.gce_cluster_config.metadata
return cluster_metadata, last_init_exec

def test_default(self):
mr_job = MRWordCount(['-r', 'dataproc'])
mr_job.sandbox()

with mr_job.make_runner() as runner:
runner.run()
self.assertRanIdleTimeoutScriptWith(runner, {
'mrjob-max-secs-idle': '600',
})

cluster = runner._get_cluster(runner._cluster_id)

self.assertEqual(
cluster.config.lifecycle_config.idle_delete_ttl.seconds,
600)

def test_persistent_cluster(self):
mr_job = MRWordCount(['-r', 'dataproc', '--max-mins-idle', '0.6'])
mr_job = MRWordCount(['-r', 'dataproc', '--max-mins-idle', '30'])
mr_job.sandbox()

with mr_job.make_runner() as runner:
runner.run()
self.assertRanIdleTimeoutScriptWith(runner, {
'mrjob-max-secs-idle': '36',
})

def test_bootstrap_script_is_actually_installed(self):
self.assertTrue(os.path.exists(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH))
cluster = runner._get_cluster(runner._cluster_id)

self.assertEqual(
cluster.config.lifecycle_config.idle_delete_ttl.seconds,
1800)


class TestCatFallback(MockGoogleTestCase):
Expand Down

0 comments on commit 3194d69

Please sign in to comment.