From e0ec3fad1063a54feabfb9a1a8a914cd17f5bc8c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 20 Aug 2014 23:05:50 +0000
Subject: [PATCH] More updates to config and run scripts for streaming tests

---
 config/config.py.template   | 31 +++++++++++++++++++------------
 lib/sparkperf/testsuites.py |  6 ++++--
 2 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/config/config.py.template b/config/config.py.template
index 94de287..3c28781 100755
--- a/config/config.py.template
+++ b/config/config.py.template
@@ -24,7 +24,8 @@ SPARK_CLUSTER_URL = open("/root/spark-ec2/cluster-url", 'r').readline().strip()
 # copy configurations from your existing Spark install.
 USE_CLUSTER_SPARK = True
 
-HDFS_URL = os.getenv("HDFS_URL")
+# URL of the HDFS installation in the Spark EC2 cluster
+HDFS_URL = "hdfs://%s:9000/test/" % socket.gethostname()
 # HDFS_URL = "%s/test/" % os.getcwd()
 
 ############## If not using existing Spark installation ############
@@ -219,8 +220,14 @@ STREAMING_COMMON_OPTS = [
     OptionSet("hdfs-url", [HDFS_URL]),
 ]
 
-STREAMING_COMMON_JAVA_OPTS = COMMON_JAVA_OPTS
+STREAMING_COMMON_JAVA_OPTS = [
+    # Fraction of JVM memory used for caching RDDs.
+    JavaOptionSet("spark.storage.memoryFraction", [0.66]),
+    JavaOptionSet("spark.serializer", ["org.apache.spark.serializer.JavaSerializer"]),
+    JavaOptionSet("spark.executor.memory", ["9g"]),
+    JavaOptionSet("spark.executor.extraJavaOptions", [" -XX:+UseConcMarkSweepGC "])
+]
 
 STREAMING_KEY_VAL_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_opts(2000) + [
     # Number of input streams.
     OptionSet("num-streams", [1], can_scale=True),
@@ -242,20 +249,20 @@ STREAMING_HDFS_RECOVERY_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_dura
 ]
 
 # this test is just to see if everything is setup properly
-STREAMING_TESTS += [("basic", "streaming.perf.TestRunner basic",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))]
+STREAMING_TESTS += [("basic", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("basic")] + STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))]
 
-STREAMING_TESTS += [("state-by-key", "streaming.perf.TestRunner state-by-key",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS)]
+STREAMING_TESTS += [("state-by-key", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("state-by-key")] + STREAMING_KEY_VAL_TEST_OPTS)]
 
-STREAMING_TESTS += [("group-by-key-and-window", "streaming.perf.TestRunner group-by-key-and-window",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
+STREAMING_TESTS += [("group-by-key-and-window", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("group-by-key-and-window")] + STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
 
-STREAMING_TESTS += [("reduce-by-key-and-window", "streaming.perf.TestRunner reduce-by-key-and-window",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
+STREAMING_TESTS += [("reduce-by-key-and-window", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("reduce-by-key-and-window")] + STREAMING_KEY_VAL_TEST_OPTS + streaming_window_duration_opts(10000) )]
 
-STREAMING_TESTS += [("hdfs-recovery", "streaming.perf.TestRunner hdfs-recovery",
-    SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, STREAMING_HDFS_RECOVERY_TEST_OPTS)]
+STREAMING_TESTS += [("hdfs-recovery", "streaming.perf.TestRunner", SCALE_FACTOR,
+    STREAMING_COMMON_JAVA_OPTS, [ConstantOption("hdfs-recovery")] + STREAMING_HDFS_RECOVERY_TEST_OPTS)]

diff --git a/lib/sparkperf/testsuites.py b/lib/sparkperf/testsuites.py
index fe9c575..a07ea15 100644
--- a/lib/sparkperf/testsuites.py
+++ b/lib/sparkperf/testsuites.py
@@ -66,10 +66,12 @@ def run_tests(cls, cluster, config, tests_to_run, test_group_name, output_filena
             cluster.ensure_spark_stopped_on_slaves()
             append_config_to_file(stdout_filename, java_opt_list, opt_list)
             append_config_to_file(stderr_filename, java_opt_list, opt_list)
-            test_env["SPARK_SUBMIT_OPTS"] = " ".join(java_opt_list)
+            java_opts_str = " ".join(java_opt_list)
             cmd = cls.get_spark_submit_cmd(cluster, config, main_class_or_script, opt_list,
                                            stdout_filename, stderr_filename)
-            print("\nRunning command: %s\n" % cmd)
+            print("\nSetting env var SPARK_SUBMIT_OPTS: %s" % java_opts_str)
+            test_env["SPARK_SUBMIT_OPTS"] = java_opts_str
+            print("Running command: %s\n" % cmd)
             Popen(cmd, shell=True, env=test_env).wait()
 
             result_string = cls.process_output(config, short_name, opt_list,
                                                stdout_filename, stderr_filename)
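
Two things in the config.py.template hunks are worth spelling out. First, the new
HDFS_URL is computed with socket.gethostname(), so the template needs an
"import socket" at the top (the replaced line only relied on os); the patch assumes
that import is already present or added elsewhere. Second, each test name moves
out of the main-class string ("streaming.perf.TestRunner basic") into the option
list as a ConstantOption, so the name reaches the runner as its first program
argument rather than being glued onto the class name. Below is a minimal sketch
of how such an option list could flatten into program arguments; the
ConstantOption/OptionSet classes here are simplified stand-ins written for
illustration, not spark-perf's real implementations.

    import socket

    # Simplified stand-ins (assumptions, not spark-perf's actual classes).
    class ConstantOption:
        """Renders as a bare positional argument, e.g. the test name."""
        def __init__(self, value):
            self.value = value
        def to_args(self):
            return [str(self.value)]

    class OptionSet:
        """Renders as a --name value flag; the real class also supports
        scaling and multiple candidate values."""
        def __init__(self, name, values, can_scale=False):
            self.name, self.values, self.can_scale = name, values, can_scale
        def to_args(self):
            return ["--%s" % self.name, str(self.values[0])]

    # Same URL pattern as the patched template.
    HDFS_URL = "hdfs://%s:9000/test/" % socket.gethostname()

    opts = [ConstantOption("hdfs-recovery"),
            OptionSet("hdfs-url", [HDFS_URL]),
            OptionSet("batch-duration", [10000])]
    args = [a for opt in opts for a in opt.to_args()]
    print(args)
    # e.g. ['hdfs-recovery', '--hdfs-url', 'hdfs://host:9000/test/',
    #       '--batch-duration', '10000']

Keeping the main class constant across all five entries and varying only the
leading ConstantOption is what lets a single TestRunner dispatch every
streaming test.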
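The testsuites.py hunk splits the SPARK_SUBMIT_OPTS assignment into a named
java_opts_str so it can be logged before the command launches, while still
exporting it to the child process through Popen's env argument. Here is a
runnable sketch of the same pattern, with made-up Java options and a
placeholder command standing in for get_spark_submit_cmd():

    import os
    from subprocess import Popen

    # Hypothetical values; spark-perf derives these from JavaOptionSet configs.
    java_opt_list = ["-Dspark.storage.memoryFraction=0.66",
                     "-Dspark.executor.memory=9g"]

    # Start from a copy of the parent environment, then add the override.
    test_env = os.environ.copy()
    java_opts_str = " ".join(java_opt_list)

    print("\nSetting env var SPARK_SUBMIT_OPTS: %s" % java_opts_str)
    test_env["SPARK_SUBMIT_OPTS"] = java_opts_str

    cmd = "echo $SPARK_SUBMIT_OPTS"  # placeholder for get_spark_submit_cmd(...)
    print("Running command: %s\n" % cmd)
    Popen(cmd, shell=True, env=test_env).wait()  # child sees SPARK_SUBMIT_OPTS

Note that passing env= replaces the child's entire environment rather than
extending it, which is why the sketch copies os.environ first; the patch
assumes spark-perf builds test_env the same way.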