# Explore Spark SQL and its performance using TPC-DS workload
This notebook sets up the Spark environment to run TPC-DS bench-mark on 1GB scale factor. TPC-DS is a widely used industry standard decision support benchmark that is used to evaluate performance of the data processing engines. Given TPC-DS excercises some key data warehouse features, running TPC-DS successfully reflects the readiness of Spark in terms of addressing the need of a data warehouse application. Apache Spark v2.0 supports all 99 decision support queries that is part of this benchmark. 

This notebook is written in scala and is intended to help the spark developers gain understanding on the setup steps required to run the benchmark.
<b>Please note :</b> Several additional tuning steps may be required when adapting this to an actual spark production cluster.

## Get the journey data
- Clone the tpcds journey repository to get access to all the data and scripts that are required to excercise this journey. 
- Normally the data and queries are generated by running the data and query generation utility from the tpcds toolkit available at http://www.tpc.org/tpcds. However for ease of use, the data and queries are pre-generated for 1GB scale factor. 
- We use the pre-generated data and queries to demonstrate how they can be used to run the tpcds queries against spark.

## Setup variables.
* Sets up variables that are used in the rest of this notebook.
* The path variables are relative to the git clone directory.
* tpcdsDatabaseName is hard-coded to "TPCDS1G". This can be changed if a different database name is desired.

In [1]:
val tpcdsRootDir = "/home/mlp/notebooks/spark-tpc-ds-performance-test"
val tpcdsWorkDir = "hdfs:///muggle.feng/spark-tpc-ds-performance-test/work"
val tpcdsDdlDir = s"${tpcdsRootDir}/src/ddl/individual"
val tpcdsGenDataDir = s"${tpcdsRootDir}/src/data"
val tpcdsQueriesDir = s"${tpcdsRootDir}/src/queries"
val tpcdsDatabaseName = "TPCDS"
var totalTime: Long = 0
println("TPCDS root directory is at : "+ tpcdsRootDir)
println("TPCDS ddl scripts directory is at: " + tpcdsDdlDir)
println("TPCDS data directory is at: "+ tpcdsGenDataDir)
println("TPCDS queries directory is at: "+ tpcdsQueriesDir)
val spark = SparkSession.
    builder().
    appName("tpc-ds-performance-muggle-feng").
    config("spark.ui.showConsoleProgress", false).
    config("spark.sql.autoBroadcastJoinThreshold", -1).
    config("spark.sql.crossJoin.enabled", true).
    getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

TPCDS root directory is at : /home/mlp/notebooks/spark-tpc-ds-performance-test
TPCDS ddl scripts directory is at: /home/mlp/notebooks/spark-tpc-ds-performance-test/src/ddl/individual
TPCDS data directory is at: /home/mlp/notebooks/spark-tpc-ds-performance-test/src/data
TPCDS queries directory is at: /home/mlp/notebooks/spark-tpc-ds-performance-test/src/queries


tpcdsRootDir = /home/mlp/notebooks/spark-tpc-ds-performance-test
tpcdsWorkDir = hdfs:///muggle.feng/spark-tpc-ds-performance-test/work
tpcdsDdlDir = /home/mlp/notebooks/spark-tpc-ds-performance-test/src/ddl/individual
tpcdsGenDataDir = /home/mlp/notebooks/spark-tpc-ds-performance-test/src/data
tpcdsQueriesDir = /home/mlp/notebooks/spark-tpc-ds-performance-test/src/queries
tpcdsDatabaseName = TPCDS
totalTime = 0
spark = org.apache.spark.sql.SparkSession@3183324b


org.apache.spark.sql.SparkSession@3183324b

### Utility function definitions.
* Defines the utility functions that are called from the cells below in the notebook.

In [8]:
def clearTableDirectory(tableName: String): Unit = {
    import sys.process._
    val commandStr1 = s"rm -rf spark-warehouse/tpcds2g.db/${tableName}/*"
    val commandStr2 = s"rm -rf spark-warehouse/tpcds2g.db/${tableName}"
    var exitCode = Process(commandStr1).!
    exitCode = Process(commandStr2).!
}

def createDatabase(): Unit = {
    spark.sql(s"DROP DATABASE IF EXISTS ${tpcdsDatabaseName} CASCADE")
    spark.sql(s"CREATE DATABASE ${tpcdsDatabaseName}")
    spark.sql(s"USE ${tpcdsDatabaseName}")
}

/**
 * Function to create a table in spark. It reads the DDL script for each of the
 * tpc-ds table and executes it on Spark.
 */
