diff --git a/bin/run b/bin/run
index 52a192ca..7d28227c 100755
--- a/bin/run
+++ b/bin/run
@@ -1,4 +1,6 @@
 #!/bin/bash
 
-ARGS="runMain com.databricks.spark.sql.perf.RunBenchmark $@"
+# runs spark-sql-perf from the current directory
+
+ARGS="runBenchmark $@"
 build/sbt "$ARGS"
\ No newline at end of file
diff --git a/bin/spark-perf b/bin/spark-perf
new file mode 100755
index 00000000..eade7247
--- /dev/null
+++ b/bin/spark-perf
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+# runs the most recent published version of spark-sql-perf, from within the spark directory
+# spark is compiled using SBT
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+VERSION=`git -C $DIR/../ tag | tail -n 1 | cut -c 2-`
+ARGS="sparkPackage com.databricks:spark-sql-perf_2.10:$VERSION com.databricks.spark.sql.perf.RunBenchmark $@"
+build/sbt "$ARGS"
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 23f31efa..68462243 100644
--- a/build.sbt
+++ b/build.sbt
@@ -52,6 +52,16 @@ dbcClusters += sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERN
 
 dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME"))}/lib"
 
+val runBenchmark = inputKey[Unit]("runs a benchmark")
+
+runBenchmark := {
+  import complete.DefaultParsers._
+  val args = spaceDelimited("[args]").parsed
+  val scalaRun = (runner in run).value
+  val classpath = (fullClasspath in Compile).value
+  scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args, streams.value.log)
+}
+
 import ReleaseTransformations._
 
 /** Push to the team directory instead of the user's homedir for releases. */
diff --git a/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
index 40691679..ea557b06 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
@@ -27,7 +27,8 @@ import scala.util.Try
 case class RunConfig(
     benchmarkName: String = null,
     filter: Option[String] = None,
-    iterations: Int = 3)
+    iterations: Int = 3,
+    baseline: Option[Long] = None)
 
 /**
  * Runs a benchmark locally and prints the results to the screen.
@@ -46,6 +47,9 @@ object RunBenchmark {
       opt[Int]('i', "iterations")
        .action((x, c) => c.copy(iterations = x))
        .text("the number of iterations to run")
+      opt[Long]('c', "compare")
+       .action((x, c) => c.copy(baseline = Some(x)))
+       .text("the timestamp of the baseline experiment to compare with")
       help("help")
        .text("prints this usage text")
     }
@@ -96,6 +100,8 @@ object RunBenchmark {
     println("== STARTING EXPERIMENT ==")
     experiment.waitForFinish(1000 * 60 * 30)
+
+    sqlContext.setConf("spark.sql.shuffle.partitions", "1")
 
     experiment.getCurrentRuns()
       .withColumn("result", explode($"results"))
      .select("result.*")
@@ -106,7 +112,28 @@ object RunBenchmark {
        avg($"executionTime") as 'avgTimeMs,
        stddev($"executionTime") as 'stdDev)
      .orderBy("name")
-      .show()
+      .show(truncate = false)
     println(s"""Results: sqlContext.read.json("${experiment.resultPath}")""")
+
+    config.baseline.foreach { baseTimestamp =>
+      val baselineTime = when($"timestamp" === baseTimestamp, $"executionTime").otherwise(null)
+      val thisRunTime = when($"timestamp" === experiment.timestamp, $"executionTime").otherwise(null)
+
+      val data = sqlContext.read.json(benchmark.resultsLocation)
+        .coalesce(1)
+        .where(s"timestamp IN ($baseTimestamp, ${experiment.timestamp})")
+        .withColumn("result", explode($"results"))
+        .select("timestamp", "result.*")
+        .groupBy("name")
+        .agg(
+          avg(baselineTime) as 'baselineTimeMs,
+          avg(thisRunTime) as 'thisRunTimeMs,
+          stddev(baselineTime) as 'stddev)
+        .withColumn(
+          "percentChange", ($"baselineTimeMs" - $"thisRunTimeMs") / $"baselineTimeMs" * 100)
+        .filter('thisRunTimeMs.isNotNull)
+
+      data.show(truncate = false)
+    }
   }
 }
\ No newline at end of file
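
Usage sketch (illustrative note, not part of the patch): with this change, a run can be compared against an earlier one by passing the earlier experiment's timestamp to the new -c/--compare option, which prints per-query baselineTimeMs, thisRunTimeMs, and percentChange. The benchmark name and timestamp below are placeholders, and the -b/--benchmark and -i/--iterations options are assumed from the existing RunBenchmark parser.

    # record a baseline run; its timestamp appears in experiment.resultPath
    bin/run -b MyBenchmark -i 3
    # re-run later and compare per-query times against that baseline's timestamp
    bin/run -b MyBenchmark -i 3 -c 1453848000000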