Skip to content

Commit

Permalink
Merge pull request #1689 from davidmarin/pooling-jk
Browse files Browse the repository at this point in the history
Turn off pooling by default (fixes #1663, #1686, #1687)
  • Loading branch information
David Marin committed Oct 22, 2017
2 parents f50e682 + 7bef6d7 commit 95b94b4
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 122 deletions.
38 changes: 27 additions & 11 deletions docs/guides/emr-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -366,32 +366,48 @@ Cluster creation and configuration
and other instance options. See :mrjob-opt:`instance_fleets` for
details.
.. mrjob-opt::
:config: max_hours_idle
:switch: --max-hours-idle
:type: :ref:`string <data-type-string>`
:config: max_mins_idle
:switch: --max-mins-idle
:type: float
:set: emr
:default: 0.5
:default: 5
Automatically terminate persistent/pooled clusters that have been idle at
least this many hours, if we're within :mrjob-opt:`mins_to_end_of_hour` of
an EC2 billing hour.
least this many minutes.
.. versionchanged:: 0.6.0
All clusters launched by mrjob now auto-terminate when idle. In previous
versions, you needed to set this option explicitly, or use
:ref:`terminate-idle-clusters`.
versions, you needed to set :mrjob-opt:`max_hours_idle`, set this
option explicitly, or use :ref:`terminate-idle-clusters`.
.. mrjob-opt::
:config: max_hours_idle
:switch: --max-hours-idle
:type: float
:set: emr
:default: None
.. deprecated:: 0.6.0
Starting with v0.6.0, you should use :mrjob-opt:`max_mins_idle`
instead.
.. mrjob-opt::
:config: mins_to_end_of_hour
:switch: --mins-to-end-of-hour
:type: :ref:`string <data-type-string>`
:type: float
:set: emr
:default: 5.0
If :mrjob-opt:`max_hours_idle` is set, controls how close to the end of an
EC2 billing hour the cluster can automatically terminate itself.
.. deprecated:: 0.6.0
This option was created back when EMR billed by the full hour, and
does nothing as of v0.6.0. If using versions prior to v0.6.0, it's
recommended you set this to 60.0 to effectively disable this feature.
.. mrjob-opt::
:config: region
Expand Down
23 changes: 10 additions & 13 deletions docs/guides/pooling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@
Cluster Pooling
===============

Clusters on EMR take several minutes to spin up. Also, EMR bills by the full
hour, so if you run, say, a 10-minute job and then shut down the cluster, the
other 50 minutes are wasted.
Clusters on EMR take several minutes to spin up, which can make development
painfully slow.

To mitigate these problems, :py:mod:`mrjob` provides **cluster pools.** By
default, once your job completes, the cluster will stay open to accept
To get around this, :py:mod:`mrjob` provides
**cluster pooling.**. If you set :mrjob-opt:`pool_clusters` to true,
once your job completes, the cluster will stay open to accept
additional jobs, and eventually shut itself down after it has been idle
for a certain amount of time (see :mrjob-opt:`max_hours_idle` and
:mrjob-opt:`mins_to_end_of_hour`).
for a certain amount of time (by default, ten minutes; see
:mrjob-opt:`max_mins_idle`).

.. note::

Cluster pooling was not turned on by default in versions prior to 0.6.0.
To get the same behavior in previous versions :mrjob-opt:`pool_clusters` to
``True`` and :mrjob-opt:`max_hours_idle` to 0.5 (don't forget to set
`max_hours_idle`, or your clusters will never shut down).
When using cluster pooling prior to v0.6.0, make sure to set
:mrjob-opt:`max_hours_idle`, or your cluster will never shut down.

Pooling is designed so that jobs run against the same :py:mod:`mrjob.conf` can
share the same clusters. This means that the version of :py:mod:`mrjob` and
Expand All @@ -41,8 +39,7 @@ separate pools.
Pooling is flexible about instance type and number of instances; it will
attempt to select the most powerful cluster available as long as the
cluster's instances provide at least as much memory and at least as much CPU as
your job requests. If there is a tie, it picks clusters that are closest to
the end of a full hour, to minimize wasted instance hours.
your job requests.

Pooling is also somewhat flexible about EBS volumes (see
:mrjob-opt:`instance_groups`). Each volume must have the same volume type,
Expand Down
24 changes: 24 additions & 0 deletions mrjob/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from os.path import basename

from mrjob.bin import MRJobBinRunner
from mrjob.conf import combine_dicts
from mrjob.setup import WorkingDirManager
from mrjob.setup import parse_setup_cmd
from mrjob.util import cmd_line
Expand Down Expand Up @@ -52,6 +53,7 @@ class HadoopInTheCloudJobRunner(MRJobBinRunner):
'image_version',
'instance_type',
'master_instance_type',
'max_mins_idle',
'max_hours_idle',
'num_core_instances',
'num_task_instances',
Expand All @@ -66,6 +68,10 @@ class HadoopInTheCloudJobRunner(MRJobBinRunner):
def __init__(self, **kwargs):
super(HadoopInTheCloudJobRunner, self).__init__(**kwargs)

if self._opts.get('max_hours_idle'):
log.warning('max_hours_idle is deprecated and will be removed'
' in v0.7.0. Please use max_mins_idle instead')

# if *cluster_id* is not set, ``self._cluster_id`` will be
# set when we create or join a cluster
self._cluster_id = self._opts['cluster_id']
Expand All @@ -92,6 +98,24 @@ def __init__(self, **kwargs):

### Options ###

def _default_opts(self):
return combine_dicts(
super(HadoopInTheCloudJobRunner, self)._default_opts(),
dict(max_mins_idle=5.0),
)

def _fix_opts(self, opts, source=None):
# patch max_hours_idle into max_mins_idle (see #1663)
opts = super(HadoopInTheCloudJobRunner, self)._fix_opts(
opts, source=source)

if (opts.get('max_mins_idle') is None and
opts.get('max_hours_idle') is not None):

opts['max_mins_idle'] = opts['max_hours_idle'] * 60

return opts

def _combine_opts(self, opt_list):
"""Propagate *instance_type* to other instance type opts, if not
already set.
Expand Down
10 changes: 4 additions & 6 deletions mrjob/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
# version of mrjob
_DEFAULT_IMAGE_VERSION = '1.0'
_DEFAULT_CHECK_CLUSTER_EVERY = 10.0
_DEFAULT_MAX_HOURS_IDLE = 0.1
_DEFAULT_CLOUD_FS_SYNC_SECS = 5.0
_DEFAULT_CLOUD_TMP_DIR_OBJECT_TTL_DAYS = 90

Expand Down Expand Up @@ -108,7 +107,7 @@
}

# bootstrap action which automatically terminates idle clusters
_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH = os.path.join(
_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH = os.path.join(
os.path.dirname(mrjob.__file__),
'bootstrap',
'terminate_idle_cluster_dataproc.sh')
Expand Down Expand Up @@ -299,7 +298,6 @@ def _default_opts(self):
image_version=_DEFAULT_IMAGE_VERSION,
instance_type=_DEFAULT_INSTANCE_TYPE,
master_instance_type=_DEFAULT_INSTANCE_TYPE,
max_hours_idle=_DEFAULT_MAX_HOURS_IDLE,
num_core_instances=_DATAPROC_MIN_WORKERS,
num_task_instances=0,
sh_bin=['/bin/sh', '-ex'],
Expand Down Expand Up @@ -417,7 +415,7 @@ def _add_bootstrap_files_for_upload(self):
self._create_master_bootstrap_script_if_needed()
if self._master_bootstrap_script_path:
self._upload_mgr.add(self._master_bootstrap_script_path)
self._upload_mgr.add(_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH)
self._upload_mgr.add(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)

def _add_job_files_for_upload(self):
"""Add files needed for running the job (setup and input)
Expand Down Expand Up @@ -793,15 +791,15 @@ def _cluster_create_kwargs(self):
# always add idle termination script
# add it last, so that we don't count bootstrapping as idle time
gcs_init_script_uris.append(
self._upload_mgr.uri(_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH))
self._upload_mgr.uri(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH))