def createTable(tableName: String): Unit = {
  println(s"Creating table $tableName ..")
  spark.sql(s"DROP TABLE IF EXISTS $tableName")
  clearTableDirectory(tableName)  
  val (fileName, content) = 
    spark.sparkContext.wholeTextFiles(s"${tpcdsDdlDir}/$tableName.sql").collect()(0) 
    
  // Remove the replace for the .dat once it is fixed in the github repo
  //val sqlStmts = content.stripLineEnd
  //  .replace('\n', ' ')
  //  .replace("${TPCDS_GENDATA_DIR}", tpcdsGenDataDir)
  //  .replace("csv", "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").split(";")
  sqlStmts.map(stmt => spark.sql(stmt))    
}  

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame
import scala.io.Source

def readFromLocal(fileName: String): String = {
  val source = Source.fromFile(fileName)
  return source.mkString
}

def runQuery(queryStr: String,
             individual: Boolean = true,
             resultDir: String): Seq[(String, Double, Int, String)] = {
  val querySummary = ArrayBuffer.empty[(String, Double, Int, String)]
  val queryName = s"${tpcdsQueriesDir}/query${queryStr}.sql"
  //val (_, content) = spark.sparkContext.wholeTextFiles(queryName).collect()(0)
  val content = readFromLocal(queryName)
  val queries = content.split("\n")
    .filterNot (_.startsWith("--"))
    .mkString(" ").split(";")
  
  var cnt = 1  
  for (query <- queries)  {
   val start = System.nanoTime()
   val df = spark.sql(query)   
   val result = spark.sql(query).collect
   val timeElapsed = (System.nanoTime() - start) / 1000000000
   val name = if (queries.length > 1) {
       s"query${queryStr}-${cnt}"
   } else {
       s"query${queryStr}"
   }  
   val resultFile = s"${resultDir}/${name}-notebook.res"  
   df.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .mode("overwrite")
      .save(resultFile)
   totalTime = totalTime + timeElapsed
  
   querySummary += Tuple4.apply(name, timeElapsed, result.length, resultFile)
   cnt += 1                
  }
  querySummary 
}

// run function for each table in tables array
def forEachTable(tables: Array[String], f: (String) => Unit): Unit = {
  for ( table <- tables) {
    try {
      f(table)
    } catch {
      case e: Throwable => {
        println("EXCEPTION!! " + e.getMessage())
        throw e
      }
    }
  }
}

def runIndividualQuery(queryNum: Int, resultDir: String = tpcdsWorkDir ): DataFrame = {
    val queryStr = "%02d".format(queryNum) 
    val testSummary = ArrayBuffer.empty[(String, Double, Int, String)] 
    try {      
      println(s"Running TPC-DS Query : $queryStr")  
      testSummary ++= runQuery(queryStr, true, resultDir)
    } catch {
        case e: Throwable => {
            println("Error in query "+ queryNum + " msg = " + e.getMessage)
        }
    }
    testSummary.toDF("QueryName","ElapsedTime","RowsReturned", "ResultLocation")
}

def runAllQueries(resultDir: String = tpcdsWorkDir): DataFrame = {
  val testSummary = ArrayBuffer.empty[(String, Double, Int, String)]    
  var queryErrors = 0
  for (i <- 1 to 99) {
    try{
      val queryStr = "%02d".format(i)
      println(s"Running TPC-DS Query : $queryStr")   
      testSummary ++= runQuery(queryStr, false, resultDir)
    } catch {
       case e: Throwable => {
            println("Error in query "+ i + " msg = " + e.getMessage)
            queryErrors += 1
       }
    }
  }

  println("=====================================================")
  if ( queryErrors > 0) {
    println(s"Query execution failed with $queryErrors errors")
  } else {
    println("All TPC-DS queries ran successfully")
  }
  println (s"Total Elapsed Time so far: ${totalTime} seconds.")
  println("=====================================================")
  testSummary.toDF("QueryName","ElapsedTime","RowsReturned", "ResultLocation")
}

def displaySummary(summaryDF: DataFrame): Unit = {
    summaryDF.select("QueryName", "ElapsedTime", "RowsReturned").show(10000)
}

def displayResult(queryNum: Int, summaryDF: DataFrame) = {
   val queryStr = "%02d".format(queryNum)
   // Find result files for this query number. For some queries there are
   // multiple result files. 
   val files = summaryDF.where(s"queryName like 'query${queryStr}%'").select("ResultLocation").collect()
   for (file <- files) {
       val fileName = file.getString(0)
       val df = spark.read
         .format("csv")
         .option("header", "true") //reading the headers
         .option("inferSchema", "true")
         .option("mode", "DROPMALFORMED")
         .load(fileName)
       val numRows:Int = df.count().toInt
       df.show(numRows, truncate=false)
   }
}

