From c00460430d03f70aac669153371e1f687e5e81bc Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Sat, 28 Mar 2015 12:29:47 -0500 Subject: [PATCH 1/3] ENH: Add --private-ips flag. --- ec2/spark_ec2.py | 59 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 879a52cef8ff0..1f826d01fd982 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -282,6 +282,10 @@ def parse_args(): parser.add_option( "--vpc-id", default=None, help="VPC to launch instances in") + parser.add_option( + "--private-ips", action="store_true", default=False, + help="Use private IP's for instances rather than public if VPC/subnet " + + "requires that.") (opts, args) = parser.parse_args() if len(args) != 2: @@ -707,7 +711,8 @@ def get_instances(group_names): # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): - master = master_nodes[0].public_dns_name + master = master_nodes[0].public_dns_name if not opts.private_ips else \ + master_nodes[0].private_ip_address if deploy_ssh_key: print "Generating cluster's SSH key on master..." key_setup = """ @@ -719,8 +724,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) print "Transferring cluster's SSH key to slaves..." for slave in slave_nodes: - print slave.public_dns_name - ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar) + slave_address = slave.public_dns_name if not opts.private_ips else \ + slave.private_ip_address + print slave_address + ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs', 'mapreduce', 'spark-standalone', 'tachyon'] @@ -809,7 +816,8 @@ def is_cluster_ssh_available(cluster_instances, opts): Check if SSH is available on all the instances in a cluster. """ for i in cluster_instances: - if not is_ssh_available(host=i.public_dns_name, opts=opts): + dns_name = i.public_dns_name if not opts.private_ips else i.private_ip_address + if not is_ssh_available(host=dns_name, opts=opts): return False else: return True @@ -923,7 +931,8 @@ def get_num_disks(instance_type): # # root_dir should be an absolute path to the directory with the files we want to deploy. def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): - active_master = master_nodes[0].public_dns_name + active_master = master_nodes[0].public_dns_name if not opts.private_ips else \ + master_nodes[0].private_ip_address num_disks = get_num_disks(opts.instance_type) hdfs_data_dirs = "/mnt/ephemeral-hdfs/data" @@ -948,10 +957,14 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): print "Deploying Spark via git hash; Tachyon won't be set up" modules = filter(lambda x: x != "tachyon", modules) + master_addresses = [i.public_dns_name if not opts.private_ips else \ + i.private_ip_address for i in master_nodes] + slave_addresses = [i.public_dns_name if not opts.private_ips else \ + i.private_ip_address for i in slave_nodes] template_vars = { - "master_list": '\n'.join([i.public_dns_name for i in master_nodes]), + "master_list": '\n'.join(master_addresses), "active_master": active_master, - "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]), + "slave_list": '\n'.join(slave_addresses), "cluster_url": cluster_url, "hdfs_data_dirs": hdfs_data_dirs, "mapred_local_dirs": mapred_local_dirs, @@ -1011,7 +1024,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): # # root_dir should be an absolute path. def deploy_user_files(root_dir, opts, master_nodes): - active_master = master_nodes[0].public_dns_name + active_master = master_nodes[0].public_dns_name if not opts.private_ips else \ + master_nodes[0].private_ip_address command = [ 'rsync', '-rv', '-e', stringify_command(ssh_command(opts)), @@ -1230,7 +1244,8 @@ def real_main(): if any(master_nodes + slave_nodes): print "The following instances will be terminated:" for inst in master_nodes + slave_nodes: - print "> %s" % inst.public_dns_name + print "> %s" % inst.public_dns_name if not opts.private_ips else \ + inst.private_ip_address print "ALL DATA ON ALL NODES WILL BE LOST!!" msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) @@ -1294,13 +1309,18 @@ def real_main(): elif action == "login": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - master = master_nodes[0].public_dns_name - print "Logging into master " + master + "..." - proxy_opt = [] - if opts.proxy_port is not None: - proxy_opt = ['-D', opts.proxy_port] - subprocess.check_call( - ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)]) + if not master_nodes[0].public_dns_name and not opts.private_ips: + print "Master has no public DNS name. Maybe you meant to specify " \ + "--private-ips?" + else: + master = master_nodes[0].public_dns_name if not opts.private_ips else \ + master_nodes[0].private_ip_address + print "Logging into master " + master + "..." + proxy_opt = [] + if opts.proxy_port is not None: + proxy_opt = ['-D', opts.proxy_port] + subprocess.check_call( + ssh_command(opts) + proxy_opt + ['-t', '-t', "%s@%s" % (opts.user, master)]) elif action == "reboot-slaves": response = raw_input( @@ -1318,7 +1338,12 @@ def real_main(): elif action == "get-master": (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name) - print master_nodes[0].public_dns_name + if not master_nodes[0].public_dns_name and not opts.private_ips: + print "Master has no public DNS name. Maybe you meant to specify " \ + "--private-ips?" + else: + print master_nodes[0].public_dns_name if not opts.private_ips else \ + master_nodes[0].private_ip_address elif action == "stop": response = raw_input( From a4a2eaca3d67e5d59ca9c62b1375aa859d09aa17 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Mon, 6 Apr 2015 08:08:12 -0500 Subject: [PATCH 2/3] ENH: Fix IP's typo and refactor conditional logic into functions. --- ec2/spark_ec2.py | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 1f826d01fd982..da289743cdbe4 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -284,7 +284,7 @@ def parse_args(): help="VPC to launch instances in") parser.add_option( "--private-ips", action="store_true", default=False, - help="Use private IP's for instances rather than public if VPC/subnet " + + help="Use private IPs for instances rather than public if VPC/subnet " + "requires that.") (opts, args) = parser.parse_args() @@ -711,8 +711,7 @@ def get_instances(group_names): # Deploy configuration files and run setup scripts on a newly launched # or started EC2 cluster. def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): - master = master_nodes[0].public_dns_name if not opts.private_ips else \ - master_nodes[0].private_ip_address + master = get_dns_name(master_nodes[0], opts.private_ips) if deploy_ssh_key: print "Generating cluster's SSH key on master..." key_setup = """ @@ -724,8 +723,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key): dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh']) print "Transferring cluster's SSH key to slaves..." for slave in slave_nodes: - slave_address = slave.public_dns_name if not opts.private_ips else \ - slave.private_ip_address + slave_address = get_dns_name(slave, opts.private_ips) print slave_address ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar) @@ -816,7 +814,7 @@ def is_cluster_ssh_available(cluster_instances, opts): Check if SSH is available on all the instances in a cluster. """ for i in cluster_instances: - dns_name = i.public_dns_name if not opts.private_ips else i.private_ip_address + dns_name = get_dns_name(i, opts.private_ips) if not is_ssh_available(host=dns_name, opts=opts): return False else: @@ -931,8 +929,7 @@ def get_num_disks(instance_type): # # root_dir should be an absolute path to the directory with the files we want to deploy. def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): - active_master = master_nodes[0].public_dns_name if not opts.private_ips else \ - master_nodes[0].private_ip_address + active_master = get_dns_name(master_nodes[0], opts.private_ips) num_disks = get_num_disks(opts.instance_type) hdfs_data_dirs = "/mnt/ephemeral-hdfs/data" @@ -957,10 +954,8 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): print "Deploying Spark via git hash; Tachyon won't be set up" modules = filter(lambda x: x != "tachyon", modules) - master_addresses = [i.public_dns_name if not opts.private_ips else \ - i.private_ip_address for i in master_nodes] - slave_addresses = [i.public_dns_name if not opts.private_ips else \ - i.private_ip_address for i in slave_nodes] + master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes] + slave_addresses = [get_dns_name(i, opts.private_ips) for i in slave_nodes] template_vars = { "master_list": '\n'.join(master_addresses), "active_master": active_master, @@ -1024,8 +1019,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules): # # root_dir should be an absolute path. def deploy_user_files(root_dir, opts, master_nodes): - active_master = master_nodes[0].public_dns_name if not opts.private_ips else \ - master_nodes[0].private_ip_address + active_master = get_dns_name(master_nodes[0], opts.private_ips) command = [ 'rsync', '-rv', '-e', stringify_command(ssh_command(opts)), @@ -1135,6 +1129,17 @@ def get_partition(total, num_partitions, current_partitions): num_slaves_this_zone += 1 return num_slaves_this_zone +# Gets the IP address, taking into account the --private-ips flag +def get_ip_address(instance, private_ips=False): + ip = instance.ip_address if not private_ips else \ + instance.private_ip_address + return ip + +# Gets the DNS name, taking into account the --private-ips flag +def get_dns_name(instance, private_ips=False): + dns = instance.public_dns_name if not private_ips else \ + instance.private_ip_address + return dns def real_main(): (opts, action, cluster_name) = parse_args() @@ -1244,8 +1249,7 @@ def real_main(): if any(master_nodes + slave_nodes): print "The following instances will be terminated:" for inst in master_nodes + slave_nodes: - print "> %s" % inst.public_dns_name if not opts.private_ips else \ - inst.private_ip_address + print "> %s" % get_dns_name(inst, opts.private_ips) print "ALL DATA ON ALL NODES WILL BE LOST!!" msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name) @@ -1313,8 +1317,7 @@ def real_main(): print "Master has no public DNS name. Maybe you meant to specify " \ "--private-ips?" else: - master = master_nodes[0].public_dns_name if not opts.private_ips else \ - master_nodes[0].private_ip_address + master = get_dns_name(master_nodes[0], opts.private_ips) print "Logging into master " + master + "..." proxy_opt = [] if opts.proxy_port is not None: @@ -1342,8 +1345,7 @@ def real_main(): print "Master has no public DNS name. Maybe you meant to specify " \ "--private-ips?" else: - print master_nodes[0].public_dns_name if not opts.private_ips else \ - master_nodes[0].private_ip_address + print get_dns_name(master_nodes[0], opts.private_ips) elif action == "stop": response = raw_input( From b684c67fc622a6996a71d546de92e1a6ffbe6f49 Mon Sep 17 00:00:00 2001 From: Michelangelo D'Agostino Date: Wed, 8 Apr 2015 08:40:08 -0500 Subject: [PATCH 3/3] STY: A few python lint changes. --- ec2/spark_ec2.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index da289743cdbe4..0c1f24761d0de 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -1129,18 +1129,21 @@ def get_partition(total, num_partitions, current_partitions): num_slaves_this_zone += 1 return num_slaves_this_zone + # Gets the IP address, taking into account the --private-ips flag def get_ip_address(instance, private_ips=False): ip = instance.ip_address if not private_ips else \ - instance.private_ip_address + instance.private_ip_address return ip + # Gets the DNS name, taking into account the --private-ips flag def get_dns_name(instance, private_ips=False): dns = instance.public_dns_name if not private_ips else \ - instance.private_ip_address + instance.private_ip_address return dns + def real_main(): (opts, action, cluster_name) = parse_args()