# NOTE - Cluster initializationActions can only take scripts with no
# script args, so the auto-term script receives 'mrjob-max-secs-idle'
# via metadata instead of as an arg
cluster_metadata = dict()
cluster_metadata['mrjob-version'] = mrjob.__version__
cluster_metadata['mrjob-max-secs-idle'] = str(int(
self._opts['max_hours_idle'] * 3600))
self._opts['max_mins_idle'] * 60))

cluster_config = dict(
gceClusterConfig=dict(
Expand Down
17 changes: 8 additions & 9 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
_POOLING_SLEEP_INTERVAL = 30.01 # Add .1 seconds so minutes arent spot on.

# bootstrap action which automatically terminates idle clusters
_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH = os.path.join(
_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH = os.path.join(
os.path.dirname(mrjob.__file__),
'bootstrap',
'terminate_idle_cluster.sh')
Expand Down Expand Up @@ -470,11 +470,10 @@ def _default_opts(self):
cloud_fs_sync_secs=5.0,
cloud_upload_part_size=100, # 100 MB
image_version=_DEFAULT_IMAGE_VERSION,
max_hours_idle=0.5,
mins_to_end_of_hour=5.0,
num_core_instances=0,
num_task_instances=0,
pool_clusters=True,
pool_clusters=False,
pool_name='default',
pool_wait_minutes=0,
region=_DEFAULT_EMR_REGION,
Expand Down Expand Up @@ -808,9 +807,9 @@ def _add_bootstrap_files_for_upload(self, persistent=False):
for bootstrap_action in self._bootstrap_actions():
self._upload_mgr.add(bootstrap_action['path'])

# Add max-hours-idle script if we need it
# Add max-mins-idle script if we need it
if persistent or self._opts['pool_clusters']:
self._upload_mgr.add(_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH)
self._upload_mgr.add(_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)

def _add_master_node_setup_files_for_upload(self):
"""Add files necesary for the master node setup script to
Expand Down Expand Up @@ -1354,9 +1353,9 @@ def _cluster_kwargs(self, persistent=False):
# use idle termination script on persistent clusters
# add it last, so that we don't count bootstrapping as idle time
uri = self._upload_mgr.uri(
_MAX_HOURS_IDLE_BOOTSTRAP_ACTION_PATH)
_MAX_MINS_IDLE_BOOTSTRAP_ACTION_PATH)
# script takes args in (integer) seconds
ba_args = [str(int(self._opts['max_hours_idle'] * 3600)),
ba_args = [str(int(self._opts['max_mins_idle'] * 60)),
str(int(self._opts['mins_to_end_of_hour'] * 60))]
BootstrapActions.append(dict(
Name='idle timeout',
Expand Down Expand Up @@ -1924,8 +1923,8 @@ def _check_for_pooled_cluster_self_termination(self, cluster, step):
if self._created_cluster:
return

# don't check for max_hours_idle because it's possible to
# join a self-terminating cluster without having max_hours_idle set
# don't check for max_mins_idle because it's possible to
# join a self-terminating cluster without having max_mins_idle set
# on this runner (pooling only cares about the master bootstrap script,
# not other bootstrap actions)

Expand Down
24 changes: 16 additions & 8 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,22 +794,33 @@ def __call__(self, parser, namespace, value, option_string=None):
)),
],
),
max_hours_idle=dict(
max_mins_idle=dict(
cloud_role='launch',
switches=[
(['--max-hours-idle'], dict(
(['--max-mins-idle'], dict(
help=("If we create a cluster, have it automatically"
" terminate itself after it's been idle this many"
" hours"),
" minutes"),
type=float,
)),
],
),
max_hours_idle=dict(
cloud_role='launch',
deprecated=True,
switches=[
(['--max-hours-idle'], dict(
help='Please use --max-mins-idle instead',
type=float,
)),
],
),
mins_to_end_of_hour=dict(
cloud_role='launch',
deprecated=True,
switches=[
(['--mins-to-end-of-hour'], dict(
help=("If --max-hours-idle is set, control how close to the"
help=("If --max-mins-idle is set, control how close to the"
" end of an hour the cluster can automatically"
" terminate itself (default is 5 minutes)"),
type=float,
Expand Down Expand Up @@ -848,10 +859,7 @@ def __call__(self, parser, namespace, value, option_string=None):
(['--pool-clusters'], dict(
action='store_true',
help=('Add to an existing cluster or create a new one that'
' does not terminate when the job completes.\n'
'WARNING: do not run this without --max-hours-idle or '
' with mrjob terminate-idle-clusters in your crontab;'
' clusters left idle can quickly become expensive!'),
' does not terminate when the job completes.'),
)),
(['--no-pool-clusters'], dict(
action='store_false',
Expand Down
29 changes: 20 additions & 9 deletions mrjob/tools/emr/terminate_idle_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@

log = logging.getLogger(__name__)

_DEFAULT_MAX_HOURS_IDLE = 1
_DEFAULT_MAX_MINS_IDLE = 60
_DEFAULT_MAX_MINUTES_LOCKED = 1


Expand All @@ -91,9 +91,16 @@ def main(cl_args=None):
MRJob.set_up_logging(quiet=options.quiet,
verbose=options.verbose)

# support
max_mins_idle = options.max_mins_idle
if max_mins_idle is None and options.max_hours_idle is not None:
log.warning('--max-hours-idle is deprecated and will be removed'
' in v0.7.0. Please use --max-mins-idle instead.')
max_mins_idle = options.max_hours_idle * 60

_maybe_terminate_clusters(
dry_run=options.dry_run,
max_hours_idle=options.max_hours_idle,
max_mins_idle=max_mins_idle,
mins_to_end_of_hour=options.mins_to_end_of_hour,
unpooled_only=options.unpooled_only,
now=_boto3_now(),
Expand All @@ -107,7 +114,7 @@ def main(cl_args=None):

def _runner_kwargs(options):
kwargs = options.__dict__.copy()
for unused_arg in ('quiet', 'verbose', 'max_hours_idle',
for unused_arg in ('quiet', 'verbose', 'max_mins_idle', 'max_hours_idle',
'max_mins_locked',
'mins_to_end_of_hour', 'unpooled_only',
'pooled_only', 'pool_name', 'dry_run'):
Expand All @@ -117,7 +124,7 @@ def _runner_kwargs(options):


def _maybe_terminate_clusters(dry_run=False,
max_hours_idle=None,
max_mins_idle=None,
mins_to_end_of_hour=None,
now=None,
pool_name=None,
Expand All @@ -130,8 +137,8 @@ def _maybe_terminate_clusters(dry_run=False,
now = _boto3_now()

# old default behavior
if max_hours_idle is None and mins_to_end_of_hour is None:
max_hours_idle = _DEFAULT_MAX_HOURS_IDLE
if max_mins_idle is None and mins_to_end_of_hour is None:
max_mins_idle = _DEFAULT_MAX_MINS_IDLE

runner = EMRJobRunner(**kwargs)
emr_client = runner.make_emr_client()
Expand Down Expand Up @@ -199,8 +206,8 @@ def _maybe_terminate_clusters(dry_run=False,
cluster_summary['Name']))

# filter out clusters that don't meet our criteria
if (max_hours_idle is not None and
time_idle <= timedelta(hours=max_hours_idle)):
if (max_mins_idle is not None and
time_idle <= timedelta(minutes=max_mins_idle)):
continue

# mins_to_end_of_hour doesn't apply to jobs with pending steps
Expand Down Expand Up @@ -348,7 +355,11 @@ def _make_arg_parser():
arg_parser.add_argument(
'--max-hours-idle', dest='max_hours_idle',
default=None, type=float,
help=('Max number of hours a cluster can go without bootstrapping,'
help=('Please use --max-mins-idle instead.'))
arg_parser.add_argument(
'--max-mins-idle', dest='max_mins_idle',
default=None, type=float,
help=('Max number of minutes a cluster can go without bootstrapping,'
' running a step, or having a new step created. This will fire'
' even if there are pending steps which EMR has failed to'
' start. Make sure you set this higher than the amount of time'
Expand Down

0 comments on commit 95b94b4

Please sign in to comment.