From 126e4cfdc9e152ba8af60055b5b76afeee451f06 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Tue, 9 Sep 2014 19:13:47 -0400 Subject: [PATCH 1/6] wait for specific cluster states --- ec2/spark_ec2.py | 79 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index bfd07593b92ed..b3ee208aeb69c 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -60,9 +60,11 @@ def parse_args(): parser.add_option( "-s", "--slaves", type="int", default=1, help="Number of slaves to launch (default: %default)") - parser.add_option( - "-w", "--wait", type="int", default=120, - help="Seconds to wait for nodes to start (default: %default)") + # NOTE: For strict "API" compatibility, we should probably leave this in + # and just mark it as deprecated / not used. + # parser.add_option( + # "-w", "--wait", type="int", default=120, + # help="Seconds to wait for nodes to start (default: %default)") parser.add_option( "-k", "--key-pair", help="Key pair to use on instances") @@ -618,6 +620,55 @@ def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes): time.sleep(wait_secs) +def is_ssh_available(host, opts): + "Checks if SSH is available on the host." + try: + with open(os.devnull, 'w') as devnull: + ret = subprocess.check_call( + ssh_command(opts) + + ['-t', '-t', + '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), + stringify_command('true')], + stdout=devnull, + stderr=devnull + ) + if ret == 0: + return True + else: + return False + except subprocess.CalledProcessError as e: + return False + + +def wait_for_cluster_state(cluster_instances, cluster_state, opts): + """ + cluster_instances: a list of boto.ec2.instance.Instance + cluster_state: a string representing the desired state of all the instances in the cluster + value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as + 'running', 'terminated', etc. + (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250) + """ + sys.stdout.write("Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)) + sys.stdout.flush() + while True: + for i in cluster_instances: + s = i.update() # capture output to suppress print to screen in newer versions of boto + # print "{instance}: {state}".format(instance=i.id, state=i.state) + if cluster_state == 'ssh-ready': + if all(i.state == 'running' for i in cluster_instances) and \ + all(is_ssh_available(host=i.ip_address, opts=opts) for i in cluster_instances): + print "" # so that next line of output starts on new line + return + else: + if all(i.state == cluster_state for i in cluster_instances): + print "" # so that next line of output starts on new line + return + sys.stdout.write(".") + sys.stdout.flush() + time.sleep(3) + + # Get number of local disks available for a given EC2 instance type. def get_num_disks(instance_type): # From http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html @@ -872,7 +923,14 @@ def real_main(): (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) else: (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) - wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) + # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) + # NOTE: This next line means if we have a terminally broken cluster, (e.gl for a resume) + # we'll keep waiting until the user exits. + wait_for_cluster_state( + cluster_instances=(master_nodes + slave_nodes), + cluster_state='ssh-ready', + opts=opts + ) setup_cluster(conn, master_nodes, slave_nodes, opts, True) elif action == "destroy": @@ -901,7 +959,11 @@ def real_main(): else: group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"] - + wait_for_cluster_state( + cluster_instances=(master_nodes + slave_nodes), + cluster_state='terminated', + opts=opts + ) attempt = 1 while attempt <= 3: print "Attempt %d" % attempt @@ -987,7 +1049,12 @@ def real_main(): for inst in master_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() - wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) + # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) + wait_for_cluster_state( + cluster_instances=(master_nodes + slave_nodes), + cluster_state='ssh-ready', + opts=opts + ) setup_cluster(conn, master_nodes, slave_nodes, opts, False) else: From 79692651aff75c258a753b87119bca435a628c74 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Tue, 9 Sep 2014 19:19:49 -0400 Subject: [PATCH 2/6] fix long line (PEP 8) --- ec2/spark_ec2.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index b3ee208aeb69c..f542c6366606f 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -649,7 +649,9 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): 'running', 'terminated', etc. (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250) """ - sys.stdout.write("Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)) + sys.stdout.write( + "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state) + ) sys.stdout.flush() while True: for i in cluster_instances: From bb67c06fee54665a0cb68b4717dd0d32004b205a Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 10 Sep 2014 14:57:23 -0400 Subject: [PATCH 3/6] deprecate wait option; remove dead code --- ec2/spark_ec2.py | 61 +++++++++++++++++------------------------------- 1 file changed, 22 insertions(+), 39 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index f542c6366606f..192836f92701e 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -32,6 +32,7 @@ import tempfile import time import urllib2 +import warnings from optparse import OptionParser from sys import stderr import boto @@ -60,11 +61,9 @@ def parse_args(): parser.add_option( "-s", "--slaves", type="int", default=1, help="Number of slaves to launch (default: %default)") - # NOTE: For strict "API" compatibility, we should probably leave this in - # and just mark it as deprecated / not used. - # parser.add_option( - # "-w", "--wait", type="int", default=120, - # help="Seconds to wait for nodes to start (default: %default)") + parser.add_option( + "-w", "--wait", type="int", + help="DEPRECATED - Seconds to wait for nodes to start") parser.add_option( "-k", "--key-pair", help="Key pair to use on instances") @@ -194,18 +193,6 @@ def get_or_make_group(conn, name): return conn.create_security_group(name, "Spark EC2 group") -# Wait for a set of launched instances to exit the "pending" state -# (i.e. either to start running or to fail and be terminated) -def wait_for_instances(conn, instances): - while True: - for i in instances: - i.update() - if len([i for i in instances if i.state == 'pending']) > 0: - time.sleep(5) - else: - return - - # Check whether a given EC2 instance object is in a state we consider active, # i.e. not terminating or terminated. We count both stopping and stopped as # active since we can restart stopped clusters. @@ -610,16 +597,6 @@ def setup_spark_cluster(master, opts): print "Ganglia started at http://%s:5080/ganglia" % master -# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up -def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes): - print "Waiting for instances to start up..." - time.sleep(5) - wait_for_instances(conn, master_nodes) - wait_for_instances(conn, slave_nodes) - print "Waiting %d more seconds..." % wait_secs - time.sleep(wait_secs) - - def is_ssh_available(host, opts): "Checks if SSH is available on the host." try: @@ -633,10 +610,7 @@ def is_ssh_available(host, opts): stdout=devnull, stderr=devnull ) - if ret == 0: - return True - else: - return False + return ret == 0 except subprocess.CalledProcessError as e: return False @@ -653,6 +627,7 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state) ) sys.stdout.flush() + while True: for i in cluster_instances: s = i.update() # capture output to suppress print to screen in newer versions of boto @@ -660,16 +635,16 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): if cluster_state == 'ssh-ready': if all(i.state == 'running' for i in cluster_instances) and \ all(is_ssh_available(host=i.ip_address, opts=opts) for i in cluster_instances): - print "" # so that next line of output starts on new line - return + break else: if all(i.state == cluster_state for i in cluster_instances): - print "" # so that next line of output starts on new line - return + break sys.stdout.write(".") sys.stdout.flush() time.sleep(3) + print "" # so that next line of output starts on new line + # Get number of local disks available for a given EC2 instance type. def get_num_disks(instance_type): @@ -903,6 +878,16 @@ def real_main(): (opts, action, cluster_name) = parse_args() # Input parameter validation + if opts.wait is not None: + # NOTE: DeprecationWarnings are silent in 2.7+ by default. + # To show them, run Python with the -Wdefault switch. + # See: https://docs.python.org/3.5/whatsnew/2.7.html + warnings.warn( + "This option is deprecated and has no effect. " + "spark-ec2 automatically waits as long as necessary for clusters to startup.", + DeprecationWarning + ) + if opts.ebs_vol_num > 8: print >> stderr, "ebs-vol-num cannot be greater than 8" sys.exit(1) @@ -925,9 +910,8 @@ def real_main(): (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) else: (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) - # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) - # NOTE: This next line means if we have a terminally broken cluster, (e.gl for a resume) - # we'll keep waiting until the user exits. + # NOTE: This next line means if we have a terminally broken cluster, + # (e.g during a --resume) we'll keep waiting until the user exits. wait_for_cluster_state( cluster_instances=(master_nodes + slave_nodes), cluster_state='ssh-ready', @@ -1051,7 +1035,6 @@ def real_main(): for inst in master_nodes: if inst.state not in ["shutting-down", "terminated"]: inst.start() - # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes) wait_for_cluster_state( cluster_instances=(master_nodes + slave_nodes), cluster_state='ssh-ready', From 26c5ed0747036378676ffd189b293b5fa31f2aac Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 10 Sep 2014 17:18:45 -0400 Subject: [PATCH 4/6] replace print with write() --- ec2/spark_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 192836f92701e..ab9e5c225563b 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -643,7 +643,7 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): sys.stdout.flush() time.sleep(3) - print "" # so that next line of output starts on new line + sys.stdout.write("\n") # Get number of local disks available for a given EC2 instance type. From 9a9e035a96a7b23d44f03f77b60fb071a05e2eb0 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 10 Sep 2014 19:40:58 -0400 Subject: [PATCH 5/6] remove extraneous comment --- ec2/spark_ec2.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index ab9e5c225563b..5bfe18cc18daa 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -910,8 +910,6 @@ def real_main(): (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) else: (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name) - # NOTE: This next line means if we have a terminally broken cluster, - # (e.g during a --resume) we'll keep waiting until the user exits. wait_for_cluster_state( cluster_instances=(master_nodes + slave_nodes), cluster_state='ssh-ready', From 43a69f00a2fd004a57b860b3ee6bda8fc1e9f840 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Sun, 14 Sep 2014 00:52:23 -0400 Subject: [PATCH 6/6] short-circuit SSH check; linear backoff --- ec2/spark_ec2.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 5bfe18cc18daa..1c804555a563d 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -63,7 +63,7 @@ def parse_args(): help="Number of slaves to launch (default: %default)") parser.add_option( "-w", "--wait", type="int", - help="DEPRECATED - Seconds to wait for nodes to start") + help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start") parser.add_option( "-k", "--key-pair", help="Key pair to use on instances") @@ -602,11 +602,8 @@ def is_ssh_available(host, opts): try: with open(os.devnull, 'w') as devnull: ret = subprocess.check_call( - ssh_command(opts) + - ['-t', '-t', - '-o', 'ConnectTimeout=3', - '%s@%s' % (opts.user, host), - stringify_command('true')], + ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), stringify_command('true')], stdout=devnull, stderr=devnull ) @@ -615,6 +612,14 @@ def is_ssh_available(host, opts): return False +def is_cluster_ssh_available(cluster_instances, opts): + for i in cluster_instances: + if not is_ssh_available(host=i.ip_address, opts=opts): + return False + else: + return True + + def wait_for_cluster_state(cluster_instances, cluster_state, opts): """ cluster_instances: a list of boto.ec2.instance.Instance @@ -628,20 +633,26 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts): ) sys.stdout.flush() + num_attempts = 0 + while True: + time.sleep(3 * num_attempts) + for i in cluster_instances: s = i.update() # capture output to suppress print to screen in newer versions of boto - # print "{instance}: {state}".format(instance=i.id, state=i.state) + if cluster_state == 'ssh-ready': if all(i.state == 'running' for i in cluster_instances) and \ - all(is_ssh_available(host=i.ip_address, opts=opts) for i in cluster_instances): + is_cluster_ssh_available(cluster_instances, opts): break else: if all(i.state == cluster_state for i in cluster_instances): break + + num_attempts += 1 + sys.stdout.write(".") sys.stdout.flush() - time.sleep(3) sys.stdout.write("\n")