[SPARK-5122] Remove Shark from spark-ec2
I moved the Spark-Shark version map [to the wiki](https://cwiki.apache.org/confluence/display/SPARK/Spark-Shark+version+mapping).

This PR has a [matching PR in mesos/spark-ec2](mesos/spark-ec2#89).

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #3939 from nchammas/remove-shark and squashes the following commits:

66e0841 [Nicholas Chammas] fix style
ceeab85 [Nicholas Chammas] show default Spark GitHub repo
7270126 [Nicholas Chammas] validate Spark hashes
db4935d [Nicholas Chammas] validate spark version upfront
fc0d5b9 [Nicholas Chammas] remove Shark
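For context, here is a minimal sketch of how the upfront validation introduced by these commits behaves once the diff below is applied. The unknown version "1.9.9" is a deliberately invalid placeholder, and "48cecf6" (the parent commit of this change) is reused only as an example hash:

```python
# Release versions: a leading "v" is stripped and the result must appear in
# VALID_SPARK_VERSIONS; otherwise the script prints to stderr and exits.
get_validate_spark_version("v1.2.0", DEFAULT_SPARK_GITHUB_REPO)  # returns "1.2.0"
get_validate_spark_version("1.9.9", DEFAULT_SPARK_GITHUB_REPO)   # error + sys.exit(1)

# Git hashes (no "." in the string): an HTTP HEAD request is issued against
# <repo>/commit/<hash>, and an HTTP error aborts the launch.
get_validate_spark_version("48cecf6", DEFAULT_SPARK_GITHUB_REPO)
```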
nchammas authored and Andrew Or committed Jan 9, 2015
1 parent: 48cecf6 · commit: 167a5ab
Showing 1 changed file with 44 additions and 34 deletions.
ec2/spark_ec2.py: 44 additions & 34 deletions (78 changed lines)
@@ -39,10 +39,26 @@
 from optparse import OptionParser
 from sys import stderr

+VALID_SPARK_VERSIONS = set([
+    "0.7.3",
+    "0.8.0",
+    "0.8.1",
+    "0.9.0",
+    "0.9.1",
+    "0.9.2",
+    "1.0.0",
+    "1.0.1",
+    "1.0.2",
+    "1.1.0",
+    "1.1.1",
+    "1.2.0",
+])
+
 DEFAULT_SPARK_VERSION = "1.2.0"
+DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

 MESOS_SPARK_EC2_BRANCH = "branch-1.3"

 # A URL prefix from which to fetch AMI information
 AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b=MESOS_SPARK_EC2_BRANCH)
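For reference, a worked expansion of the format call above (not part of the diff, just the value the constant resolves to):

```python
# With MESOS_SPARK_EC2_BRANCH = "branch-1.3", AMI_PREFIX resolves to:
"https://raw.github.com/mesos/spark-ec2/{b}/ami-list".format(b="branch-1.3")
# -> "https://raw.github.com/mesos/spark-ec2/branch-1.3/ami-list"
```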

@@ -126,8 +142,8 @@ def parse_args():
         help="Version of Spark to use: 'X.Y.Z' or a specific git hash (default: %default)")
     parser.add_option(
         "--spark-git-repo",
-        default="https://github.com/apache/spark",
-        help="Github repo from which to checkout supplied commit hash")
+        default=DEFAULT_SPARK_GITHUB_REPO,
+        help="Github repo from which to checkout supplied commit hash (default: %default)")
     parser.add_option(
         "--hadoop-major-version", default="1",
         help="Major version of Hadoop (default: %default)")
@@ -236,36 +252,33 @@ def get_or_make_group(conn, name, vpc_id):
     return conn.create_security_group(name, "Spark EC2 group", vpc_id)


+def get_validate_spark_version(version, repo):
+    if "." in version:
+        version = version.replace("v", "")
+        if version not in VALID_SPARK_VERSIONS:
+            print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+            sys.exit(1)
+        return version
+    else:
+        github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
+        request = urllib2.Request(github_commit_url)
+        request.get_method = lambda: 'HEAD'
+        try:
+            response = urllib2.urlopen(request)
+        except urllib2.HTTPError, e:
+            print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
+            print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+            sys.exit(1)
+        return version
+
+
 # Check whether a given EC2 instance object is in a state we consider active,
 # i.e. not terminating or terminated. We count both stopping and stopped as
 # active since we can restart stopped clusters.
 def is_active(instance):
     return (instance.state in ['pending', 'running', 'stopping', 'stopped'])


-# Return correct versions of Spark and Shark, given the supplied Spark version
-def get_spark_shark_version(opts):
-    spark_shark_map = {
-        "0.7.3": "0.7.1",
-        "0.8.0": "0.8.0",
-        "0.8.1": "0.8.1",
-        "0.9.0": "0.9.0",
-        "0.9.1": "0.9.1",
-        # These are dummy versions (no Shark versions after this)
-        "1.0.0": "1.0.0",
-        "1.0.1": "1.0.1",
-        "1.0.2": "1.0.2",
-        "1.1.0": "1.1.0",
-        "1.1.1": "1.1.1",
-        "1.2.0": "1.2.0",
-    }
-    version = opts.spark_version.replace("v", "")
-    if version not in spark_shark_map:
-        print >> stderr, "Don't know about Spark version: %s" % version
-        sys.exit(1)
-    return (version, spark_shark_map[version])
-
-
 # Attempt to resolve an appropriate AMI given the architecture and region of the request.
 # Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
 # Last Updated: 2014-06-20
@@ -619,7 +632,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         print slave.public_dns_name
         ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)

-    modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
+    modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
               'mapreduce', 'spark-standalone', 'tachyon']

     if opts.hadoop_major_version == "1":
@@ -706,9 +719,7 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
     sys.stdout.flush()

     start_time = datetime.now()
-
     num_attempts = 0
-    conn = ec2.connect_to_region(opts.region)

     while True:
         time.sleep(5 * num_attempts)  # seconds
@@ -815,13 +826,11 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
     cluster_url = "%s:7077" % active_master

     if "." in opts.spark_version:
-        # Pre-built spark & shark deploy
-        (spark_v, shark_v) = get_spark_shark_version(opts)
+        # Pre-built Spark deploy
+        spark_v = get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
     else:
         # Spark-only custom deploy
         spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
-        shark_v = ""
-        modules = filter(lambda x: x != "shark", modules)

     template_vars = {
         "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
@@ -834,7 +843,6 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         "swap": str(opts.swap),
         "modules": '\n'.join(modules),
         "spark_version": spark_v,
-        "shark_version": shark_v,
         "hadoop_major_version": opts.hadoop_major_version,
         "spark_worker_instances": "%d" % opts.worker_instances,
         "spark_master_opts": opts.master_opts
@@ -983,6 +991,8 @@ def real_main():
     (opts, action, cluster_name) = parse_args()

     # Input parameter validation
+    get_validate_spark_version(opts.spark_version, opts.spark_git_repo)
+
     if opts.wait is not None:
         # NOTE: DeprecationWarnings are silent in 2.7+ by default.
         # To show them, run Python with the -Wdefault switch.