# Run TPC-DS throughput test with gazelle

## Define spark configuration to enable gazelle

In [None]:
%%init_spark

# Spark launch configuration for running TPC-DS with the Gazelle plugin.
# Every setting is applied through launcher.conf before the Spark session starts.

# Classpath: spark-sql-perf benchmark jar plus the Gazelle plugin jar, on driver and executors.
launcher.conf.set("spark.driver.extraClassPath", "/home/cloudtik/runtime/benchmark-tools/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar:/home/cloudtik/runtime/oap/oap_jars/gazelle-plugin-1.4.0-spark-3.2.1.jar")
launcher.conf.set("spark.executor.extraClassPath", "/home/cloudtik/runtime/benchmark-tools/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar:/home/cloudtik/runtime/oap/oap_jars/gazelle-plugin-1.4.0-spark-3.2.1.jar")

# Cluster sizing and memory; adjust to the cluster the benchmark runs on.
launcher.conf.set("spark.executor.instances", "48")
launcher.conf.set("spark.driver.memory", "20g")
launcher.conf.set("spark.driver.maxResultSize", "20g")
launcher.conf.set("spark.executor.cores", "8")
launcher.conf.set("spark.executor.memory", "8g")
launcher.conf.set("spark.executor.memoryOverhead", "384")
launcher.conf.set("spark.memory.offHeap.enabled", "true")
launcher.conf.set("spark.memory.offHeap.size", "16g")
launcher.conf.set("spark.dynamicAllocation.enabled", "false")

# Gazelle plugin, its columnar shuffle manager and the native environment under the OAP runtime directory.
launcher.conf.set("spark.executorEnv.CC", "/home/cloudtik/runtime/oap/bin/x86_64-conda_cos6-linux-gnu-cc")
launcher.conf.set("spark.plugins", "com.intel.oap.GazellePlugin")
launcher.conf.set("spark.executorEnv.LD_LIBRARY_PATH", "/home/cloudtik/runtime/oap/lib/")
launcher.conf.set("spark.executorEnv.LIBARROW_DIR", "/home/cloudtik/runtime/oap/")
launcher.conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")

# Spark SQL settings. All three batch-size options use the same value (20480).
launcher.conf.set("spark.sql.join.preferSortMergeJoin", "false")
launcher.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "20480")
launcher.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "20480")
launcher.conf.set("spark.sql.parquet.columnarReaderBatchSize", "20480")
launcher.conf.set("spark.sql.autoBroadcastJoinThreshold", "10M")
launcher.conf.set("spark.sql.broadcastTimeout", "3000")
launcher.conf.set("spark.sql.crossJoin.enabled", "true")
launcher.conf.set("spark.sql.columnar.window", "true")
launcher.conf.set("spark.sql.columnar.sort", "true")
launcher.conf.set("spark.sql.codegen.wholeStage", "true")
launcher.conf.set("spark.sql.columnar.codegen.hashAggregate", "false")
launcher.conf.set("spark.sql.shuffle.partitions", "384")
launcher.conf.set("spark.kryoserializer.buffer.max", "128m")
launcher.conf.set("spark.kryoserializer.buffer", "32m")

# Gazelle (OAP) columnar execution options.
launcher.conf.set("spark.oap.sql.columnar.preferColumnar", "false")
launcher.conf.set("spark.oap.sql.columnar.sortmergejoin.lazyread", "true")
launcher.conf.set("spark.oap.sql.columnar.sortmergejoin", "true")
# NOTE(review): the core ranges below look specific to one machine layout - confirm for your nodes.
launcher.conf.set("spark.oap.sql.columnar.coreRange", "0-31,64-95|32-63,96-127")
launcher.conf.set("spark.oap.sql.columnar.joinOptimizationLevel", "18")
launcher.conf.set("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")
launcher.conf.set("spark.executorEnv.ARROW_ENABLE_NULL_CHECK_FOR_GET", "false")
launcher.conf.set("spark.executorEnv.ARROW_ENABLE_UNSAFE_MEMORY_ACCESS", "true")

# Placeholder credentials passed to the executors' environment: replace "xxxxxx" with real
# values when they are needed, and never commit real keys.
launcher.conf.set("spark.executorEnv.AWS_ACCESS_KEY_ID", "xxxxxx")
launcher.conf.set("spark.executorEnv.AWS_SECRET_ACCESS_KEY", "xxxxxx")


## Define the benchmark configuration

