[KYUUBI #1743] Fix parallelism of DataGenerator and other enhancements
### _Why are the changes needed?_

The parallelism of DataGenerator is always `spark.sparkContext.defaultParallelism`, which does not make sense for generating large-scale data.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1743 from pan3793/tpcds.

Closes #1743

62f7c86 [Cheng Pan] nit
fdcf832 [Cheng Pan] nit
a52ff48 [Cheng Pan] Fix parallelism of DataGenerator and other enhancements

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
pan3793 committed Jan 13, 2022
1 parent ab4184d commit 01f0ea8
Showing 8 changed files with 97 additions and 98 deletions.
33 changes: 18 additions & 15 deletions dev/kyuubi-tpcds/README.md
@@ -16,57 +16,60 @@
-->

# Introduction
This module includes tpcds data generator and benchmark.
This module includes a TPC-DS data generator and a benchmark tool.

# How to use

Package the jar with the following command:
`./build/mvn install -DskipTests -Ptpcds -pl dev/kyuubi-tpcds -am`
`./build/mvn clean package -Ptpcds -pl dev/kyuubi-tpcds -am`

## data generator
## Data Generator

Supported options:

| key | default | description |
|-------------|---------|------------------------------|
| db | default | the databases to write data |
| scaleFactor | 1 | the scale factor of tpcds |
| key | default | description |
|--------------|-----------------|-----------------------------------|
| db | default | the database to write data |
| scaleFactor | 1 | the scale factor of TPC-DS |
| format | parquet | the format of table to store data |
| parallel | scaleFactor * 2 | the parallelism of Spark job |

Example: the following command generates 10GB of data into a new database `tpcds_sf10`.

```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.DataGenerator \
kyuubi-tpcds-*.jar --db tpcds_sf10 --scaleFactor 10
kyuubi-tpcds_*.jar \
--db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20
```
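
Note: `parallel` defaults to `scaleFactor * 2`, so `--parallel 20` is already the default for `--scaleFactor 10`; it is passed explicitly here only for illustration.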

## do benchmark
## Benchmark Tool

Supported options:

| key | default | description |
|------------|----------------------|--------------------------------------------------------|
| db | none(required) | the tpcds database |
| db | none(required) | the TPC-DS database |
| benchmark | tpcds-v2.4-benchmark | the name of the application |
| iterations | 3 | the number of iterations to run |
| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 |

Example: the following command to benchmark tpcds sf10 with exists database `tpcds_sf10`.
Example: the following command benchmarks TPC-DS sf10 against the existing database `tpcds_sf10`.

```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
kyuubi-tpcds-*.jar --db tpcds_sf10
kyuubi-tpcds_*.jar --db tpcds_sf10
```

We also support run one of the tpcds query:
We also support running a single TPC-DS query:
```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
kyuubi-tpcds-*.jar --db tpcds_sf10 --filter q1-v2.4
kyuubi-tpcds_*.jar --db tpcds_sf10 --filter q1-v2.4
```

The result of tpcds benchmark like:
The result of the TPC-DS benchmark looks like:

| name | minTimeMs | maxTimeMs | avgTimeMs | stdDev | stdDevPercent |
|---------|-----------|-------------|------------|----------|----------------|
@@ -17,32 +17,29 @@

package org.apache.kyuubi.tpcds

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

case class RunConfig(
db: String = "default",
scaleFactor: Int = 1)

/**
* Usage:
* <p>
* Run the following command to generate 10GB of data into a new database `tpcds_sf10`.
* {{{
* `$SPARK_HOME/bin/spark-submit \
* --conf spark.sql.tpcds.scale.factor=10 \
* --conf spark.sql.tpcds.database=tpcds_sf10 \
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.kyuubi.tpcds.DataGenerator \
* kyuubi-tpcds-*.jar`
* kyuubi-tpcds_*.jar \
* --db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20
* }}}
*/
object DataGenerator {
private val logger =
LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$"))

val SCALE_FACTOR_KEY = "spark.sql.tpcds.scale.factor"
val DATABASE_KEY = "spark.sql.tpcds.database"
case class Config(
db: String = "default",
scaleFactor: Int = 1,
format: String = "parquet",
parallel: Option[Int] = None)

private val logger = LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$"))

def initTable(spark: SparkSession): Seq[TableGenerator] = {
import spark.implicits._
@@ -597,52 +594,50 @@ object DataGenerator {
promotion)
}

def run(config: RunConfig): Unit = {
val conf = new SparkConf()
val db = conf.get(DATABASE_KEY, config.db)
val scaleFactor = conf.get(DataGenerator.SCALE_FACTOR_KEY, config.scaleFactor.toString).toInt

def run(config: Config): Unit = {
val spark = SparkSession.builder()
.appName(s"Kyuubi TPCDS Generation - ${scaleFactor}GB")
.config(conf)
.appName(s"Kyuubi TPC-DS Generation - ${config.scaleFactor}GB")
.enableHiveSupport()
.getOrCreate()

logger.info(s"Generating TPCDS tables under database $db")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
spark.sql(s"USE $db")
spark.sql(s"DESC DATABASE $db").show()

val parallel = spark.sparkContext.defaultParallelism
logger.info(s"Generating TPC-DS tables under database ${config.db}")
spark.sql(s"CREATE DATABASE IF NOT EXISTS ${config.db}")
spark.sql(s"USE ${config.db}")
spark.sql(s"DESC DATABASE ${config.db}").show()

val tpcdsTables = initTable(spark)
tpcdsTables.par.foreach { table =>
table.setScaleFactor(scaleFactor)
table.setParallelism(parallel)
table.setScaleFactor(config.scaleFactor)
table.setFormat(config.format)
config.parallel.foreach(table.setParallelism)
spark.sparkContext.setJobDescription(table.toString)
logger.info(s"$table")
logger.info(s"Generating $table")
table.create()
}
}
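
Note that `tpcdsTables.par.foreach` drives the per-table generation concurrently from a parallel collection, and `setJobDescription` labels each table's Spark jobs so they are identifiable in the Spark UI.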

def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[RunConfig]("tpcds-data-generator") {
head("tpcds-data-generator", "")
val parser = new scopt.OptionParser[Config]("tpcds-data-generator") {
head("Kyuubi TPC-DS Data Generator")
opt[String]('d', "db")
.action { (x, c) => c.copy(db = x) }
.text("the databases to write data")
.text("the database to write data")
opt[Int]('s', "scaleFactor")
.action { (x, c) => c.copy(scaleFactor = x) }
.text("the scale factor of tpcds")
help("help")
.text("the scale factor of TPC-DS")
opt[String]('f', "format")
.action { (x, c) => c.copy(format = x) }
.text("the format of table to store data")
opt[Int]('p', "parallel")
.action { (x, c) => c.copy(parallel = Some(x)) }
.text("the parallelism of Spark job")
help('h', "help")
.text("prints this usage text")
}

parser.parse(args, RunConfig()) match {
case Some(config) =>
run(config)
case None =>
System.exit(1)
parser.parse(args, Config()) match {
case Some(config) => run(config)
case None => sys.exit(1)
}
}
}
@@ -40,15 +40,18 @@ case class TableGenerator(
private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType)))

private var scaleFactor: Int = 1
def setScaleFactor(scale: Int): Unit = this.scaleFactor = scale

private var parallelism: Int = scaleFactor * 2
private var _parallelism: Option[Int] = None
private def parallelism: Int = _parallelism.getOrElse(scaleFactor * 2)
def setParallelism(parallel: Int): Unit = this._parallelism = Some(parallel max 2)

private val ss: SparkSession = SparkSession.active
private val format: String = ss.conf.get("spark.sql.sources.default", "parquet")
private var _format: Option[String] = None
private def format: String = _format.getOrElse(ss.conf.get("spark.sql.sources.default"))
def setFormat(format: String): Unit = this._format = Some(format)

private def radix: Int = {
math.min(math.max(5, scaleFactor / 100), parallelism)
}
private def radix: Int = (scaleFactor / 100) max 5 min parallelism
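
For intuition, a standalone sketch (hypothetical object name) of the Option-backed defaulting and `radix` logic above: an explicit setter value wins; otherwise the value is derived from the scale factor at read time.

```scala
// Standalone sketch; mirrors the defaulting and radix logic above.
object TableDefaultsSketch {
  def parallelism(explicit: Option[Int], scaleFactor: Int): Int =
    explicit.map(_ max 2).getOrElse(scaleFactor * 2) // explicit values are floored at 2

  def radix(scaleFactor: Int, parallelism: Int): Int =
    (scaleFactor / 100) max 5 min parallelism // at least 5, at most the parallelism

  def main(args: Array[String]): Unit = {
    assert(parallelism(None, 10) == 20)   // default: scaleFactor * 2
    assert(parallelism(Some(1), 10) == 2) // explicit value, floored at 2
    assert(radix(10, 20) == 5)            // small scale hits the lower bound of 5
    assert(radix(3000, 8) == 8)           // capped by the parallelism
  }
}
```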

private def toDF: DataFrame = {
val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
@@ -101,8 +104,8 @@ case class TableGenerator(
val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
val iterator =
if (Files.exists(data)) {
// ... realized that when opening the dat files I should use the “Cp1252” encoding.
// https://github.com/databricks/spark-sql-perf/pull/104
// The data generated by `dsdgen` is encoded in "Cp1252".
// See details at https://github.com/databricks/spark-sql-perf/pull/104
// noinspection SourceNotClosed
Source.fromFile(data.toFile, "cp1252", 8192).getLines
} else {
@@ -123,14 +126,6 @@ case class TableGenerator(
ss.createDataFrame(rowRDD, rawSchema).select(columns: _*)
}

def setScaleFactor(scale: Int): Unit = {
this.scaleFactor = scale
}

def setParallelism(parallel: Int): Unit = {
this.parallelism = math.max(2, parallel)
}

def create(): Unit = {
val data =
if (partitionCols.isEmpty) {
@@ -36,7 +36,7 @@ abstract class Benchmark(

import Benchmark._

val resultsLocation =
val resultsLocation: String =
sparkSession.conf.get(
"spark.sql.perf.results",
"/spark/sql/performance")
@@ -45,15 +45,16 @@

implicit protected def toOption[A](a: A): Option[A] = Option(a)

val buildInfo = Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls =>
cls.getMethods
.filter(_.getReturnType == classOf[String])
.filterNot(_.getName == "toString")
.map(m => m.getName -> m.invoke(cls).asInstanceOf[String])
.toMap
}.getOrElse(Map.empty)
val buildInfo: Map[String, String] =
Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls =>
cls.getMethods
.filter(_.getReturnType == classOf[String])
.filterNot(_.getName == "toString")
.map(m => m.getName -> m.invoke(cls).asInstanceOf[String])
.toMap
}.getOrElse(Map.empty)

def currentConfiguration = BenchmarkConfiguration(
def currentConfiguration: BenchmarkConfiguration = BenchmarkConfiguration(
sqlConf = sparkSession.conf.getAll,
sparkConf = sparkContext.getConf.getAll.toMap,
defaultParallelism = sparkContext.defaultParallelism,
@@ -82,7 +83,7 @@ abstract class Benchmark(
tags: Map[String, String] = Map.empty,
timeout: Long = 0L,
resultLocation: String = resultsLocation,
forkThread: Boolean = true) = {
forkThread: Boolean = true): ExperimentStatus = {

new ExperimentStatus(
executionsToRun,
@@ -150,7 +151,7 @@ object Benchmark {
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
val currentMessages = new collection.mutable.ArrayBuffer[String]()

def logMessage(msg: String) = {
def logMessage(msg: String): Unit = {
println(msg)
currentMessages += msg
}
@@ -170,10 +171,11 @@ object Benchmark {
case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}
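
For reference, `cartesianProduct` enumerates every combination of variation options; a standalone sketch with the conventional `Nil` base case (assumed here, as it is elided above):

```scala
// Standalone sketch of the recursion above; the Nil base case is assumed.
def cartesianProduct[T](lists: List[List[T]]): List[List[T]] = lists match {
  case Nil    => List(Nil) // one empty combination
  case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}

// cartesianProduct(List(List(1, 2), List(3, 4)))
//   == List(List(1, 3), List(1, 4), List(2, 3), List(2, 4))
```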

val timestamp = System.currentTimeMillis()
val timestamp: Long = System.currentTimeMillis()
val resultPath = s"$resultsLocation/timestamp=$timestamp"
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = Future {
val combinations: Seq[List[Int]] =
cartesianProduct(variations.map(l => l.options.indices.toList).toList)
val resultsFuture: Future[Unit] = Future {
// Run the benchmarks!
val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i =>
combinations.map { setup =>
@@ -260,7 +262,7 @@ object Benchmark {
}

/** Waits for the finish of the experiment. */
def waitForFinish(timeoutInSeconds: Int) = {
def waitForFinish(timeoutInSeconds: Int): Unit = {
Await.result(resultsFuture, timeoutInSeconds.seconds)
}

@@ -53,7 +53,7 @@ class Query(
}
}

lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
lazy val tablesInvolved: Seq[String] = buildDataFrame.queryExecution.logical collect {
case r: UnresolvedRelation => r.tableName
}

@@ -34,9 +34,11 @@ case class RunConfig(
/**
* Usage:
* <p>
* Run following command to benchmark tpcds sf10 with exists database `tpcds_sf10`.
* Run the following command to benchmark TPC-DS sf10 against the existing database `tpcds_sf10`.
* {{{
* $SPARK_HOME/bin/spark-submit --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark kyuubi-tpcds-*.jar --db tpcds_sf10
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
* kyuubi-tpcds_*.jar --db tpcds_sf10
* }}}
*/
object RunBenchmark {
@@ -61,10 +63,8 @@ object RunBenchmark {
}

parser.parse(args, RunConfig()) match {
case Some(config) =>
run(config)
case None =>
System.exit(1)
case Some(config) => run(config)
case None => sys.exit(1)
}
}

@@ -95,7 +95,7 @@ object RunBenchmark {
val experiment = benchmark.runExperiment(
executionsToRun = allQueries,
iterations = config.iterations,
tags = Map("host" -> InetAddress.getLocalHost().getHostName()))
tags = Map("host" -> InetAddress.getLocalHost.getHostName))

println("== STARTING EXPERIMENT ==")
experiment.waitForFinish(1000 * 60 * 30)
@@ -24,5 +24,5 @@ import org.apache.spark.sql.SparkSession
*/
class TPCDS(@transient sparkSession: SparkSession)
extends Benchmark(sparkSession)
with Tpcds_2_4_Queries
with TPCDS_2_4_Queries
with Serializable {}
