# Introduction to XGBoost Spark with GPU

This notebook shows how to train an XGBoost model on GPUs with the Spark RAPIDS XGBoost library. The dataset is derived from Fannie Mae’s Single-Family Loan Performance Data, with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae. The notebook uses XGBoost to train a 12-month mortgage loan delinquency prediction model.

Note: This demo is derived from the Spark-Rapids-Example repo. For PySpark-based XGBoost, please refer to the 22.04 branch of Spark-RAPIDS-examples, which uses NVIDIA’s Spark XGBoost version.

## Load libraries
First, load the common libraries that are used by both the GPU and CPU versions of XGBoost.

In [1]:
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

In addition, the CPU version requires some extra libraries, such as:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
```
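
These extra imports hint at an additional preprocessing step in the CPU version: instead of passing an array of feature column names, the feature columns are assembled into a single vector column before training. A minimal sketch of that step follows; the helper name `vectorize` and the output column name `features` are assumptions for illustration and do not appear in this notebook.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType

// Hypothetical helper: cast every column to float, then pack the feature
// columns into one vector column named "features" (an assumed name).
def vectorize(df: DataFrame, featureNames: Seq[String], labelName: String): DataFrame = {
  val asFloat = df.columns.map(name => col(name).cast(FloatType))
  new VectorAssembler()
    .setInputCols(featureNames.toArray)
    .setOutputCol("features")
    .transform(df.select(asFloat: _*))
    .select(col("features"), col(labelName))
}
```

The resulting DataFrame has only the assembled vector column and the label column, which is the shape a vector-based classifier would consume.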

## Set the dataset path

In [2]:
// Update these to your real paths! The input data files can be the output of the mortgage-etl jobs, or you can
// just use the provided sample datasets under the datasets path.
val dataRoot = sys.env.getOrElse("DATA_ROOT", "/data")
val trainPath = dataRoot + "/mortgage/csv/train/"
val evalPath  = dataRoot + "/mortgage/csv/test/"
val transPath = dataRoot + "/mortgage/csv/test/"

dataRoot = /data
trainPath = /data/mortgage/csv/train/
evalPath = /data/mortgage/csv/test/
transPath = /data/mortgage/csv/test/


/data/mortgage/csv/test/

## Build the schema and parameters
The mortgage data has 28 columns: 27 features and 1 label. "delinquency_12" is the label column. The schema is used later to load the data.

The next block also defines some key parameters used in the XGBoost training process.

In [3]:
val labelColName = "delinquency_12"
val schema = StructType(List(
  StructField("orig_channel", DoubleType),
  StructField("first_home_buyer", DoubleType),
  StructField("loan_purpose", DoubleType),
  StructField("property_type", DoubleType),
  StructField("occupancy_status", DoubleType),
  StructField("property_state", DoubleType),
  StructField("product_type", DoubleType),
  StructField("relocation_mortgage_indicator", DoubleType),
  StructField("seller_name", DoubleType),
  StructField("mod_flag", DoubleType),
  StructField("orig_interest_rate", DoubleType),
  StructField("orig_upb", IntegerType),
  StructField("orig_loan_term", IntegerType),
  StructField("orig_ltv", DoubleType),
  StructField("orig_cltv", DoubleType),
  StructField("num_borrowers", DoubleType),
  StructField("dti", DoubleType),
  StructField("borrower_credit_score", DoubleType),
  StructField("num_units", IntegerType),
  StructField("zip", IntegerType),
  StructField("mortgage_insurance_percent", DoubleType),
  StructField("current_loan_delinquency_status", IntegerType),
  StructField("current_actual_upb", DoubleType),
  StructField("interest_rate", DoubleType),
  StructField("loan_age", DoubleType),
  StructField("msa", DoubleType),
  StructField("non_interest_bearing_upb", DoubleType),
  StructField(labelColName, IntegerType)))

val featureNames = schema.filter(_.name != labelColName).map(_.name).toArray

val commParamMap = Map(
  "objective" -> "binary:logistic",
  "num_round" -> 100)

labelColName = delinquency_12
schema = StructType(StructField(orig_channel,DoubleType,true), StructField(first_home_buyer,DoubleType,true), StructField(loan_purpose,DoubleType,true), StructField(property_type,DoubleType,true), StructField(occupancy_status,DoubleType,true), StructField(property_state,DoubleType,true), StructField(product_type,DoubleType,true), StructField(relocation_mortgage_indicator,DoubleType,true), StructField(seller_name,DoubleType,true), StructField(mod_flag,DoubleType,true), StructField(orig_interest_rate,DoubleType,true), StructField(orig_upb,IntegerType,true), StructField(orig_loan_term,IntegerType,true), StructField(orig_ltv,DoubleType,true), StructField(orig_cltv,DoubleType,true), StructField(num_borrowers,DoubleT...


StructType(StructField(orig_channel,DoubleType,true), StructField(first_home_buyer,DoubleType,true), StructField(loan_purpose,DoubleType,true), StructField(property_type,DoubleType,true), StructField(occupancy_status,DoubleType,true), StructField(property_state,DoubleType,true), StructField(product_type,DoubleType,true), StructField(relocation_mortgage_indicator,DoubleType,true), StructField(seller_name,DoubleType,true), StructField(mod_flag,DoubleType,true), StructField(orig_interest_rate,DoubleType,true), StructField(orig_upb,IntegerType,true), StructField(orig_loan_term,IntegerType,true), StructField(orig_ltv,DoubleType,true), StructField(orig_cltv,DoubleType,true), StructField(num_borrowers,DoubleT...

## Create a new spark session and load data

Create a new Spark session; all of the following Spark operations use it.

NOTE: In this notebook, the dependency jars were loaded when the Toree kernel was installed. Alternatively, the jars can be loaded into the notebook with the [%AddJar magic](https://toree.incubator.apache.org/docs/current/user/faq/). However, `%AddJar` has one restriction: the added jar is only available when `%AddJar` is called right after a new Spark session is created, as shown below:

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("mortgage-GPU").getOrCreate
%AddJar file:/data/libs/rapids-4-spark-XXX.jar
%AddJar file:/data/libs/xgboost4j-spark-gpu_2.12-XXX.jar
%AddJar file:/data/libs/xgboost4j-gpu_2.12-XXX.jar
// ...
```

##### Please note that the jar "rapids-4-spark-XXX.jar" is only needed for the GPU version; do not add it to the dependency list for the CPU version.

In [4]:
// Build the spark session and data reader as usual
val sparkSession = SparkSession.builder.appName("mortgage-gpu").getOrCreate
val reader = sparkSession.read.option("header", true).schema(schema)

sparkSession = org.apache.spark.sql.SparkSession@26420dda
reader = org.apache.spark.sql.DataFrameReader@77740a8c


org.apache.spark.sql.DataFrameReader@77740a8c

In [5]:
// Make sure to change the API call to reader.parquet if you load Parquet files.
val trainSet = reader.csv(trainPath)
val evalSet  = reader.csv(evalPath)
val transSet = reader.csv(transPath)

trainSet = [orig_channel: double, first_home_buyer: double ... 26 more fields]
evalSet = [orig_channel: double, first_home_buyer: double ... 26 more fields]
transSet = [orig_channel: double, first_home_buyer: double ... 26 more fields]


[orig_channel: double, first_home_buyer: double ... 26 more fields]

## Set XGBoost parameters and build an XGBoostClassifier

For the CPU version, it is recommended to set `num_workers` to the number of CPU cores, while for the GPU version it should be set to the number of GPUs in the Spark cluster.

In addition, the `tree_method` for the CPU version differs from that for the GPU version. Currently only "gpu_hist" is supported for training on GPU.

```scala
// parameters that differ in the CPU version
  "num_workers" -> 12,
  "tree_method" -> "hist",
```
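
To make the CPU/GPU difference concrete, the sketch below builds both parameter maps from one shared base. The helper name `trainingParams` and its arguments are hypothetical; the values 12 and 1 simply mirror the ones used in this notebook.

```scala
// Parameters shared by both versions (same values as commParamMap above).
val baseParams: Map[String, Any] = Map(
  "objective" -> "binary:logistic",
  "num_round" -> 100)

// Hypothetical helper: choose tree_method and num_workers by device.
// GPU: one worker per GPU; CPU: one worker per CPU core.
def trainingParams(useGpu: Boolean, numGpus: Int, numCpuCores: Int): Map[String, Any] =
  if (useGpu) baseParams ++ Map("tree_method" -> "gpu_hist", "num_workers" -> numGpus)
  else baseParams ++ Map("tree_method" -> "hist", "num_workers" -> numCpuCores)

val gpuParams = trainingParams(useGpu = true, numGpus = 1, numCpuCores = 12)
val cpuParams = trainingParams(useGpu = false, numGpus = 1, numCpuCores = 12)
println(gpuParams("tree_method")) // gpu_hist
println(cpuParams("num_workers")) // 12
```

Keeping the shared settings in one map means that only the two device-specific entries change when switching between the CPU and GPU versions.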

In [6]:
val xgbParamFinal = commParamMap ++ Map("tree_method" -> "gpu_hist", "num_workers" -> 1)

xgbParamFinal = Map(objective -> binary:logistic, num_round -> 100, tree_method -> gpu_hist, num_workers -> 1)


Map(objective -> binary:logistic, num_round -> 100, tree_method -> gpu_hist, num_workers -> 1)

In [7]:
val xgbClassifier = new XGBoostClassifier(xgbParamFinal)
      .setLabelCol(labelColName)
      .setFeaturesCol(featureNames)

xgbClassifier = xgbc_ecac6474dbb2


xgbc_ecac6474dbb2

## Benchmark and train
The `Benchmark` object is used to measure the elapsed time of an operation.

Training with evaluation sets is supported in two ways, the same as in the CPU version:

* Call API `setEvalSets` after initializing an XGBoostClassifier

```scala
xgbClassifier.setEvalSets(Map("eval" -> evalSet))
```

* Use parameter `eval_sets` when initializing an XGBoostClassifier

```scala
val paramMapWithEval = xgbParamFinal + ("eval_sets" -> Map("eval" -> evalSet))
val xgbClassifierWithEval = new XGBoostClassifier(paramMapWithEval)
```

This notebook uses the `setEvalSets` API to set the evaluation sets.

In [8]:
xgbClassifier.setEvalSets(Map("eval" -> evalSet))

xgbc_ecac6474dbb2

In [9]:
object Benchmark {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // call-by-name
    val t1 = System.currentTimeMillis
    println("Elapsed time [" + phase + "]: " + ((t1 - t0).toFloat / 1000) + "s")
    (result, (t1 - t0).toFloat / 1000)
  }
}

defined object Benchmark
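
Because `block` is a call-by-name parameter, it is not evaluated at the call site; it runs inside `time`, between the two clock readings. A small standalone sketch of the same pattern follows, using a hypothetical object `Timer` and `Thread.sleep` as a stand-in workload.

```scala
// Hypothetical stand-in mirroring Benchmark.time from the cell above.
object Timer {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // evaluated here, not at the call site
    val seconds = (System.currentTimeMillis - t0).toFloat / 1000
    println(s"Elapsed time [$phase]: ${seconds}s")
    (result, seconds)
  }
}

// The block's value and the elapsed seconds come back as a tuple.
val (sum, elapsed) = Timer.time("sleep-and-sum") {
  Thread.sleep(50) // stand-in for real work such as fit() or transform()
  (1 to 10).sum
}
println(sum) // 55
```

Returning the result together with the elapsed time lets the caller keep the trained model or transformed DataFrame while discarding the timing with `_`, as the training cell does.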


In [10]:
// Start training
println("\n------ Training ------")
val (xgbClassificationModel, _) = Benchmark.time("train") {
  xgbClassifier.fit(trainSet)
}


------ Training ------
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.19.183.210, DMLC_TRACKER_PORT=38315, DMLC_NUM_WORKER=1}


xgbClassificationModel = xgbc_ecac6474dbb2


Elapsed time [train]: 8.083s


xgbc_ecac6474dbb2

## Transformation and evaluation
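This step uses `transSet` to evaluate the model and prints some useful columns to show the prediction results. After that, `MulticlassClassificationEvaluator` computes an overall score for the predictions. Note that the evaluator's default metric is F1 (`metricName=f1` in the output), so the value stored in `accuracy` is an F1 score rather than plain accuracy.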

In [11]:
println("\n------ Transforming ------")
val (results, _) = Benchmark.time("transform") {
  val ret = xgbClassificationModel.transform(transSet).cache()
  ret.foreachPartition((_: Iterator[_]) => ())
  ret
}
results.select("orig_channel", labelColName,"rawPrediction","probability","prediction").show(10)

println("\n------Accuracy of Evaluation------")
val evaluator = new MulticlassClassificationEvaluator().setLabelCol(labelColName)
val accuracy = evaluator.evaluate(results)
println(accuracy)


------ Transforming ------
Elapsed time [transform]: 1.916s
+------------+--------------+--------------------+--------------------+----------+
|orig_channel|delinquency_12|       rawPrediction|         probability|prediction|
+------------+--------------+--------------------+--------------------+----------+
|         0.0|             0|[7.57764625549316...|[0.99948849738575...|       0.0|
|         0.0|             0|[8.74893283843994...|[0.99984139463049...|       0.0|
|         0.0|             0|[8.74893283843994...|[0.99984139463049...|       0.0|
|         0.0|             0|[8.74893283843994...|[0.99984139463049...|       0.0|
|         0.0|             0|[7.57764625549316...|[0.99948849738575...|       0.0|
|         0.0|             0|[7.57764625549316...|[0.99948849738575...|       0.0|
|         0.0|             0|[7.57764625549316...|[0.99948849738575...|       0.0|
|         0.0|             0|[6.58476591110229...|[0.99862065445631...|       0.0|
|         0.0|            

results = [orig_channel: double, first_home_buyer: double ... 29 more fields]
evaluator = MulticlassClassificationEvaluator: uid=mcEval_d9645b60a007, metricName=f1, metricLabel=0.0, beta=1.0, eps=1.0E-15
accuracy = 1.0


1.0
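
Since the evaluator output reports `metricName=f1`, it may help to see how F1 and accuracy can differ. Both equal 1.0 when every prediction is correct, but they diverge otherwise. The standalone sketch below compares them on made-up (label, prediction) pairs; the helper names and the data are hypothetical, and the weighted F1 here is a simplified illustration rather than Spark's implementation.

```scala
// Made-up (label, prediction) pairs for illustration only.
val pairs: Seq[(Double, Double)] = Seq(
  (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0),
  (1.0, 1.0), (1.0, 0.0))

// Fraction of rows whose prediction equals the label.
def accuracy(rows: Seq[(Double, Double)]): Double =
  rows.count { case (l, p) => l == p }.toDouble / rows.size

// F1 for one class: harmonic mean of precision and recall.
def f1For(cls: Double, rows: Seq[(Double, Double)]): Double = {
  val tp = rows.count { case (l, p) => l == cls && p == cls }.toDouble
  val predicted = rows.count(_._2 == cls)
  val actual = rows.count(_._1 == cls)
  if (tp == 0) 0.0
  else {
    val precision = tp / predicted
    val recall = tp / actual
    2 * precision * recall / (precision + recall)
  }
}

// Per-class F1, averaged with weights equal to each class's share of the labels.
def weightedF1(rows: Seq[(Double, Double)]): Double =
  rows.map(_._1).distinct.map { cls =>
    rows.count(_._1 == cls).toDouble / rows.size * f1For(cls, rows)
  }.sum

println(accuracy(pairs))
println(weightedF1(pairs))
```

With one misclassified positive row, the two numbers printed are no longer equal, which is why it matters which metric the evaluator is configured to report.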

## Save the model to disk and load model
Save the model to disk and then load it back into memory. After that, use the loaded model to make new predictions.

In [12]:
xgbClassificationModel.write.overwrite.save(dataRoot + "/model/mortgage")

val modelFromDisk = XGBoostClassificationModel.load(dataRoot + "/model/mortgage")

val (results2, _) = Benchmark.time("transform2") {
  modelFromDisk.transform(transSet)
}
results2.show(10)

Elapsed time [transform2]: 0.044s


modelFromDisk = xgbc_ecac6474dbb2
results2 = [orig_channel: double, first_home_buyer: double ... 29 more fields]


+------------+----------------+------------+-------------+----------------+--------------+------------+-----------------------------+-----------+--------+------------------+--------+--------------+--------+---------+-------------+----+---------------------+---------+---+--------------------------+-------------------------------+------------------+-------------+--------+-------+------------------------+--------------+--------------------+--------------------+----------+
|orig_channel|first_home_buyer|loan_purpose|property_type|occupancy_status|property_state|product_type|relocation_mortgage_indicator|seller_name|mod_flag|orig_interest_rate|orig_upb|orig_loan_term|orig_ltv|orig_cltv|num_borrowers| dti|borrower_credit_score|num_units|zip|mortgage_insurance_percent|current_loan_delinquency_status|current_actual_upb|interest_rate|loan_age|    msa|non_interest_bearing_upb|delinquency_12|       rawPrediction|         probability|prediction|
+------------+----------------+------------+--------

[orig_channel: double, first_home_buyer: double ... 29 more fields]

In [13]:
sparkSession.close()