In [None]:
// Benchmark parameters read by the "Create tables" and "Run queries" cells.
val streamNumber = 2            // how many concurrent query streams to start
val scaleFactor = "1"           // data scale ("1" == 1GB)
val iterations = 1              // how many times to run the whole set of queries
val format = "parquet"          // supported formats: parquet or orc
// Root of the storage that holds the data and receives the results. Supported schemes:
// s3a://s3_bucket, gs://gs_bucket, hdfs://namenode_ip:9000
// wasbs://container@storage_account.blob.core.windows.net
// abfs://container@storage_account.dfs.core.windows.net
val fsdir = "hdfs://namenode_ip:9000"
val partitionTables = true      // create partition tables
// Names of the queries to run; an empty sequence means all queries.
// Typed explicitly so the element type is String rather than the inferred Nothing.
val query_filter: Seq[String] = Seq.empty
//val query_filter = Seq("q1-v2.4", "q2-v2.4") // run subset of queries
val randomizeQueries = true     // run queries in a random order. Recommended for parallel runs.
val recreateDatabase = false    // set to true if a previous table creation failed and the database must be rebuilt


## Create tables

In [None]:
import java.text.SimpleDateFormat;
import java.util.Date
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import com.databricks.spark.sql.perf.tpcds.TPCDS
import com.databricks.spark.sql.perf.Benchmark.ExperimentStatus
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, substring}

// Timestamp that gives every run its own result directory.
val current_time = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date)
// Set to true to run TPC-DS through the Gazelle plugin: tables are then registered with the
// "arrow" data source instead of being created by spark-sql-perf.
val use_arrow = true
// The Arrow path keeps its own database and result directory ("tpcds_arrow..."), while the
// plain path is keyed by the file format.
val storage_tag = if (use_arrow) "arrow" else format
// Detailed results will be written as JSON to this location. Both paths include the run
// timestamp so a new run never overwrites the results of an earlier one.
val resultLocation = s"${fsdir}/shared/data/results/tpcds_${storage_tag}/${scaleFactor}/${current_time}"
val databaseName = s"tpcds_${storage_tag}_scale_${scaleFactor}_db"
// Location of the generated TPC-DS data; always keyed by the on-disk file format.
val data_path = s"${fsdir}/shared/data/tpcds/tpcds_${format}/${scaleFactor}"


if (use_arrow) {
    val tables = Seq("call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer", "customer_address", "customer_demographics", "date_dim", "household_demographics", "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns", "store_sales", "time_dim", "warehouse", "web_page", "web_returns", "web_sales", "web_site")
    val databaseExists = spark.catalog.databaseExists(databaseName)
    if (databaseExists && !recreateDatabase) {
        println(s"Using existing $databaseName")
    } else {
        if (databaseExists) {
            println(s"$databaseName exists, now drop and recreate it...")
            sql(s"drop database if exists $databaseName cascade")
        } else {
            println(s"$databaseName doesn't exist. Creating...")
        }
        sql(s"create database if not exists $databaseName").show
    }
    sql(s"use $databaseName").show
    // Register every table that is not in the catalog yet, backed by the Arrow data source.
    tables.foreach { table =>
        if (spark.catalog.tableExists(table)) {
            println(s"$table exists.")
        } else {
            spark.catalog.createTable(table, s"$data_path/$table", "arrow")
        }
    }
    if (partitionTables) {
        // Best effort: a failure for one table is printed and the remaining tables are still processed.
        tables.foreach { table =>
            try {
                sql(s"ALTER TABLE $table RECOVER PARTITIONS").show
            } catch {
                case e: Exception => println(e)
            }
        }
    }
} else {
    // Check whether the database is created, we create external tables if not
    val databaseExists = spark.catalog.databaseExists(databaseName)
    if (databaseExists && !recreateDatabase) {
        println(s"Using existing $databaseName")
    } else {
        if (databaseExists) {
            println(s"$databaseName exists, now drop and recreate it...")
            sql(s"drop database if exists $databaseName cascade")
        } else {
            println(s"$databaseName doesn't exist. Creating...")
        }

        import com.databricks.spark.sql.perf.tpcds.TPCDSTables

        val tables = new TPCDSTables(spark.sqlContext, "", s"${scaleFactor}", false)
        tables.createExternalTables(data_path, format, databaseName, overwrite = true, discoverPartitions = partitionTables)
    }
}

val timeout = 60 // timeout in hours


## Run queries

