More updates to config and run scripts for streaming tests
tdas committed Aug 20, 2014
1 parent 74856df commit e0ec3fa
Showing 2 changed files with 23 additions and 14 deletions.
31 changes: 19 additions & 12 deletions config/config.py.template
@@ -24,7 +24,8 @@ SPARK_CLUSTER_URL = open("/root/spark-ec2/cluster-url", 'r').readline().strip()
 # copy configurations from your existing Spark install.
 USE_CLUSTER_SPARK = True
 
-HDFS_URL = os.getenv("HDFS_URL")
+# URL of the HDFS installation in the Spark EC2 cluster
+HDFS_URL = "hdfs://%s:9000/test/" % socket.gethostname()
+# HDFS_URL = "%s/test/" % os.getcwd()
 
 ############## If not using existing Spark installation ############
@@ -219,8 +220,14 @@ STREAMING_COMMON_OPTS = [
     OptionSet("hdfs-url", [HDFS_URL]),
 ]
 
-STREAMING_COMMON_JAVA_OPTS = COMMON_JAVA_OPTS
 
+STREAMING_COMMON_JAVA_OPTS = [
+    # Fraction of JVM memory used for caching RDDs.
+    JavaOptionSet("spark.storage.memoryFraction", [0.66]),
+    JavaOptionSet("spark.serializer", ["org.apache.spark.serializer.JavaSerializer"]),
+    JavaOptionSet("spark.executor.memory", ["9g"]),
+    JavaOptionSet("spark.executor.extraJavaOptions", [" -XX:+UseConcMarkSweepGC "])
+]
 STREAMING_KEY_VAL_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_opts(2000) + [
     # Number of input streams.
     OptionSet("num-streams", [1], can_scale=True),
@@ -242,20 +249,20 @@ STREAMING_HDFS_RECOVERY_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_dura
 ]
 
 # this test is just to see if everything is setup properly
-STREAMING_TESTS += [("basic", "streaming.perf.TestRunner basic",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))]
+STREAMING_TESTS += [("basic", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("basic")] + STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))]
 
-STREAMING_TESTS += [("state-by-key", "streaming.perf.TestRunner state-by-key",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS)]
+STREAMING_TESTS += [("state-by-key", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("state-by-key")] + STREAMING_KEY_VAL_TEST_OPTS)]
 
-STREAMING_TESTS += [("group-by-key-and-window", "streaming.perf.TestRunner group-by-key-and-window",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
+STREAMING_TESTS += [("group-by-key-and-window", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("group-by-key-and-window")] + STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
 
-STREAMING_TESTS += [("reduce-by-key-and-window", "streaming.perf.TestRunner reduce-by-key-and-window",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
+STREAMING_TESTS += [("reduce-by-key-and-window", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("reduce-by-key-and-window")] + STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
 
STREAMING_TESTS += [("hdfs-recovery", "streaming.perf.TestRunner hdfs-recovery",
SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_HDFS_RECOVERY_TEST_OPTS)]
STREAMING_TESTS += [("hdfs-recovery", "streaming.perf.TestRunner hdfs-recovery", SCALE_FACTOR,
STREAMING_COMMON_JAVA_OPTS, [ConstantOption("hdfs-recovery")] + STREAMING_HDFS_RECOVERY_TEST_OPTS)]



6 changes: 4 additions & 2 deletions lib/sparkperf/testsuites.py
@@ -66,10 +66,12 @@ def run_tests(cls, cluster, config, tests_to_run, test_group_name, output_filena
             cluster.ensure_spark_stopped_on_slaves()
             append_config_to_file(stdout_filename, java_opt_list, opt_list)
             append_config_to_file(stderr_filename, java_opt_list, opt_list)
-            test_env["SPARK_SUBMIT_OPTS"] = " ".join(java_opt_list)
+            java_opts_str = " ".join(java_opt_list)
             cmd = cls.get_spark_submit_cmd(cluster, config, main_class_or_script, opt_list,
                                            stdout_filename, stderr_filename)
-            print("\nRunning command: %s\n" % cmd)
+            print("\nSetting env var SPARK_SUBMIT_OPTS: %s" % java_opts_str)
+            test_env["SPARK_SUBMIT_OPTS"] = java_opts_str
+            print("Running command: %s\n" % cmd)
             Popen(cmd, shell=True, env=test_env).wait()
             result_string = cls.process_output(config, short_name, opt_list,
                                                stdout_filename, stderr_filename)
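The intent of the testsuites.py change is that the joined JVM options are both logged and placed in SPARK_SUBMIT_OPTS on the environment handed to the child process, so the launched spark-submit inherits them. A small standalone sketch of that environment-inheritance pattern follows; the option strings and the echo command are illustrative placeholders (run under a POSIX shell), not values from sparkperf.

import os
from subprocess import Popen

# Illustrative JVM options; in sparkperf these come from the JavaOptionSet entries.
java_opt_list = ["-Dspark.executor.memory=9g",
                 "-Dspark.storage.memoryFraction=0.66"]

test_env = os.environ.copy()
java_opts_str = " ".join(java_opt_list)
print("\nSetting env var SPARK_SUBMIT_OPTS: %s" % java_opts_str)
test_env["SPARK_SUBMIT_OPTS"] = java_opts_str

# The child shell inherits test_env, so a spark-submit launched this way would
# see SPARK_SUBMIT_OPTS; a plain echo stands in for the real command here.
cmd = "echo SPARK_SUBMIT_OPTS is: $SPARK_SUBMIT_OPTS"
print("Running command: %s\n" % cmd)
Popen(cmd, shell=True, env=test_env).wait()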
