In [None]:
// This configuration targets a cluster with 2x g4dn.12xlarge core nodes (8x T4 GPUs in total).
// You can also launch the cluster with 2x p3.8xlarge core nodes; in that case use 8 workers, 6 cores per executor, 36G executor memory and 24G overhead memory.


%%configure -f
{
    "driverMemory": "10000M",
    "driverCores": 6,
    "executorMemory": "16000M",
    "conf" : {"spark.executor.instances":8, "spark.executor.cores": 6, "spark.task.cpus": 6, "spark.yarn.maxAppAttempts": 1, "spark.yarn.executor.memoryOverhead": "16G", "spark.sql.files.maxPartitionBytes": 4294967296, "spark.dynamicAllocation.enabled": false},
    "jars" : ["https://repo1.maven.org/maven2/ai/rapids/cudf/0.9.2/cudf-0.9.2.jar",
      "https://repo1.maven.org/maven2/ai/rapids/xgboost4j-spark_2.x/1.0.0-Beta5/xgboost4j-spark_2.x-1.0.0-Beta5.jar",
      "https://repo1.maven.org/maven2/ai/rapids/xgboost4j_2.x/1.0.0-Beta5/xgboost4j_2.x-1.0.0-Beta5.jar"]
}

In [None]:
// List every jar attached to the SparkContext, one per line, to confirm the %%configure jars loaded.
for (jar <- sc.listJars) println(jar)

In [None]:
%%info

In [None]:
// import notebook source
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
import ml.dmlc.xgboost4j.scala.spark.rapids.{GpuDataReader, GpuDataset}


In [None]:
// For the AWS us-east-1 region, use the S3 bucket below.
// For the AWS us-west-2 region, point DATA_PREFIX at the us-west-2 copy of the dataset instead (the "-east1" bucket below is the us-east-1 copy).

// Root of the mortgage CSV dataset; the train/eval paths are glob patterns ("20*") under it.
val DATA_PREFIX = "s3://spark-xgboost-mortgage-dataset-east1/csv/"
val trainPath = s"${DATA_PREFIX}train/20*"
val evalPath = s"${DATA_PREFIX}eval/20*"


In [None]:
// Print the attached jars again (one per line) before building the reader.
sc.listJars.foreach(jar => println(jar))

// getOrCreate returns the already-running session if one exists, otherwise builds one.
val spark = SparkSession.builder().appName("mortgage-gpu").getOrCreate()

// GPU-backed CSV reader bound to the session; configured and used in the loading cell.
val dataReader = new GpuDataReader(spark)

// Name of the label column (the last field of the schema below).
val labelColName = "delinquency_12"


In [None]:
// Column layout handed to the CSV reader: all feature columns followed by the label column.
// Every field uses the StructField defaults (nullable = true, empty metadata).
// NOTE(review): field order presumably has to match the CSV column order — confirm against the data.
val schema = StructType(
  Seq(
    "orig_channel"                    -> DoubleType,
    "first_home_buyer"                -> DoubleType,
    "loan_purpose"                    -> DoubleType,
    "property_type"                   -> DoubleType,
    "occupancy_status"                -> DoubleType,
    "property_state"                  -> DoubleType,
    "product_type"                    -> DoubleType,
    "relocation_mortgage_indicator"   -> DoubleType,
    "seller_name"                     -> DoubleType,
    "mod_flag"                        -> DoubleType,
    "orig_interest_rate"              -> DoubleType,
    "orig_upb"                        -> IntegerType,
    "orig_loan_term"                  -> IntegerType,
    "orig_ltv"                        -> DoubleType,
    "orig_cltv"                       -> DoubleType,
    "num_borrowers"                   -> DoubleType,
    "dti"                             -> DoubleType,
    "borrower_credit_score"           -> DoubleType,
    "num_units"                       -> IntegerType,
    "zip"                             -> IntegerType,
    "mortgage_insurance_percent"      -> DoubleType,
    "current_loan_delinquency_status" -> IntegerType,
    "current_actual_upb"              -> DoubleType,
    "interest_rate"                   -> DoubleType,
    "loan_age"                        -> DoubleType,
    "msa"                             -> DoubleType,
    "non_interest_bearing_upb"        -> DoubleType,
    labelColName                      -> IntegerType
  ).map { case (name, dataType) => StructField(name, dataType) })