In [None]:
// Make the benchmark database the current database for the queries below.
sql(s"use $databaseName")
import com.databricks.spark.sql.perf.tpcds.TPCDS
// TPCDS helper bound to this session's SQLContext; it supplies the query set and runs the experiments.
val tpcds = new TPCDS (sqlContext = spark.sqlContext)
/**
 * Selects the TPC-DS v2.4 queries that a given stream runs.
 *
 * An empty `query_filter` keeps every query; otherwise only the queries whose name is listed
 * in `query_filter` are kept. When `randomizeQueries` is set, every stream except stream 0
 * gets the selected queries in a shuffled order; stream 0 always keeps the original order.
 *
 * @param stream zero-based index of the stream
 * @return the queries for that stream, in the order they should run
 */
def queries(stream: Int) = {
  val selected =
    if (query_filter.isEmpty) tpcds.tpcds2_4Queries
    else tpcds.tpcds2_4Queries.filter(query => query_filter.contains(query.name))
  val keepOriginalOrder = !randomizeQueries || stream == 0
  if (keepOriginalOrder) selected else scala.util.Random.shuffle(selected)
}

/**
 * Watches a single benchmark stream: announces the start, prints the experiment status,
 * blocks until the experiment finishes (or the wait times out) and announces the end.
 *
 * @param experiment status handle of the experiment to wait for
 * @param i          zero-based stream index, used only in the printed messages
 */
class ThreadStream(experiment:ExperimentStatus, i:Int) extends Thread{
    override def run(): Unit = {
        val label = s"stream_$i"
        // `timeout` is expressed in hours; presumably waitForFinish expects seconds - confirm.
        val waitLimit = timeout*60*60
        println(s"$label has started...")
        println(experiment.toString)
        experiment.waitForFinish(waitLimit)
        println(s"$label has finished.")
    }
}

// One pool thread per stream; each thread only waits for its experiment to finish.
val threadPool:ExecutorService = Executors.newFixedThreadPool(streamNumber)
// Status handle of every started experiment, indexed by stream number.
val experiments:Array[ExperimentStatus] = Array.ofDim[ExperimentStatus](streamNumber)

try {
    (0 until streamNumber).foreach { streamId =>
        val status = tpcds.runExperiment(
            queries(streamId),
            iterations = iterations,
            resultLocation = resultLocation,
            tags = Map("runtype" -> "benchmark", "database" -> databaseName, "scale_factor" -> scaleFactor)
        )
        experiments(streamId) = status
        threadPool.execute(new ThreadStream(status, streamId))
    }
} finally {
    // Accept no further tasks, then block until every submitted watcher has returned.
    threadPool.shutdown()
    threadPool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
}

// One single-row DataFrame per stream: (stream label, total runtime of all its queries).
// A query's runtime is the sum of its parsing, analysis, optimization, planning and execution
// times divided by 1000 (presumably milliseconds to seconds - confirm against spark-sql-perf).
val summary_dfs: Array[DataFrame] = experiments.zipWithIndex.map { case (experiment, i) =>
    val queryRuntime = (col("parsingTime") + col("analysisTime") + col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0
    experiment.getCurrentResults
        .withColumn("Runtime", queryRuntime)
        .agg(sum("Runtime"))
        .withColumn("stream", lit(s"stream_$i"))
        .select("stream", "sum(Runtime)")
}
// All streams stacked in stream order.
val per_stream_df = summary_dfs.reduce(_ union _)
// Append a "max_stream" row holding the runtime of the slowest stream.
val summary_df = per_stream_df.union(
    per_stream_df.agg(max("sum(Runtime)"))
        .withColumnRenamed("max(sum(Runtime))", "sum(Runtime)")
        .withColumn("stream", lit("max_stream"))
        .select("stream", "sum(Runtime)")
)
summary_df.show()

// Save the performance summary dataframe to a CSV file with a specified file name
val finalResultPath = s"${resultLocation}/summary"
// A single partition makes Spark write exactly one part file into the summary directory.
summary_df.repartition(1).write.option("header", "true").mode("overwrite").csv(finalResultPath)

import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

val fs = FileSystem.get(URI.create(finalResultPath), sc.hadoopConfiguration)
val destPath = new Path(s"$finalResultPath/summary.csv")
// globStatus may yield null or an empty array when nothing matches, so never index it blindly.
val partFiles = Option(fs.globStatus(new Path(s"$finalResultPath/*.csv"))).map(_.toSeq).getOrElse(Seq.empty)
// rename reports failure through its return value; only claim success when it really happened.
val summaryRenamed = partFiles.headOption.exists(partFile => fs.rename(partFile.getPath, destPath))

for (i <- 0 until streamNumber) {
   println(s"stream_${i} result is saved to ${experiments(i).resultPath}")
}

if (summaryRenamed) {
    println(s"Performance summary is saved to ${destPath}")
} else {
    println(s"Performance summary was written under ${finalResultPath} but could not be renamed to ${destPath}")
}
