From 45816cc5e58c121152a38873573c4204340597ce Mon Sep 17 00:00:00 2001
From: Mike Jennings
Date: Wed, 17 Dec 2014 19:29:06 -0800
Subject: [PATCH] Manual revert of d12c0711faa3d4333513fcbbbee4868bcb784a26

---
 docs/ec2-scripts.md | 19 -------------
 ec2/spark_ec2.py    | 66 +++++++++++----------------------------------
 2 files changed, 15 insertions(+), 70 deletions(-)

diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md
index d50f445d7ecc7..ed51d0abb3a45 100644
--- a/docs/ec2-scripts.md
+++ b/docs/ec2-scripts.md
@@ -94,25 +94,6 @@ another.
     permissions on your private key file, you can run `launch` with the
     `--resume` option to restart the setup process on an existing cluster.
 
-# Launching a Cluster in a VPC
-
--   Run
-    `./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> --vpc-id=<vpc-id> --subnet-id=<subnet-id> launch <cluster-name>`,
-    where `<keypair>` is the name of your EC2 key pair (that you gave it
-    when you created it), `<key-file>` is the private key file for your
-    key pair, `<num-slaves>` is the number of slave nodes to launch (try
-    1 at first), `<vpc-id>` is the name of your VPC, `<subnet-id>` is the
-    name of your subnet, and `<cluster-name>` is the name to give to your
-    cluster.
-
-    For example:
-
-    ```bash
-    export AWS_SECRET_ACCESS_KEY=AaBbCcDdEeFGgHhIiJjKkLlMmNnOoPpQqRrSsTtU
-export AWS_ACCESS_KEY_ID=ABCDEFG1234567890123
-./spark-ec2 --key-pair=awskey --identity-file=awskey.pem --region=us-west-1 --zone=us-west-1a --vpc-id=vpc-a28d24c7 --subnet-id=subnet-4eb27b39 --spark-version=1.1.0 launch my-spark-cluster
-    ```
-
 # Running Applications
 
 -   Go into the `ec2` directory in the release of Spark you downloaded.
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 4e8f5c1f44041..e8754d485eba6 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -166,10 +166,6 @@ def parse_args():
     parser.add_option(
         "--copy-aws-credentials", action="store_true", default=False,
         help="Add AWS credentials to hadoop configuration to allow Spark to access S3")
-    parser.add_option(
-        "--subnet-id", default=None, help="VPC subnet to launch instances in")
-    parser.add_option(
-        "--vpc-id", default=None, help="VPC to launch instances in")
 
     (opts, args) = parser.parse_args()
     if len(args) != 2:
@@ -194,14 +190,14 @@ def parse_args():
 
 
 # Get the EC2 security group of the given name, creating it if it doesn't exist
-def get_or_make_group(conn, name, vpc_id):
+def get_or_make_group(conn, name):
     groups = conn.get_all_security_groups()
     group = [g for g in groups if g.name == name]
     if len(group) > 0:
         return group[0]
     else:
         print "Creating security group " + name
-        return conn.create_security_group(name, "Spark EC2 group", vpc_id)
+        return conn.create_security_group(name, "Spark EC2 group")
 
 
 # Check whether a given EC2 instance object is in a state we consider active,
@@ -311,26 +307,12 @@ def launch_cluster(conn, opts, cluster_name):
             user_data_content = user_data_file.read()
 
     print "Setting up security groups..."
-    master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
-    slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
+    master_group = get_or_make_group(conn, cluster_name + "-master")
+    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
     authorized_address = opts.authorized_address
     if master_group.rules == []:  # Group was just now created
-        if opts.vpc_id is None:
-            master_group.authorize(src_group=master_group)
-            master_group.authorize(src_group=slave_group)
-        else:
-            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                   src_group=master_group)
-            master_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                   src_group=slave_group)
-            master_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                   src_group=slave_group)
-            master_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                   src_group=slave_group)
+        master_group.authorize(src_group=master_group)
+        master_group.authorize(src_group=slave_group)
         master_group.authorize('tcp', 22, 22, authorized_address)
         master_group.authorize('tcp', 8080, 8081, authorized_address)
         master_group.authorize('tcp', 18080, 18080, authorized_address)
@@ -342,22 +324,8 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.ganglia:
             master_group.authorize('tcp', 5080, 5080, authorized_address)
     if slave_group.rules == []:  # Group was just now created
-        if opts.vpc_id is None:
-            slave_group.authorize(src_group=master_group)
-            slave_group.authorize(src_group=slave_group)
-        else:
-            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                  src_group=master_group)
-            slave_group.authorize(ip_protocol='icmp', from_port=-1, to_port=-1,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='tcp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
-            slave_group.authorize(ip_protocol='udp', from_port=0, to_port=65535,
-                                  src_group=slave_group)
+        slave_group.authorize(src_group=master_group)
+        slave_group.authorize(src_group=slave_group)
         slave_group.authorize('tcp', 22, 22, authorized_address)
         slave_group.authorize('tcp', 8080, 8081, authorized_address)
         slave_group.authorize('tcp', 50060, 50060, authorized_address)
@@ -377,12 +345,11 @@ def launch_cluster(conn, opts, cluster_name):
     if opts.ami is None:
         opts.ami = get_spark_ami(opts)
 
-    # we use group ids to work around https://github.com/boto/boto/issues/350
-    additional_group_ids = []
+    additional_groups = []
     if opts.additional_security_group:
-        additional_group_ids = [sg.id
-                                for sg in conn.get_all_security_groups()
-                                if opts.additional_security_group in (sg.name, sg.id)]
+        additional_groups = [sg
+                             for sg in conn.get_all_security_groups()
+                             if opts.additional_security_group in (sg.name, sg.id)]
     print "Launching instances..."
 
     try:
@@ -429,10 +396,9 @@ def launch_cluster(conn, opts, cluster_name):
                 placement=zone,
                 count=num_slaves_this_zone,
                 key_name=opts.key_pair,
-                security_group_ids=[slave_group.id] + additional_group_ids,
+                security_groups=[slave_group] + additional_groups,
                 instance_type=opts.instance_type,
                 block_device_map=block_map,
-                subnet_id=opts.subnet_id,
                 placement_group=opts.placement_group,
                 user_data=user_data_content)
             my_req_ids += [req.id for req in slave_reqs]
@@ -480,13 +446,12 @@ def launch_cluster(conn, opts, cluster_name):
             num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
             if num_slaves_this_zone > 0:
                 slave_res = image.run(key_name=opts.key_pair,
-                                      security_group_ids=[slave_group.id] + additional_group_ids,
+                                      security_groups=[slave_group] + additional_groups,
                                       instance_type=opts.instance_type,
                                       placement=zone,
                                       min_count=num_slaves_this_zone,
                                       max_count=num_slaves_this_zone,
                                       block_device_map=block_map,
-                                      subnet_id=opts.subnet_id,
                                       placement_group=opts.placement_group,
                                       user_data=user_data_content)
                 slave_nodes += slave_res.instances
@@ -508,13 +473,12 @@ def launch_cluster(conn, opts, cluster_name):
         if opts.zone == 'all':
             opts.zone = random.choice(conn.get_all_zones()).name
         master_res = image.run(key_name=opts.key_pair,
-                               security_group_ids=[master_group.id] + additional_group_ids,
+                               security_groups=[master_group] + additional_groups,
                                instance_type=master_type,
                                placement=opts.zone,
                                min_count=1,
                                max_count=1,
                                block_device_map=block_map,
-                               subnet_id=opts.subnet_id,
                                placement_group=opts.placement_group,
                                user_data=user_data_content)
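
Reviewer note: to see the EC2-Classic behavior this revert restores in isolation, here is a minimal boto 2.x sketch of the security-group setup. It is illustrative only; the region and the `demo-master` group name are hypothetical placeholders, not values from the patch.

```python
# Minimal sketch of the reverted get_or_make_group() path (boto 2.x,
# EC2-Classic). Region and group name below are hypothetical.
import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1")

def get_or_make_group(conn, name):
    # Same shape as the restored helper: look the group up by name and
    # create it (with no vpc_id argument) if it does not exist yet.
    groups = [g for g in conn.get_all_security_groups() if g.name == name]
    if groups:
        return groups[0]
    print("Creating security group " + name)
    return conn.create_security_group(name, "Spark EC2 group")

group = get_or_make_group(conn, "demo-master")

if group.rules == []:  # group was just created, so it has no ingress rules
    # In EC2-Classic, authorize(src_group=...) with no protocol or ports
    # opens all traffic from members of the source group in one call.
    # VPC security groups reject that shorthand, which is why the code
    # being reverted had to spell out tcp/udp/icmp rules explicitly.
    group.authorize(src_group=group)
    # CIDR-based rules take (ip_protocol, from_port, to_port, cidr_ip).
    group.authorize('tcp', 22, 22, '0.0.0.0/0')
```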
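
Likewise, the `image.run(...)` changes swap the VPC-style arguments for their EC2-Classic counterparts. A hedged sketch of the two calling conventions, continuing from the snippet above (the AMI id, key name, and subnet here are placeholders; the subnet id is taken from the removed docs example):

```python
# EC2-Classic form restored by this patch: groups are passed by name (or
# as SecurityGroup objects) via security_groups=..., with no subnet.
image = conn.get_all_images(image_ids=["ami-xxxxxxxx"])[0]  # hypothetical AMI
res = image.run(key_name="awskey",
                security_groups=["demo-master"],
                instance_type="m1.large",
                min_count=1,
                max_count=1)

# VPC form removed by this patch: groups must be passed by id via
# security_group_ids=..., and the instance needs a subnet_id.
# res = image.run(key_name="awskey",
#                 security_group_ids=[group.id],
#                 subnet_id="subnet-4eb27b39",
#                 instance_type="m1.large",
#                 min_count=1,
#                 max_count=1)
```

The comment deleted in the third hunk pointed at https://github.com/boto/boto/issues/350 as the reason group ids were used; the revert drops that workaround along with the `--vpc-id` and `--subnet-id` options.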