<a href="https://colab.research.google.com/github/dalgual/dalgual.github.io/blob/main/wazeXGBoostFromCleanDataGPU_zepl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Waze ETL for XGBoost Spark with GPU

This notebook follows the structure of the XGBoost Spark mortgage example, which trains an XGBoost classifier for binary classification. Here it shows how to load the raw Waze jam data, clean it, engineer features, and write the result to storage for XGBoost training. The binary label marks jams whose `level` is greater than 2. Compared to the original XGBoost Spark code, there is only one API difference.

## Load libraries
First, load some common libraries that are used by both the GPU and CPU versions of XGBoost.

In [None]:
%spark
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
import org.apache.spark.SparkConf

In [None]:
%spark
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._ // col, when, lit, ... used in the cells below

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorIndexer, MinMaxScaler, SQLTransformer, Normalizer}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.spark.storage.StorageLevel


In addition, the CPU version requires some extra libraries, such as:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
```

## Build the schema and parameters
The cleaned Waze data has 19 columns: 18 features and 1 label. `trueLabel` (the value of `labelColName`) is the label column. The schema will be used to load the cleaned data in the future.

The next block also defines some key parameters used in the XGBoost training process.

In [None]:

val IS_CPU = false

val IS_40M = true
val IS_FULL = true

val IS_HDFS = true

var folder_name = "gb1.6.parquet" //"gb0.8.parquet" //"gb400m.parquet"

In [None]:
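// Build the Spark session and data reader as usual.
// Executor, resource, and plugin settings (spark.executor.*, spark.task.*, spark.plugins) are read when
// the SparkContext starts; if the notebook interpreter already created one, getOrCreate returns it
// and some of these core settings may not take effect.
val conf = new SparkConf()
conf.set("spark.executor.instances", "15") // previously "20"
conf.set("spark.executor.cores", "7")
// spark.task.cpus must not exceed the cores each executor actually gets. On YARN with 4-core executors,
// Spark fails with: "The number of cores per executor (=4) has to be >= the task config: spark.task.cpus = 7".
conf.set("spark.task.cpus", "7")
conf.set("spark.executor.memory", "14g") // previously "24g"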
conf.set("spark.rapids.memory.pinnedPool.size", "2G")
conf.set("spark.executor.memoryOverhead", "16G")
conf.set("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true")
conf.set("spark.locality.wait", "0s")
conf.set("spark.sql.files.maxPartitionBytes", "512m")
conf.set("spark.executor.resource.gpu.amount", "1")
conf.set("spark.task.resource.gpu.amount", "1")
conf.set("spark.plugins", "com.nvidia.spark.SQLPlugin")
conf.set("spark.rapids.sql.hasNans", "false")
conf.set("spark.rapids.sql.batchSizeBytes", "512M")
conf.set("spark.rapids.sql.reader.batchSizeBytes", "768M")
conf.set("spark.rapids.sql.variableFloatAgg.enabled", "true")
conf.set("spark.rapids.memory.gpu.pooling.enabled", "false")
// conf.set("spark.rapids.memory.gpu.allocFraction", "0.1")
val spark = SparkSession.builder.appName("waze-gpu")
                               .enableHiveSupport()
                               .config(conf)
                               .getOrCreate
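With `spark.executor.cores` = 7, `spark.task.cpus` = 7 and one GPU per executor and per task, each executor can run only one task at a time; Spark 3 limits concurrent tasks per executor by whichever resource runs out first. The pure-Scala sketch below makes that arithmetic explicit. `ExecutorResources` and `taskSlotsPerExecutor` are hypothetical names for illustration, not Spark APIs.

```scala
// Hypothetical model of one executor's resources (not a Spark API).
final case class ExecutorResources(cores: Int, taskCpus: Int, gpus: Double, taskGpus: Double)

// Concurrent tasks per executor = the smaller of the CPU-bound and GPU-bound counts.
def taskSlotsPerExecutor(r: ExecutorResources): Int = {
  require(r.taskCpus <= r.cores,
    s"spark.task.cpus (${r.taskCpus}) must be <= spark.executor.cores (${r.cores})")
  val byCpu = r.cores / r.taskCpus
  val byGpu = math.floor(r.gpus / r.taskGpus).toInt
  math.min(byCpu, byGpu)
}

// Values from the SparkConf above: 7 cores, 7 CPUs per task, 1 GPU per executor and per task.
val notebookExecutor = ExecutorResources(cores = 7, taskCpus = 7, gpus = 1.0, taskGpus = 1.0)
val slotsPerExecutor = taskSlotsPerExecutor(notebookExecutor) // 1
val clusterSlots = slotsPerExecutor * 15                      // 15 executors -> 15 concurrent tasks
```

Lowering `spark.task.cpus` alone would not add parallelism here, because the single GPU per executor (with `spark.task.resource.gpu.amount` = 1) stays the limiting resource.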


In [None]:

val labelColName = "trueLabel"

val schema = StructType(List(
  StructField("location_x", DoubleType),
  StructField("location_y", DoubleType),
  StructField("sin_weekday", DoubleType),
  StructField("cos_weekday", DoubleType),
  StructField("sin_month", DoubleType),
  StructField("cos_month", DoubleType),
  StructField("sin_day", DoubleType), 
  StructField("cos_day", DoubleType),
  StructField("sin_hour", DoubleType),
  StructField("cos_hour", DoubleType),
  StructField("sin_min", DoubleType),
  StructField("cos_min", DoubleType),
  StructField("sin_sec", DoubleType),
  StructField("cos_sec", DoubleType),
  StructField("is_rush", IntegerType),
  StructField("is_weekend", IntegerType),
  StructField("is_holiday", IntegerType),
  StructField("level", IntegerType),
  StructField(labelColName, IntegerType)))

val featureNames = schema.filter(_.name != labelColName).map(_.name)

## Create a new spark session and load data

A new Spark session should be created before running the following Spark operations.

NOTE: in this notebook, the dependency jars were loaded when the Toree kernel was installed. Alternatively, the jars can be loaded into the notebook with the [%AddJar magic](https://toree.incubator.apache.org/docs/current/user/faq/). However, `%AddJar` has one restriction: an uploaded jar is only available if `%AddJar` is called right after a new Spark session is created. Do it as below:

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("mortgage-GPU").getOrCreate
%AddJar file:/data/libs/cudf-XXX-cuda10.jar
%AddJar file:/data/libs/rapids-4-spark-XXX.jar
%AddJar file:/data/libs/xgboost4j_3.0-XXX.jar
%AddJar file:/data/libs/xgboost4j-spark_3.0-XXX.jar
// ...
```

##### Please note that the new jar "rapids-4-spark-XXX.jar" is only needed for the GPU version; do not add it to the dependency list for the CPU version.

In [None]:
// The `schema` defined above describes the cleaned, engineered columns (sin_weekday, ..., trueLabel),
// not the raw Waze columns, so applying it to the raw input yields null columns; it is not used here.
// Parquet files carry their own schema; the "header" option only affects CSV sources.
val reader = spark.read.option("header", true)

In [None]:
val modelPath = "gs://hdp-240/waze/model/"
var file_location = "gs://hdp-240/waze/" + folder_name

/*
if (IS_40M){
    file_location = "gs://hdp-240/waze/gb400m.parquet"
    // val file_location = "gs://hdp-240/waze/gb40m.parquet"
} else {
    //val file_location = "/user/jwoo5/waze/gb1.6.parquet"
    //file_location = "gs://hdp-240/waze/gb1.6.parquet"
    file_location = "gs://hdp-240/waze/gb0.8.parquet"
}
*/
val entireData = reader.parquet(file_location)


In [None]:
val temp_table_name = "jampredictclean_1m_100mb_csv"
entireData.createOrReplaceTempView(temp_table_name)

In [None]:
entireData.printSchema()

In [None]:
//entireData.show(20)

In [None]:
/*
val jam_int_weekday = spark.sql("""UPDATE jampredictclean_1m_100mb_csv SET weekday =
  |CASE 
  |    WHEN weekday = 'Monday' THEN 0 
  |    WHEN weekday = 'Tuesday' THEN 1
  |    WHEN weekday = 'Wednesday' THEN 2 
  |    WHEN weekday = 'Thursday' THEN 3 
  |    WHEN weekday = 'Friday' THEN 4 
  |    WHEN weekday = 'Saturday' THEN 5 
  |    WHEN weekday = 'Sunday' THEN 6 
  |END
 |FROM jampredictclean_1m_100mb_csv""")
 */
 
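// Spark SQL cannot UPDATE a temporary view, so map weekday names to "0" (Monday) .. "6" (Sunday)
// with when(); names that match none of the cases become null.
val jam_int_weekday = entireData.withColumn("weekday",
  when(col("weekday") === "Monday", "0")
    .when(col("weekday") === "Tuesday", "1")
    .when(col("weekday") === "Wednesday", "2")
    .when(col("weekday") === "Thursday", "3")
    .when(col("weekday") === "Friday", "4")
    .when(col("weekday") === "Saturday", "5")
    .when(col("weekday") === "Sunday", "6"))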
 

In [None]:
//jam_int_weekday.show(20)

In [None]:
val jam_temp_table_name = "jam_int_weekday"
jam_int_weekday.createOrReplaceTempView(jam_temp_table_name)
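The same weekday mapping is written out twice in this notebook (for `jam_int_weekday` and `holy_int_weekday`). A minimal pure-Scala sketch of the lookup it encodes is below; `weekdays`, `weekdayIndex` and `toWeekdayIndex` are hypothetical names introduced for illustration. A possible Spark counterpart that reuses one map in both cells, untested here, is `element_at(typedLit(weekdayIndex), col("weekday"))`.

```scala
// Weekday names in the order used by the when(...) chain: Monday -> 0 ... Sunday -> 6.
val weekdays = Seq("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
val weekdayIndex: Map[String, Int] = weekdays.zipWithIndex.toMap

// None plays the role of the null that when(...) without otherwise(...) returns for other values.
def toWeekdayIndex(name: String): Option[Int] = weekdayIndex.get(name)
```

Keeping the mapping in one place means both DataFrames are guaranteed to use the same encoding, which matters because the `is_weekend` flag later depends on the exact numbers.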

In [None]:
val holidays = spark.read.option("header", true).option("inferSchema" , true).csv("gs://hdp-240/waze/holidays_2018.csv")
//val holidays = spark.read.csv("gs://hdp-240/waze/holidays_2018.csv", inferSchema=True, header=True)



In [None]:
val joinCsv = entireData
  .join(holidays, (entireData("month") === holidays("month")) && (entireData("day") === holidays("day")), "left")
  .select(entireData("location_x"), entireData("location_y"), entireData("pub_millis"), entireData("level"),
    entireData("speed"), entireData("pub_date"), entireData("date_pst"), entireData("month"), entireData("day"),
    entireData("hour"), entireData("min"), entireData("sec"), entireData("weekday"), holidays("Comments"))


In [None]:
// Create a view or table: jwoo5
val temp_table_name_holy = "holidays_2018_csv"
joinCsv.createOrReplaceTempView(temp_table_name_holy)

In [None]:
joinCsv.show(3)

In [None]:
/*
val holy_int_weekday = spark.sql("""UPDATE t2 SET 
  |CASE 
  |    WHEN t2.weekday = 'Monday' THEN 0 
  |    WHEN t2.weekday = 'Tuesday' THEN 1
  |    WHEN t2.weekday = 'Wednesday' THEN 2 
  |    WHEN t2.weekday = 'Thursday' THEN 3 
  |    WHEN t2.weekday = 'Friday' THEN 4 
  |    WHEN t2.weekday = 'Saturday' THEN 5 
  |    WHEN t2.weekday = 'Sunday' THEN 6 
  |END AS weekday, 
 |FROM holidays_2018_csv t2""")
 */
 
val holy_int_weekday = joinCsv.withColumn("weekday",
  when(col("weekday") === "Monday", "0")
    .when(col("weekday") === "Tuesday", "1")
    .when(col("weekday") === "Wednesday", "2")
    .when(col("weekday") === "Thursday", "3")
    .when(col("weekday") === "Friday", "4")
    .when(col("weekday") === "Saturday", "5")
    .when(col("weekday") === "Sunday", "6"))

In [None]:
holy_int_weekday.show(10)

In [None]:
val holy_temp_table_name = "holy_int_weekday"
holy_int_weekday.createOrReplaceTempView(holy_temp_table_name)

In [None]:
//holy_int_weekday.show(20)

In [None]:
/*
val csv = spark.sql("""SELECT t1.location_x, t1.location_y, t1.pub_millis, t1.level, t1.speed, t1.pub_date, t1.date_pst, t1.month, t1.day, t1.hour, t1.min, t1.sec, 
  |CASE 
  |    WHEN t1.month = t2.month and t1.day = t2.day THEN 1 
  |    ELSE 0
  |END AS is_holiday 
 |FROM holy_int_weekday t2 
 |RIGHT JOIN jam_int_weekday t1  
 |on t1.month = t2.month and t1.day = t2.day""")
 */
 
// Tag each jam record with is_holiday by matching its (month, day) against the holiday dates.
// holy_int_weekday cannot serve as the lookup: it is a left join over every record, so it contains
// every (month, day) in the data and would mark all records as holidays.
// A left semi join keeps records on a holiday date; a left anti join keeps the rest, so csv and
// csv_neg are disjoint and together cover every record exactly once.
val holidayDates = holidays.select(col("month"), col("day")).distinct()
val jamColumns = Seq("location_x", "location_y", "pub_millis", "level", "speed", "pub_date", "date_pst",
  "month", "day", "hour", "min", "sec", "weekday").map(col)

val csv = jam_int_weekday
  .join(holidayDates, Seq("month", "day"), "leftsemi")
  .select(jamColumns: _*)
  .withColumn("is_holiday", lit(1))

In [None]:
val csv_neg = jam_int_weekday
  .join(holidayDates, Seq("month", "day"), "leftanti")
  .select(jamColumns: _*)
  .withColumn("is_holiday", lit(0))

In [None]:
val csv_all = csv.union(csv_neg)
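The `is_holiday` flag is only correct if `csv` and `csv_neg` are exact complements: a left semi join keeps the records whose (month, day) is a holiday, and a left anti join keeps the rest. The pure-Scala sketch below models that with made-up in-memory data; `Jam`, `sampleHolidays` and `sampleJams` are hypothetical names, not Spark APIs. `partition` plays the role of the semi/anti pair, and the union is the concatenation.

```scala
// Hypothetical in-memory stand-ins: a few jam records and (month, day) holiday dates.
final case class Jam(month: Int, day: Int, level: Int)

val sampleHolidays: Set[(Int, Int)] = Set((1, 1), (7, 4), (12, 25))
val sampleJams = Seq(Jam(1, 1, 3), Jam(1, 2, 1), Jam(7, 4, 4), Jam(7, 5, 2))

// Semi side: records WITH a matching holiday date; anti side: records WITHOUT one.
val (onHoliday, notHoliday) = sampleJams.partition(j => sampleHolidays.contains((j.month, j.day)))

// Tag each side, as csv (is_holiday = 1) and csv_neg (is_holiday = 0) do, then union them.
val tagged: Seq[(Jam, Int)] = onHoliday.map(j => (j, 1)) ++ notHoliday.map(j => (j, 0))
```

A quick sanity check on the real data is the same idea: `csv_all.count()` should equal `jam_int_weekday.count()`.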

In [None]:
csv_all.filter("is_holiday==0").show(3)

In [None]:
csv_all.filter("is_holiday==1").show(3)

In [None]:
val data = csv_all.select(
  col("location_x").cast("Double"),
  col("location_y").cast("Double"),
  col("pub_millis").cast("Float"),
  col("month").cast("Integer"),
  col("day").cast("Integer"),
  col("hour").cast("Integer"),
  col("min").cast("Integer"),
  col("sec").cast("Integer"),
  col("weekday").cast("Integer"),
  col("is_holiday").cast("Integer"),
  col("level").cast("Integer"),
  (col("level") > 2).cast("Integer").alias("label"))



In [None]:
data.show(5)
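The label is a simple threshold on the jam `level`, sketched below in plain Scala (`jamLabel` and the example levels are hypothetical, for illustration only). Note that `level` itself is still selected as a column here, kept by the SQLTransformer, and listed in `featureNames`; a model trained on all of those features could read the label straight off `level`, so dropping it from the feature list may be worth considering.

```scala
// Label rule from the select above: positive class when the jam level is greater than 2.
def jamLabel(level: Int): Int = if (level > 2) 1 else 0

// Example levels (made up for illustration) and their labels.
val exampleLevels = Seq(1, 2, 3, 4, 5)
val exampleLabels = exampleLevels.map(jamLabel) // Seq(0, 0, 1, 1, 1)
```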

In [None]:
// Cyclical sin/cos encoding of the date/time parts, plus rush-hour and weekend flags.
// The statement uses '|' margins, so .stripMargin is required; otherwise the '|' characters reach the SQL parser.
// weekday is 0 = Monday ... 6 = Sunday, so the weekend test is weekday >= 5 (Saturday and Sunday).
val sqlTrans = new SQLTransformer().setStatement(
  """SELECT location_x, location_y,
    |  SIN(weekday*(2*PI()/7)) AS sin_weekday, COS(weekday*(2*PI()/7)) AS cos_weekday,
    |  SIN((month-1)*(2*PI()/12)) AS sin_month, COS((month-1)*(2*PI()/12)) AS cos_month,
    |  SIN((day-1)*(2*PI()/31)) AS sin_day, COS((day-1)*(2*PI()/31)) AS cos_day,
    |  SIN(hour*(2*PI()/24)) AS sin_hour, COS(hour*(2*PI()/24)) AS cos_hour,
    |  SIN(min*(2*PI()/60)) AS sin_min, COS(min*(2*PI()/60)) AS cos_min,
    |  SIN(sec*(2*PI()/60)) AS sin_sec, COS(sec*(2*PI()/60)) AS cos_sec,
    |  CASE
    |    WHEN (hour + min/60) >= 7 AND (hour + min/60) <= 9 THEN 1
    |    WHEN (hour + min/60) >= 15 AND (hour + min/60) <= 18 THEN 1
    |    ELSE 0
    |  END AS is_rush,
    |  CASE WHEN weekday >= 5 THEN 1 ELSE 0 END AS is_weekend,
    |  is_holiday, level, label
    |FROM __THIS__""".stripMargin)

val dataTrans = sqlTrans.transform(data)
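The SIN/COS pairs place each periodic value on the unit circle, so values that wrap around (hour 23 and hour 0, Sunday and Monday) end up close together instead of far apart. The pure-Scala sketch below mirrors that encoding and the two CASE flags; `cyclical`, `distance`, `isRush` and `isWeekend` are hypothetical helpers for illustration, and `isWeekend` assumes the Monday = 0 ... Sunday = 6 mapping used above, under which a weekend test must include both 5 and 6.

```scala
import scala.math.{Pi, sin, cos, sqrt}

// Map a periodic value (hour of day, minute, weekday, ...) onto the unit circle.
def cyclical(value: Double, period: Double): (Double, Double) = {
  val angle = value * 2 * Pi / period
  (sin(angle), cos(angle))
}

// Euclidean distance between two encoded points.
def distance(a: (Double, Double), b: (Double, Double)): Double =
  sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

// Rush-hour flag: 07:00-09:00 or 15:00-18:00, using fractional hours as the SQL does.
def isRush(hour: Int, minute: Int): Int = {
  val h = hour + minute / 60.0
  if ((h >= 7 && h <= 9) || (h >= 15 && h <= 18)) 1 else 0
}

// Weekend flag, assuming weekday is 0 = Monday ... 6 = Sunday.
def isWeekend(weekday: Int): Int = if (weekday >= 5) 1 else 0
```

For example, `distance(cyclical(23, 24), cyclical(0, 24))` is about 0.26, while hours 0 and 12 sit on opposite sides of the circle at distance 2. The raw `hour` column would instead put 23 and 0 as far apart as possible.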


## Write the cleaned data set to storage

In [None]:
var pwd_folder = ""
if (IS_HDFS){
    pwd_folder = "/user/jwoo5/"
} else {
    pwd_folder = "hdp-240/"
}
val bucket_clean = pwd_folder + "waze_clean/"
var loc_size = folder_name // "/gb400m.parquet"

/*
if(IS_40M){
    //loc_size = "/gb40m.parquet"
    loc_size = "/gb400m.parquet"
} else {
    //loc_size = "/gb1.6.parquet"
    loc_size = "/gb0.8.parquet"
}
*/

var full_path = ""
/*
var splits = if (!IS_FULL) dataTrans.randomSplit(Array(0.75, 0.25), seed=1234L) 
var train  = if (!IS_FULL) splits(0).withColumnRenamed("label", "trueLabel")
var test  = if (!IS_FULL) spark.emptyDataFrame splits(1).withColumnRenamed("label", "trueLabel")
*/
var train_path = ""
var test_path = ""

var start = System.nanoTime

/*
if(IS_FULL){
	full_path = "gs://"+ bucket_clean+loc_size+"/full/"
    val full_writer = dataTrans.write.option("header", true) //.schema(schema)
    full_writer.format("parquet").mode("overwrite").save(full_path)
} else {
    var splits = dataTrans.randomSplit(Array(0.75, 0.25), seed=1234L)
    var train = splits(0).withColumnRenamed("label", "trueLabel")
    var test = splits(1).withColumnRenamed("label", "trueLabel")
    train_path = "gs://"+ bucket_clean+ loc_size+"/train/"
	test_path = "gs://"+ bucket_clean+loc_size+"/test/"
	
    // Train data: df.write.format(source).mode("overwrite").save(path)
    val train_writer = train.write.option("header", true) //.schema(schema)
    train_writer.format("parquet").mode("overwrite").save(train_path)
    
    var end = System.nanoTime
    print(end - start)
    // Test data
    val test_writer = test.write.option("header", true) //.schema(schema)
    test_writer.format("parquet").mode("overwrite").save(test_path)

    end = System.nanoTime
    print(end - start)
}							

*/

In [None]:
if (IS_HDFS){
    full_path = "hdfs://"+ bucket_clean+loc_size+"/full/"
} else {
    full_path = "gs://"+ bucket_clean+loc_size+"/full/"
    
}
val full_writer = dataTrans.write.option("header", true) //.schema(schema)
full_writer.format("parquet").mode("overwrite").save(full_path)
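The output location is assembled from `IS_HDFS`, `pwd_folder`, `bucket_clean` and `folder_name`. The sketch below folds that logic into one hypothetical helper, `cleanOutputPath`, so the two possible results are easy to see. On the HDFS branch the concatenation produces `hdfs:///user/...` (empty authority), which Hadoop resolves against the cluster's default file system (`fs.defaultFS`).

```scala
// Hypothetical helper mirroring the path logic in the cells above.
def cleanOutputPath(isHdfs: Boolean, folderName: String): String = {
  val scheme = if (isHdfs) "hdfs://" else "gs://"
  val root   = if (isHdfs) "/user/jwoo5/" else "hdp-240/"
  s"$scheme${root}waze_clean/$folderName/full/"
}
```

With `folder_name = "gb1.6.parquet"`, this gives `hdfs:///user/jwoo5/waze_clean/gb1.6.parquet/full/` when `IS_HDFS` is true and `gs://hdp-240/waze_clean/gb1.6.parquet/full/` otherwise.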

In [None]:
dataTrans.printSchema()

In [None]:
//dataTrans.show(5)

In [None]:
// Now let's actually process the data
var start = System.nanoTime
/*
var full_writer = null //.schema(schema)
var train_writer = null
var test_writer = null

if(IS_FULL){
    val full_writer = dataTrans.write.option("header", true) //.schema(schema)
    full_writer.format("parquet").mode("overwrite").save(full_path)
    
} 
*/
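The cells above record `start = System.nanoTime`, but the active code never reports the elapsed time. A small wrapper such as the hypothetical `timed` helper below (not part of Spark) is one way to do it. Because `save` is a Spark action, wrapping the write measures the whole job that produces the output.

```scala
// Hypothetical helper: run a block, print how long it took, and return its result.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime
  val result = body
  val elapsedSec = (System.nanoTime - start) / 1e9
  println(f"$label took $elapsedSec%.2f s")
  result
}
```

Usage in this notebook would look like `timed("write full dataset") { full_writer.format("parquet").mode("overwrite").save(full_path) }`.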