In [None]:
// XGBoost parameters passed to the classifier constructor.
// To shorten training time, set max_depth to 8 and num_round to 100.
val commParamMap: Map[String, Any] = Map(
  // tree construction
  "tree_method"      -> "gpu_hist",
  "grow_policy"      -> "depthwise",
  "max_depth"        -> 20,
  "max_leaves"       -> 256,
  "min_child_weight" -> 30,
  // learning rate and regularization
  "eta"              -> 0.1,
  "gamma"            -> 0.1,
  "lambda"           -> 1,
  "scale_pos_weight" -> 2,
  "subsample"        -> 1,
  // input handling
  "missing"          -> 0.0,
  // boosting rounds and parallelism (num_workers matches spark.executor.instances,
  // nthread matches spark.executor.cores / spark.task.cpus in the %%configure cell)
  "num_round"        -> 1000,
  "num_workers"      -> 8,
  "nthread"          -> 6)


In [None]:
// Load the train and eval CSVs through the GPU reader, with a header row and the schema above.
// The reader returned by option(...).schema(...) is used directly, so this works whether those
// calls configure dataReader in place or return a newly configured reader.
// `val` rather than `var`: neither dataset is reassigned after loading.
val (trainSet, evalSet) = {
  val csvReader = dataReader.option("header", true).schema(schema)
  (csvReader.csv(trainPath), csvReader.csv(evalPath))
}

// Every schema column except the label is used as a feature, in schema order.
val featureNames = schema.collect { case field if field.name != labelColName => field.name }

/** Minimal elapsed-time helper for the notebook's benchmark phases. */
object Benchmark {
  /**
   * Evaluates `block` once, prints the elapsed time for `phase`, and returns both.
   *
   * Uses `System.nanoTime`, which is monotonic, instead of `System.currentTimeMillis`,
   * which can jump if the system clock is adjusted while a long phase is running.
   *
   * @param phase label printed in the benchmark line (e.g. "train")
   * @param block computation to time; evaluated exactly once (call-by-name)
   * @tparam R result type of `block`
   * @return the block's result paired with the elapsed time in seconds
   */
  def time[R](phase: String)(block: => R): (R, Float) = {
    val startNanos = System.nanoTime
    val result = block
    val elapsedSeconds = (System.nanoTime - startNanos).toFloat / 1e9f
    println(s"==> Benchmark: Elapsed time for [$phase]: ${elapsedSeconds}s")
    (result, elapsedSeconds)
  }
}



In [None]:
// Location the trained model is saved to.
val modelPath = "/tmp/model"

// GPU XGBoost classifier built from the shared parameters, reading the label and feature columns.
val xgbClassifier = {
  val classifier = new XGBoostClassifier(commParamMap)
  classifier.setLabelCol(labelColName).setFeaturesCols(featureNames)
}


In [None]:
println("\n------ Training ------")
// Fit on the training set; Benchmark.time prints the elapsed time, which is not kept here.
val (model, _) = Benchmark.time("train")(xgbClassifier.fit(trainSet))


In [None]:
// Always save the trained model to modelPath; overwrite() replaces any model already
// stored there instead of failing because the path exists.
model.write.overwrite().save(modelPath)
// Alias used by the transform and evaluation cells.
val xgbClassificationModel = model


In [None]:
println("\n------ Transforming ------")
// Score the eval set with the trained model.
// NOTE(review): if transform only builds a lazy DataFrame, this timing excludes the prediction
// work, which would then run when the evaluator consumes `results` — confirm for this model.
val (results, _) = Benchmark.time("transform")(xgbClassificationModel.transform(evalSet))


In [None]:
println("\n------Accuracy of Evaluation------")
// MulticlassClassificationEvaluator defaults to metricName = "f1"; request "accuracy"
// explicitly so the printed value is what the heading above says it is.
val evaluator = new MulticlassClassificationEvaluator().setLabelCol(labelColName).setMetricName("accuracy")
val accuracy = evaluator.evaluate(results)
println(accuracy)
