From 2b92534d67f742f53ab4ae3b8845e6d794dd1b88 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Wed, 28 Jan 2015 23:33:17 -0500 Subject: [PATCH 1/2] show SSH output after status check pass --- ec2/spark_ec2.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 0de4a62e203fd..ec8116a9152b6 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -32,6 +32,7 @@ import sys import tarfile import tempfile +import textwrap import time import urllib2 import warnings @@ -675,29 +676,40 @@ def setup_spark_cluster(master, opts): print "Ganglia started at http://%s:5080/ganglia" % master -def is_ssh_available(host, opts): +def is_ssh_available(host, opts, print_ssh_output=False): """ Check if SSH is available on a host. """ - try: - with open(os.devnull, 'w') as devnull: - ret = subprocess.check_call( - ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', - '%s@%s' % (opts.user, host), stringify_command('true')], - stdout=devnull, - stderr=devnull - ) - return ret == 0 - except subprocess.CalledProcessError as e: - return False - - -def is_cluster_ssh_available(cluster_instances, opts): + s = subprocess.Popen( + ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3', + '%s@%s' % (opts.user, host), stringify_command('true')], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT # we pipe stderr through stdout to preserve output order + ) + cmd_output = s.communicate()[0] # [1] is stderr, which we redirected to stdout + + if s.returncode != 0 and print_ssh_output: + # extra leading newline is for spacing in wait_for_cluster_state() + print textwrap.dedent("""\n + Warning: SSH connection error. (This could be temporary.) + Host: {h} + SSH return code: {r} + SSH output: {o} + """).format( + h=host, + r=s.returncode, + o=cmd_output.strip() + ) + + return s.returncode == 0 + + +def is_cluster_ssh_available(cluster_instances, opts, print_ssh_output=False): """ Check if SSH is available on all the instances in a cluster. """ for i in cluster_instances: - if not is_ssh_available(host=i.ip_address, opts=opts): + if not is_ssh_available(host=i.ip_address, opts=opts, print_ssh_output=print_ssh_output): return False else: return True @@ -733,7 +745,7 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): if all(i.state == 'running' for i in cluster_instances) and \ all(s.system_status.status == 'ok' for s in statuses) and \ all(s.instance_status.status == 'ok' for s in statuses) and \ - is_cluster_ssh_available(cluster_instances, opts): + is_cluster_ssh_available(cluster_instances, opts, print_ssh_output=True): break else: if all(i.state == cluster_state for i in cluster_instances): From 8bda6ed4ac56dde20f3f881baf4cc502f8fd5345 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Fri, 6 Feb 2015 15:57:01 -0800 Subject: [PATCH 2/2] default to print SSH output --- ec2/spark_ec2.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index ec8116a9152b6..0de8846a440b1 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -676,7 +676,7 @@ def setup_spark_cluster(master, opts): print "Ganglia started at http://%s:5080/ganglia" % master -def is_ssh_available(host, opts, print_ssh_output=False): +def is_ssh_available(host, opts, print_ssh_output=True): """ Check if SSH is available on a host. """ @@ -704,12 +704,12 @@ def is_ssh_available(host, opts, print_ssh_output=False): return s.returncode == 0 -def is_cluster_ssh_available(cluster_instances, opts, print_ssh_output=False): +def is_cluster_ssh_available(cluster_instances, opts): """ Check if SSH is available on all the instances in a cluster. """ for i in cluster_instances: - if not is_ssh_available(host=i.ip_address, opts=opts, print_ssh_output=print_ssh_output): + if not is_ssh_available(host=i.ip_address, opts=opts): return False else: return True @@ -745,7 +745,7 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state): if all(i.state == 'running' for i in cluster_instances) and \ all(s.system_status.status == 'ok' for s in statuses) and \ all(s.instance_status.status == 'ok' for s in statuses) and \ - is_cluster_ssh_available(cluster_instances, opts, print_ssh_output=True): + is_cluster_ssh_available(cluster_instances, opts): break else: if all(i.state == cluster_state for i in cluster_instances):