Skip to content

Commit

Permalink
Add a convenient class to generate TPC-DS data (#196)
Browse files Browse the repository at this point in the history
How to use it:
```
build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d /root/tmp/tpcds-kit/tools -s 5 -l /root/tmp/tpcds5g -f parquet"
```

```
[root@spark-3267648 spark-sql-perf]# build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData --help"
[info] Running com.databricks.spark.sql.perf.tpcds.GenTPCDSData --help
[info] Usage: Gen-TPC-DS-data [options]
[info]
[info]   -m, --master <value>     the Spark master to use, default to local[*]
[info]   -d, --dsdgenDir <value>  location of dsdgen
[info]   -s, --scaleFactor <value>
[info]                            scaleFactor defines the size of the dataset to generate (in GB)
[info]   -l, --location <value>   root directory of location to create data in
[info]   -f, --format <value>     valid spark format, Parquet, ORC ...
[info]   -i, --useDoubleForDecimal <value>
[info]                            true to replace DecimalType with DoubleType
[info]   -e, --useStringForDate <value>
[info]                            true to replace DateType with StringType
[info]   -o, --overwrite <value>  overwrite the data that is already there
[info]   -p, --partitionTables <value>
[info]                            create the partitioned fact tables
[info]   -c, --clusterByPartitionColumns <value>
[info]                            shuffle to get partitions coalesced into single files
[info]   -v, --filterOutNullPartitionValues <value>
[info]                            true to filter out the partition with NULL key value
[info]   -t, --tableFilter <value>
[info]                            "" means generate all tables
[info]   -n, --numPartitions <value>
[info]                            how many dsdgen partitions to run - number of input tasks.
[info]   --help                   prints this usage text
```
  • Loading branch information
wangyum committed Mar 30, 2021
1 parent 65785a8 commit ca4ccea
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 24 deletions.
28 changes: 4 additions & 24 deletions README.md
Expand Up @@ -67,31 +67,11 @@ TPCDS kit needs to be installed on all cluster executor nodes under the same pat
It can be found [here](https://github.com/databricks/tpcds-kit).

```
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
// Set:
val rootDir = ... // root directory of location to create data in.
val databaseName = ... // name of database to create.
val scaleFactor = ... // scaleFactor defines the size of the dataset to generate (in GB).
val format = ... // a valid Spark data source format, e.g. "parquet".
// Run:
val tables = new TPCDSTables(sqlContext,
dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
scaleFactor = scaleFactor,
useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
useStringForDate = false) // true to replace DateType with StringType
tables.genData(
location = rootDir,
format = format,
overwrite = true, // overwrite the data that is already there
partitionTables = true, // create the partitioned fact tables
clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
tableFilter = "", // "" means generate all tables
numPartitions = 100) // how many dsdgen partitions to run - number of input tasks.
// Generate the data
build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
```

```
// Create the specified database
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
Expand Down
121 changes: 121 additions & 0 deletions src/main/scala/com/databricks/spark/sql/perf/tpcds/GenTPCDSData.scala
@@ -0,0 +1,121 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.sql.perf.tpcds

import org.apache.spark.sql.SparkSession

/**
 * Configuration for the TPC-DS data generator, populated from the command line.
 *
 * `dsdgenDir`, `scaleFactor`, `location` and `format` have no usable defaults
 * and are expected to be supplied on the command line; they are `null` here
 * only so the option parser can start from an empty configuration.
 */
case class GenTPCDSDataConfig(
    master: String = "local[*]",                // Spark master URL
    dsdgenDir: String = null,                   // location of the dsdgen binary
    scaleFactor: String = null,                 // dataset size to generate, in GB
    location: String = null,                    // root directory for the generated data
    format: String = null,                      // any valid Spark data source format
    useDoubleForDecimal: Boolean = false,       // replace DecimalType with DoubleType
    useStringForDate: Boolean = false,          // replace DateType with StringType
    overwrite: Boolean = false,                 // overwrite pre-existing output
    partitionTables: Boolean = true,            // create the partitioned fact tables
    clusterByPartitionColumns: Boolean = true,  // coalesce partitions into single files
    // Default to false so partitions with NULL key values are kept unless the
    // user explicitly opts in, matching the documented README example usage.
    filterOutNullPartitionValues: Boolean = false,
    tableFilter: String = "",                   // empty string means generate all tables
    numPartitions: Int = 100)                   // number of dsdgen partitions / input tasks

/**
* Gen TPCDS data.
* To run this:
* {{{
* build/sbt "test:runMain <this class> -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
* }}}
*/
/**
 * Command-line entry point that generates TPC-DS data via [[TPCDSTables]].
 *
 * To run this:
 * {{{
 *   build/sbt "test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData \
 *     -d <dsdgenDir> -s <scaleFactor> -l <location> -f <format>"
 * }}}
 */
object GenTPCDSData {
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[GenTPCDSDataConfig]("Gen-TPC-DS-data") {
      opt[String]('m', "master")
        .action((x, c) => c.copy(master = x))
        .text("the Spark master to use, default to local[*]")
      opt[String]('d', "dsdgenDir")
        .action((x, c) => c.copy(dsdgenDir = x))
        .text("location of dsdgen")
        .required()
      opt[String]('s', "scaleFactor")
        .action((x, c) => c.copy(scaleFactor = x))
        .text("scaleFactor defines the size of the dataset to generate (in GB)")
        // No usable default in GenTPCDSDataConfig; fail at parse time rather
        // than with an NPE inside TPCDSTables.
        .required()
      opt[String]('l', "location")
        .action((x, c) => c.copy(location = x))
        .text("root directory of location to create data in")
        .required()
      opt[String]('f', "format")
        .action((x, c) => c.copy(format = x))
        .text("valid spark format, Parquet, ORC ...")
        .required()
      opt[Boolean]('i', "useDoubleForDecimal")
        .action((x, c) => c.copy(useDoubleForDecimal = x))
        .text("true to replace DecimalType with DoubleType")
      opt[Boolean]('e', "useStringForDate")
        .action((x, c) => c.copy(useStringForDate = x))
        .text("true to replace DateType with StringType")
      opt[Boolean]('o', "overwrite")
        .action((x, c) => c.copy(overwrite = x))
        .text("overwrite the data that is already there")
      opt[Boolean]('p', "partitionTables")
        .action((x, c) => c.copy(partitionTables = x))
        .text("create the partitioned fact tables")
      opt[Boolean]('c', "clusterByPartitionColumns")
        .action((x, c) => c.copy(clusterByPartitionColumns = x))
        .text("shuffle to get partitions coalesced into single files")
      opt[Boolean]('v', "filterOutNullPartitionValues")
        .action((x, c) => c.copy(filterOutNullPartitionValues = x))
        .text("true to filter out the partition with NULL key value")
      opt[String]('t', "tableFilter")
        .action((x, c) => c.copy(tableFilter = x))
        .text("\"\" means generate all tables")
      opt[Int]('n', "numPartitions")
        .action((x, c) => c.copy(numPartitions = x))
        .text("how many dsdgen partitions to run - number of input tasks.")
      help("help")
        .text("prints this usage text")
    }

    parser.parse(args, GenTPCDSDataConfig()) match {
      case Some(config) =>
        run(config)
      case None =>
        // scopt has already printed the parse error / usage text.
        System.exit(1)
    }
  }

  /** Builds a SparkSession from the parsed config and generates the dataset. */
  private def run(config: GenTPCDSDataConfig): Unit = {
    val spark = SparkSession
      .builder()
      .appName(getClass.getName)
      .master(config.master)
      .getOrCreate()

    try {
      val tables = new TPCDSTables(spark.sqlContext,
        dsdgenDir = config.dsdgenDir,
        scaleFactor = config.scaleFactor,
        useDoubleForDecimal = config.useDoubleForDecimal,
        useStringForDate = config.useStringForDate)

      tables.genData(
        location = config.location,
        format = config.format,
        overwrite = config.overwrite,
        partitionTables = config.partitionTables,
        clusterByPartitionColumns = config.clusterByPartitionColumns,
        filterOutNullPartitionValues = config.filterOutNullPartitionValues,
        tableFilter = config.tableFilter,
        numPartitions = config.numPartitions)
    } finally {
      // Release the session even if data generation fails.
      spark.stop()
    }
  }
}

0 comments on commit ca4ccea

Please sign in to comment.