Skip to content

Commit

Permalink
[SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries in TPCDSQueryB…
Browse files Browse the repository at this point in the history
…enchmark

## What changes were proposed in this pull request?
This PR adds the TPCDS v2.7 (latest) queries to `TPCDSQueryBenchmark`.
The query files themselves were already added in `SPARK-23167`.

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21177 from maropu/AddTpcdsV2_7InBenchmark.
  • Loading branch information
maropu authored and gatorsmile committed May 2, 2018
1 parent 5be8aab commit e4c91c0
Showing 1 changed file with 35 additions and 17 deletions.
Expand Up @@ -58,10 +58,13 @@ object TPCDSQueryBenchmark extends Logging {
}.toMap
}

def tpcdsAll(dataLocation: String, queries: Seq[String]): Unit = {
val tableSizes = setupTables(dataLocation)
def runTpcdsQueries(
queryLocation: String,
queries: Seq[String],
tableSizes: Map[String, Long],
nameSuffix: String = ""): Unit = {
queries.foreach { name =>
val queryString = resourceToString(s"tpcds/$name.sql",
val queryString = resourceToString(s"$queryLocation/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)

// This is an indirect hack to estimate the size of each query's input by traversing the
Expand All @@ -78,7 +81,7 @@ object TPCDSQueryBenchmark extends Logging {
}
val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5)
benchmark.addCase(name) { i =>
benchmark.addCase(s"$name$nameSuffix") { _ =>
spark.sql(queryString).collect()
}
logInfo(s"\n\n===== TPCDS QUERY BENCHMARK OUTPUT FOR $name =====\n")
Expand All @@ -87,10 +90,20 @@ object TPCDSQueryBenchmark extends Logging {
}
}

/**
 * Narrows a list of query names according to the query filter carried by the
 * benchmark arguments.
 *
 * @param origQueries candidate query names
 * @param args benchmark arguments whose `queryFilter` is consulted
 * @return `origQueries` as-is when `args.queryFilter` is empty; otherwise only the
 *         names for which `args.queryFilter.contains` holds, in their original order
 */
def filterQueries(
    origQueries: Seq[String],
    args: TPCDSQueryBenchmarkArguments): Seq[String] = {
  val selected = args.queryFilter
  // An empty filter means "no restriction", not "select nothing".
  if (selected.isEmpty) origQueries
  else origQueries.filter(queryName => selected.contains(queryName))
}

def main(args: Array[String]): Unit = {
val benchmarkArgs = new TPCDSQueryBenchmarkArguments(args)

// List of all TPC-DS queries
// List of all TPC-DS v1.4 queries
val tpcdsQueries = Seq(
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11",
"q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19", "q20",
Expand All @@ -103,20 +116,25 @@ object TPCDSQueryBenchmark extends Logging {
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")

// This list only includes TPC-DS v2.7 queries that are different from v1.4 ones
val tpcdsQueriesV2_7 = Seq(
"q5a", "q6", "q10a", "q11", "q12", "q14", "q14a", "q18a",
"q20", "q22", "q22a", "q24", "q27a", "q34", "q35", "q35a", "q36a", "q47", "q49",
"q51a", "q57", "q64", "q67a", "q70a", "q72", "q74", "q75", "q77a", "q78",
"q80a", "q86a", "q98")

// If `--query-filter` defined, filters the queries that this option selects
val queriesToRun = if (benchmarkArgs.queryFilter.nonEmpty) {
val queries = tpcdsQueries.filter { case queryName =>
benchmarkArgs.queryFilter.contains(queryName)
}
if (queries.isEmpty) {
throw new RuntimeException(
s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
}
queries
} else {
tpcdsQueries
val queriesV1_4ToRun = filterQueries(tpcdsQueries, benchmarkArgs)
val queriesV2_7ToRun = filterQueries(tpcdsQueriesV2_7, benchmarkArgs)

if ((queriesV1_4ToRun ++ queriesV2_7ToRun).isEmpty) {
throw new RuntimeException(
s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
}

tpcdsAll(benchmarkArgs.dataLocation, queries = queriesToRun)
val tableSizes = setupTables(benchmarkArgs.dataLocation)
runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes)
runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes,
nameSuffix = "-v2.7")
}
}

0 comments on commit e4c91c0

Please sign in to comment.