def explainQuery(queryNum: Int) = {
  val queryStr = "%02d".format(queryNum)  
  val queryName = s"${tpcdsQueriesDir}/query${queryStr}.sql"   
  //val (_, content) = spark.sparkContext.wholeTextFiles(queryName).collect()(0)
  val content = readFromLocal(queryName)
  val queries = content.split("\n")
    .filterNot (_.startsWith("--"))
    .mkString(" ").split(";")
    
  for (query <- queries)  {    
    spark.sql(query).explain(true) 
  }
}

Name: Unknown Error
Message: lastException: Throwable = null
<console>:75: error: not found: value sqlStmts
         sqlStmts.map(stmt => spark.sql(stmt))
         ^

StackTrace: 

## Setup the TPC-DS schema
* Create the database as specified by tpcdsDatabaseName
* Create all the TPC-DS tables
* Load the data into the tables in parquet format. Since the data generated by tpc-ds toolkit is in CSV format, we do the loading in multi steps.
  * Step 1: we create tables in csv format by pointing the location to the generated data
  * Step 2: we create parquet tables by using CTAS to convert text data into parquet format
  * Step 3: we drop the text based tables as we longer need them

In [3]:
// TPC-DS table names.
val tables = Array("call_center", "catalog_sales",
                   "customer_demographics", "income_band",
                   "promotion", "store", "time_dim", "web_returns",
                   "catalog_page", "customer", "date_dim",
                   "inventory", "reason", "store_returns", "warehouse",
                   "web_sales", "catalog_returns", "customer_address",
                   "household_demographics", "item", "ship_mode", "store_sales",
                   "web_page", "web_site" )

// Create database
createDatabase

// Create table
forEachTable(tables, table => createTable(table))


Hive history file=/tmp/mlp/hive_job_log_ba32da5b-0368-4978-94d0-fa239d937214_645930869.txt


tables = Array(call_center, catalog_sales, customer_demographics, income_band, promotion, store, time_dim, web_returns, catalog_page, customer, date_dim, inventory, reason, store_returns, warehouse, web_sales, catalog_returns, customer_address, household_demographics, item, ship_mode, store_sales, web_page, web_site)


[]

## Verify table creation and data loading.
* Run a simple Spark SQL query to get the count of rows
* Verify that the row counts are as expected

In [7]:
// Run a count query and get the counts
val rowCounts = tables.map { table =>
    spark.table(table).count()
}

val expectedCounts = Array (
    6, 1441548, 1920800, 20, 300, 12, 86400,
    71763,  11718, 100000, 73049, 11745000, 
    35, 287514, 5, 719384, 144067, 50000, 7200,
    18000, 20, 2880404, 60, 30
)

var errorCount = 0;
val zippedCountsWithIndex = rowCounts.zip(expectedCounts).zipWithIndex
for ((pair, index) <- zippedCountsWithIndex) {
    if (pair._1 != pair._2) {
        println(s"""ERROR!! Row counts for ${tables(index)} does not match.
        Expected=${expectedCounts(index)} but found ${rowCounts(index)}""")
        errorCount += 1
    }
}

println("=====================================================")
if ( errorCount > 0) {
  println(s"Load verification failed with $errorCount errors")
} else {
  println("Loaded and verified the table counts successfully")
}
println("=====================================================")

Loaded and verified the table counts successfully


