# Introduction to XGBoost Spark with GPU

The Taxi notebook is an example of an XGBoost regressor. It shows how to load data, train an XGBoost model, and use that model to predict the "fare_amount" of a taxi trip.

# Reproducing [6404](https://github.com/NVIDIA/spark-rapids/issues/6404)

Install the spylon kernel: https://github.com/vericast/spylon-kernel

Start Jupyter from the spark-rapids-examples root directory:
```Bash
SPARK_HOME=~/dist/spark-3.1.1-bin-hadoop3.2 DATA_ROOT=$PWD/datasets jupyter notebook
```

## Load libraries
First, load some common libraries that are used by both the GPU and CPU versions of XGBoost.


Choose exactly one of the following init_spark cells, each labeled as working or broken.

If you want to try another init_spark cell, go to "Kernel -> Restart & Clear Output" before proceeding.

### Working config for rapids-4-spark 21.08 (before shim rework) relying on static extraClassPath
This is equivalent to moving the jars under $SPARK_HOME/jars, but better because SPARK_HOME remains immutable.

It is still worse than using --jars / --packages, though: for the executors' extraClassPath, the Spark documentation notes that ["Users typically should not need to set this option"](https://github.com/apache/spark/blob/25759a0de6dd09ecc440d009fba6d661558e7261/docs/configuration.md?plain=1#L561).

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '21.08.0'
cudf_v = '21.08.2'

launcher.conf.spark.driver.extraClassPath = [
    f"{m2dir}/ml/dmlc/xgboost4j-gpu_2.12/{xgboost_v}/xgboost4j-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/ml/dmlc/xgboost4j-spark-gpu_2.12/{xgboost_v}/xgboost4j-spark-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/com/nvidia/rapids-4-spark_2.12/{spark_rapids_v}/rapids-4-spark_2.12-{spark_rapids_v}.jar",
    f"{m2dir}/ai/rapids/cudf/{cudf_v}/cudf-{cudf_v}-cuda11.jar"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"

### Broken config for rapids-4-spark 21.08 (before shim rework) relying on --jars

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '21.08.0'
cudf_v = '21.08.2'

launcher.jars = [
    f"{m2dir}/ml/dmlc/xgboost4j-gpu_2.12/{xgboost_v}/xgboost4j-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/ml/dmlc/xgboost4j-spark-gpu_2.12/{xgboost_v}/xgboost4j-spark-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/com/nvidia/rapids-4-spark_2.12/{spark_rapids_v}/rapids-4-spark_2.12-{spark_rapids_v}.jar",
    f"{m2dir}/ai/rapids/cudf/{cudf_v}/cudf-{cudf_v}-cuda11.jar"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"

### Broken config for rapids-4-spark 21.08 (before shim rework) relying on --packages

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '21.08.0'
cudf_v = '21.08.2'

launcher.repositories = [
    "https://oss.sonatype.org/content/repositories/releases"
]

launcher.packages = [
    f"ml.dmlc:xgboost4j-spark-gpu_2.12:{xgboost_v}",
    f"com.nvidia:rapids-4-spark_2.12:{spark_rapids_v}",
    f"ai.rapids:cudf:{cudf_v}"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"

### Working config for rapids-4-spark 22.08 relying on static extraClassPath and default force.caller.classloader=true

It works "almost by accident": XGBoost4j 1.6.1 does not even compile against 21.10+, but force.caller.classloader=true makes it possible for xgboost4j to access GpuColumnVector at runtime.

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '22.08.0'


launcher.conf.spark.driver.extraClassPath = [
    f"{m2dir}/ml/dmlc/xgboost4j-gpu_2.12/{xgboost_v}/xgboost4j-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/ml/dmlc/xgboost4j-spark-gpu_2.12/{xgboost_v}/xgboost4j-spark-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/com/nvidia/rapids-4-spark_2.12/{spark_rapids_v}/rapids-4-spark_2.12-{spark_rapids_v}.jar"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"

### Broken config for rapids-4-spark 22.08 relying on static extraClassPath but changing force.caller.classloader=false

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '22.08.0'


launcher.conf.spark.driver.extraClassPath = [
    f"{m2dir}/ml/dmlc/xgboost4j-gpu_2.12/{xgboost_v}/xgboost4j-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/ml/dmlc/xgboost4j-spark-gpu_2.12/{xgboost_v}/xgboost4j-spark-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/com/nvidia/rapids-4-spark_2.12/{spark_rapids_v}/rapids-4-spark_2.12-{spark_rapids_v}.jar"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"
launcher.conf.set("spark.rapids.force.caller.classloader", False)

### Hence, rapids-4-spark 22.10.0-SNAPSHOT, which implements force.caller.classloader=false as the only call path, is now broken

In [None]:
%%init_spark
import os

m2dir = os.path.expanduser('~/.m2/repository')

xgboost_v = '1.6.1'
spark_rapids_v = '22.10.0-SNAPSHOT'
launcher.conf.spark.driver.extraClassPath = [
    f"{m2dir}/ml/dmlc/xgboost4j-gpu_2.12/{xgboost_v}/xgboost4j-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/ml/dmlc/xgboost4j-spark-gpu_2.12/{xgboost_v}/xgboost4j-spark-gpu_2.12-{xgboost_v}.jar",
    f"{m2dir}/com/nvidia/rapids-4-spark_2.12/{spark_rapids_v}/rapids-4-spark_2.12-{spark_rapids_v}.jar"
]
launcher.conf.spark.plugins = "com.nvidia.spark.SQLPlugin"

# Main Notebook Logic

In [None]:
import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressor, XGBoostRegressionModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

In addition, the CPU version requires some extra libraries, such as:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
```

## Set the dataset path

In [None]:
// Update these to your real paths. The input data files can be the output of the taxi-etl jobs, or you can
// just use the provided sample datasets under the datasets path.
val dataRoot = sys.env.getOrElse("DATA_ROOT", "/data")
val trainPath = dataRoot + "/taxi/csv/train/"
val evalPath  = dataRoot + "/taxi/csv/test/"
val transPath = dataRoot + "/taxi/csv/test/"

## Build the schema of the dataset
The Taxi data has 16 columns: 15 features and 1 label. "fare_amount" is the label column. The schema is used later to load the data.

The next block also defines some key parameters used in the XGBoost training process.

In [None]:
val labelName = "fare_amount"
lazy val schema =
  StructType(Array(
    StructField("vendor_id", DoubleType),
    StructField("passenger_count", DoubleType),
    StructField("trip_distance", DoubleType),
    StructField("pickup_longitude", DoubleType),
    StructField("pickup_latitude", DoubleType),
    StructField("rate_code", DoubleType),
    StructField("store_and_fwd", DoubleType),
    StructField("dropoff_longitude", DoubleType),
    StructField("dropoff_latitude", DoubleType),
    StructField(labelName, DoubleType),
    StructField("hour", DoubleType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", DoubleType),
    StructField("day_of_week", DoubleType),
    StructField("is_weekend", DoubleType)
  ))

val featureNames = schema.filter(_.name != labelName).map(_.name).toArray

lazy val paramMap = Map(
  "num_round" -> 100
)
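
The 15-features-plus-1-label split can be sanity-checked without Spark. The following is a minimal plain-Scala sketch, not part of the notebook: `SchemaCheck`, `label`, and `columns` are hypothetical names that simply mirror the field names of the schema, and the filter is the same one applied to the `StructType` to build `featureNames`.

```scala
// Hypothetical plain-Scala mirror of the schema's field names (illustration only).
object SchemaCheck {
  val label = "fare_amount"

  val columns: Seq[String] = Seq(
    "vendor_id", "passenger_count", "trip_distance", "pickup_longitude",
    "pickup_latitude", "rate_code", "store_and_fwd", "dropoff_longitude",
    "dropoff_latitude", label, "hour", "year", "month", "day",
    "day_of_week", "is_weekend")

  // Same idea as featureNames: every column except the label is a feature.
  val featureColumns: Array[String] = columns.filter(_ != label).toArray
}
```

With the 16 columns listed, `SchemaCheck.featureColumns` holds the 15 feature names and excludes the label.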

## Create a new spark session and load data

A new spark session should be created to continue all the following spark operations.

NOTE: In this notebook, the dependency jars were loaded when the toree kernel was installed. Alternatively, the jars can be loaded into the notebook with the [%AddJar magic](https://toree.incubator.apache.org/docs/current/user/faq/). However, `%AddJar` has one restriction: the uploaded jar is only available when `%AddJar` is called right after a new Spark session is created. Do it as below:

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("taxi-GPU").getOrCreate
%AddJar file:/data/libs/rapids-4-spark-XXX.jar
%AddJar file:/data/libs/xgboost4j-spark-gpu_2.12-XXX.jar
%AddJar file:/data/libs/xgboost4j-gpu_2.12-XXX.jar
// ...
```

##### Please note that the new jar "rapids-4-spark-XXX.jar" is only needed for the GPU version; do not add it to the dependency list for the CPU version.

In [None]:
// Build the spark session and data reader as usual
val sparkSession = SparkSession.builder().appName("taxi-GPU").getOrCreate
val reader = sparkSession.read.option("header", true).schema(schema)

In [None]:
// Please make sure to change the api to reader.parquet if you load parquet files.
val trainSet = reader.csv(trainPath)
val evalSet  = reader.csv(evalPath)
val transSet = reader.csv(transPath)

## Set XGBoost parameters and build an XGBoostRegressor

For the CPU version, it is recommended to set `num_workers` to the number of CPU cores, while for the GPU version it should be set to the number of GPUs in the Spark cluster.

The `tree_method` for the CPU version also differs from that for the GPU version. Currently only "gpu_hist" is supported for training on GPU.

```scala
// difference in parameters
  "num_workers" -> 12,
  "tree_method" -> "hist",
```

In [None]:
val xgbParamFinal = paramMap ++ Map("tree_method" -> "gpu_hist", "num_workers" -> 1)
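
The CPU/GPU difference described above comes down to two entries in the parameter map. The following is a minimal sketch of how that choice could be made in one place. `XgbParams` and `forDevice` are hypothetical names introduced for illustration, not part of XGBoost4J-Spark, and the worker counts are passed in by the caller rather than detected.

```scala
// Hypothetical helper (not part of XGBoost4J-Spark): selects the two
// parameters that differ between CPU and GPU training.
object XgbParams {
  // Parameters shared by both versions, as in the notebook's paramMap.
  val common: Map[String, Any] = Map("num_round" -> 100)

  def forDevice(useGpu: Boolean, numGpus: Int, numCpuCores: Int): Map[String, Any] =
    if (useGpu)
      // GPU version: "gpu_hist" and one worker per GPU.
      common ++ Map[String, Any]("tree_method" -> "gpu_hist", "num_workers" -> numGpus)
    else
      // CPU version: "hist" and one worker per CPU core.
      common ++ Map[String, Any]("tree_method" -> "hist", "num_workers" -> numCpuCores)
}
```

For example, `XgbParams.forDevice(useGpu = true, numGpus = 1, numCpuCores = 12)` yields the same entries as `xgbParamFinal`.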

In [None]:
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
  .setLabelCol(labelName)
  .setFeaturesCol(featureNames)

## Benchmark and train
The object `Benchmark` is used to compute the elapsed time of some operations.

Training with evaluation sets is also supported in two ways, the same as in the CPU version:

* Call API `setEvalSets` after initializing an XGBoostRegressor

```scala
xgbRegressor.setEvalSets(Map("eval" -> evalSet))

```

* Use parameter `eval_sets` when initializing an XGBoostRegressor

```scala
val paramMapWithEval = paramMap + ("eval_sets" -> Map("eval" -> evalSet))
val xgbRegressorWithEval = new XGBoostRegressor(paramMapWithEval)
```

This notebook uses the API call to set the evaluation sets.

In [None]:
xgbRegressor.setEvalSets(Map("eval" -> evalSet))

In [None]:
object Benchmark {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // call-by-name
    val t1 = System.currentTimeMillis
    println("Elapsed time [" + phase + "]: " + ((t1 - t0).toFloat / 1000) + "s")
    (result, (t1 - t0).toFloat / 1000)
  }
}
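
`Benchmark.time` relies on a call-by-name parameter: the block is not evaluated when it is passed in, only when it is referenced between the two timestamps. The following is a minimal Spark-free sketch of that behavior. `BenchmarkDemo` is a hypothetical name, and the timer is repeated here only so the example is self-contained.

```scala
object BenchmarkDemo {
  // Same shape as the notebook's Benchmark.time, repeated for self-containment.
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // the by-name block is evaluated here, inside the timed region
    val elapsed = (System.currentTimeMillis - t0).toFloat / 1000
    println("Elapsed time [" + phase + "]: " + elapsed + "s")
    (result, elapsed)
  }

  // The block's value is returned unchanged, paired with the elapsed seconds.
  val (total, seconds) = time("sum") { (1L to 1000000L).sum }
}
```

Here `BenchmarkDemo.total` is 500000500000, the sum of 1 to 1,000,000, while `seconds` depends on the machine.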

In [None]:
// start training
val (model, _) = Benchmark.time("train") {
  xgbRegressor.fit(trainSet)
}

## Transformation and evaluation
Here we use `transSet` to evaluate the model and show the predictions alongside some key columns. Finally, we use `RegressionEvaluator` to calculate the overall `rmse` of the predictions.

In [None]:
// start transform
val (prediction, _) = Benchmark.time("transform") {
  val ret = model.transform(transSet).cache()
  ret.foreachPartition((_: Iterator[_]) => ())
  ret
}

In [None]:
prediction.select("vendor_id", "passenger_count", "trip_distance", labelName, "prediction").show(10)
val evaluator = new RegressionEvaluator().setLabelCol(labelName)
val (rmse, _) = Benchmark.time("evaluation") {
  evaluator.evaluate(prediction)
}
println(s"RMSE == $rmse")
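
`RegressionEvaluator` uses `rmse` as its default metric, that is, the square root of the mean squared difference between prediction and label. The following is a minimal plain-Scala sketch of that formula on a few made-up fares. `RmseSketch` and its values are hypothetical and are not taken from the taxi dataset.

```scala
object RmseSketch {
  // RMSE = sqrt(mean((prediction - label)^2))
  def rmse(labels: Seq[Double], predictions: Seq[Double]): Double = {
    require(labels.nonEmpty && labels.length == predictions.length,
      "labels and predictions must be non-empty and of equal length")
    val squaredErrors = labels.zip(predictions).map { case (y, p) => (p - y) * (p - y) }
    math.sqrt(squaredErrors.sum / squaredErrors.length)
  }

  // Made-up fares, for illustration only.
  val example: Double = rmse(Seq(10.0, 12.0, 8.0), Seq(11.0, 10.0, 8.0))
}
```

The errors here are 1, -2 and 0, so the mean squared error is 5/3 and the RMSE is about 1.29.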

## Save the model to disk and load model
Save the model to disk and then load it back into memory. After that, use the loaded model to make a new prediction.

In [None]:
model.write.overwrite.save(dataRoot + "/model/taxi")

val modelFromDisk = XGBoostRegressionModel.load(dataRoot + "/model/taxi")
val (results2, _) = Benchmark.time("transform2") {
  modelFromDisk.transform(transSet)
}
results2.select("vendor_id", "passenger_count", "trip_distance", labelName, "prediction").show(5)