rowCounts = Array(6, 1441548, 1920800, 20, 300, 12, 86400, 71763, 11718, 100000, 73049, 11745000, 35, 287514, 5, 719384, 144067, 50000, 7200, 18000, 20, 2880404, 60, 30)
expectedCounts = Array(6, 1441548, 1920800, 20, 300, 12, 86400, 71763, 11718, 100000, 73049, 11745000, 35, 287514, 5, 719384, 144067, 50000, 7200, 18000, 20, 2880404, 60, 30)
errorCount = 0
zippedCountsWithIndex = Array(((6,6),0), ((1441548,1441548),1), ((1920800,1920800),2), ((20,20),3), ((300,300),4), ((12,12),5), ((86400,86400),6), ((71763,71763),7), ((11718,11718),8), ((100000,100000),9), ((73049,73049),10), ((11745000,11745000),11), ((35,35),12), ((287514,287514),13), ((5,5),14), ((719384,719384),15), ((144067,144067),16), ((50000,50000),17), ((7200,7200),18)...


Array(((6,6),0), ((1441548,1441548),1), ((1920800,1920800),2), ((20,20),3), ((300,300),4), ((12,12),5), ((86400,86400),6), ((71763,71763),7), ((11718,11718),8), ((100000,100000),9), ((73049,73049),10), ((11745000,11745000),11), ((35,35),12), ((287514,287514),13), ((5,5),14), ((719384,719384),15), ((144067,144067),16), ((50000,50000),17), ((7200,7200),18)...

## Run a single query
* Run a query given a query number between 1 to 99
* Display the query results, the elapsed time to execute the query and the number of rows returned for the query
* To run a different query, please change the QUERY_NUM to a valid value from 1 to 99

In [4]:
val QUERY_NUM = 1
val result = runIndividualQuery(QUERY_NUM, "hdfs:///muggle.feng")
displaySummary(result)
displayResult(QUERY_NUM, result)

Running TPC-DS Query : 01
+---------+-----------+------------+
|QueryName|ElapsedTime|RowsReturned|
+---------+-----------+------------+
|  query01|       27.0|         100|
+---------+-----------+------------+

+----------------+
|c_customer_id   |
+----------------+
|AAAAAAAAAAABBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAAKAAAA|
|AAAAAAAAAABDAAAA|
|AAAAAAAAAABHBAAA|
|AAAAAAAAAABLAAAA|
|AAAAAAAAAABMAAAA|
|AAAAAAAAAACHAAAA|
|AAAAAAAAAACMAAAA|
|AAAAAAAAAADDAAAA|
|AAAAAAAAAADGAAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADPAAAA|
|AAAAAAAAAAEBAAAA|
|AAAAAAAAAAEFBAAA|
|AAAAAAAAAAEGBAAA|
|AAAAAAAAAAEIAAAA|
|AAAAAAAAAAEMAAAA|
|AAAAAAAAAAFAAAAA|
|AAAAAAAAAAFPAAAA|
|AAAAAAAAAAGGBAAA|
|AAAAAAAAAAGHBAAA|
|AAAAAAAAAAGJAAAA|
|AAAAAAAAAAGMAAAA|
|AAAAAAAAAAHEBAAA|
|AAAAAAAAAAHFBAAA|
|AAAAAAAAAAIEBAAA|
|AAAAAAAAAAJGBAAA|
|AAAAAAAAAAJHBAAA|
|AAAAAAAAAAKCAAAA|
|AAAAAAAAAAKCAAAA|
|AAAAAAAAAAKJAAAA|
|AAAAAAAAAAKMAAAA|
|AAAAAAAAAAKMAAAA|
|AAAAAAAAAALAAAAA|
|AAAAAAAAAALABAAA|
|AAAAAAAA

QUERY_NUM = 1
result = [QueryName: string, ElapsedTime: double ... 2 more fields]


[QueryName: string, ElapsedTime: double ... 2 more fields]

## Run all the TPC-DS queries
* Runs all the queries starting from 1 to 99
* The query results are saved and can be queried by calling getResults method.
* The summary will be shown at the end.

In [5]:
val result = runAllQueries()
displaySummary(result)

Running TPC-DS Query : 01
Running TPC-DS Query : 02
Running TPC-DS Query : 03
Running TPC-DS Query : 04
Running TPC-DS Query : 05
Running TPC-DS Query : 06
Running TPC-DS Query : 07
Running TPC-DS Query : 08
Running TPC-DS Query : 09
Running TPC-DS Query : 10
Running TPC-DS Query : 11
Running TPC-DS Query : 12
Running TPC-DS Query : 13
Running TPC-DS Query : 14
Running TPC-DS Query : 15
Running TPC-DS Query : 16
Running TPC-DS Query : 17
Running TPC-DS Query : 18
Running TPC-DS Query : 19
Running TPC-DS Query : 20
Running TPC-DS Query : 21
Running TPC-DS Query : 22
Running TPC-DS Query : 23
Running TPC-DS Query : 24
Running TPC-DS Query : 25
Running TPC-DS Query : 26
Running TPC-DS Query : 27
Running TPC-DS Query : 28
Running TPC-DS Query : 29
Running TPC-DS Query : 30
Running TPC-DS Query : 31
Running TPC-DS Query : 32
Running TPC-DS Query : 33
Running TPC-DS Query : 34
Running TPC-DS Query : 35
Running TPC-DS Query : 36
Running TPC-DS Query : 37
Running TPC-DS Query : 38
Running TPC-

result = [QueryName: string, ElapsedTime: double ... 2 more fields]


[QueryName: string, ElapsedTime: double ... 2 more fields]

## Display Result for a individual Query
* Reads the result file for the given query stored when thery are run in previous steps.
* Certain queries have multiple associated result files. The result files are read in sequence and
  results are displayed.
* If the result file(s) are not found , then an error is displayed.  


In [6]:
displayResult(1, result)

+----------------+
|c_customer_id   |
+----------------+
|AAAAAAAAAAABBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAADBAAA|
|AAAAAAAAAAAKAAAA|
|AAAAAAAAAABDAAAA|
|AAAAAAAAAABHBAAA|
|AAAAAAAAAABLAAAA|
|AAAAAAAAAABMAAAA|
|AAAAAAAAAACHAAAA|
|AAAAAAAAAACMAAAA|
|AAAAAAAAAADDAAAA|
|AAAAAAAAAADGAAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADGBAAA|
|AAAAAAAAAADPAAAA|
|AAAAAAAAAAEBAAAA|
|AAAAAAAAAAEFBAAA|
|AAAAAAAAAAEGBAAA|
|AAAAAAAAAAEIAAAA|
|AAAAAAAAAAEMAAAA|
|AAAAAAAAAAFAAAAA|
|AAAAAAAAAAFPAAAA|
|AAAAAAAAAAGGBAAA|
|AAAAAAAAAAGHBAAA|
|AAAAAAAAAAGJAAAA|
|AAAAAAAAAAGMAAAA|
|AAAAAAAAAAHEBAAA|
|AAAAAAAAAAHFBAAA|
|AAAAAAAAAAIEBAAA|
|AAAAAAAAAAJGBAAA|
|AAAAAAAAAAJHBAAA|
|AAAAAAAAAAKCAAAA|
|AAAAAAAAAAKCAAAA|
|AAAAAAAAAAKJAAAA|
|AAAAAAAAAAKMAAAA|
|AAAAAAAAAAKMAAAA|
|AAAAAAAAAALAAAAA|
|AAAAAAAAAALABAAA|
|AAAAAAAAAALGAAAA|
|AAAAAAAAAALHBAAA|
|AAAAAAAAAALJAAAA|
|AAAAAAAAAANHAAAA|
|AAAAAAAAAANHBAAA|
|AAAAAAAAAANJAAAA|
|AAAAAAAAAANMAAAA|
|AAAAAAAAAANMAAAA|
|AAAAAAAAAANNAAAA|
|AAAAAAAAAAOBBAAA|
|AAAAAAAAAAODBAAA|
|AAAAAAAAAAO

## Display SQL Execution Plan
* Display the analyzed, optimized and phyical plan for a given query.
* Can be used by developers for debugging purposes.
* QUERY_NUM can be changed to display the plan for different query.

In [9]:
val QUERY_NUM=1
explainQuery(QUERY_NUM)

Name: org.apache.hadoop.mapreduce.lib.input.InvalidInputException
Message: Input path does not exist: hdfs://rtcluster/home/mlp/notebooks/spark-tpc-ds-performance-test/src/queries/query01.sql
StackTrace:   at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
  at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
  at org.apache.spark.input.WholeTextFileInputFormat.setMinPartitions(WholeTextFileInputFormat.scala:52)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:54)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:2

In [10]:
%%brunel
data('result') bar x(QueryName) y(ElapsedTime) title("Query Execution Time in seconds", "Execution Summary":footer)

Name: Error parsing magics!
Message: Magic brunel does not exist!
StackTrace: 

## Learn more
Visit [Apache Spark](https://spark.apache.org) for learning about spark. For questions or requests plese visit [Spark Community](https://spark.apache.org/community.html). To get involved , see [Contributing to Apache Spark](https://spark.apache.org/contributing.html).

## Authors
* Dilip Biswal is a Senior Software Engineer at the Spark Technology Center at IBM. He is an active Apache Spark contributor and works in the open source community.
  He is experienced in Relational Databases, Distributed Computing and Big Data Analytics.  He has extensively worked on SQL engines like Informix, Derby, and Big SQL.
* Sunitha Kambhampati is an Advisory Software Engineer at the Spark Technology Center at IBM. She is an Apache Spark contributor and works in the open source community. She is experienced in Big Data Analytics.
* Xin Wu is an Advisory Software Engineer and is an active contributor for Apache Spark. He has experiences in distributed query processing engines like BigSQL, DB2.