# Running XGBoost on Azure HDInsight
This Notebook will walk you through on the detailed steps on how to build, install, and run XGBoost on HDInsight, the managed Hadoop and Spark solution on Azure. This notebook is authored by Xiaoyong Zhu, Program Manager in HDInsight team.
![XGBoost](https://raw.githubusercontent.com/dmlc/dmlc.github.io/master/img/logo-m/xgboost.png) 

### XGBoost
XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible and portable. It implements machine learning algorithms under the Gradient Boosting framework. XGBoost provides a parallel tree boosting (also known as GBDT, GBM) that solve many data science problems in a fast and accurate way. The same code runs on major distributed environment (Hadoop, SGE, MPI) and can solve problems beyond billions of examples.

It is not designed as a generic Machine Learning framework; it is designed as a library very specialized in boosting tree algorithm, and is widely used from production to experimental projects.

For more details on XGBoost, please go to XGBoost [GitHub page](https://github.com/dmlc/xgboost).



### XGBoost with Spark
The following figure illustrates the new pipeline architecture with the latest XGBoost4J-Spark.
![XGBoost with Spark](https://raw.githubusercontent.com/dmlc/web-data/master/xgboost/unified_pipeline_new.png)

With XGBoost4J-Spark, users are able to use both low- and high-level memory abstraction in Spark, i.e. RDD and DataFrame/Dataset. The DataFrame/Dataset abstraction grants the user to manipulate structured datasets and utilize the built-in routines in Spark or User Defined Functions (UDF) to explore the value distribution in columns before they feed data into the machine learning phase in the pipeline. In the following example, the structured sales records can be saved in a JSON file, parsed as DataFrame through Spark's API and feed to train XGBoost model in two lines of Scala code.

### How to use this notebook
This notebook basically provides an E2E workflow with which you can:
- Building XGBoost jars
- Deploying the jars to Azure Storage
- Build a simple Boosting Tree algorithm to HDInsight using Spark Pipelines
- Tune Hyper-Parameters for a boosting Tree model
- Explain the Parameters of the model built
- Save the model to Azure Storage

### Building XGBoost from source code
The following code snippet 

- installs the required libraries for building XGBoost
- builds XGBoost using Maven
- put the compiled jars to the default storage account of the HDInsight cluster
- put the sample data to the default storage account of the HDInsight cluster

The cell below is using the %%sh magic which will execute the code below as bash scripts in the head node.

You might see something like this when building xgboost. This is expected and is part of the test case. The final test should pass.

    Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.0.0.15, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=4}
    17/08/14 22:41:34 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
    java.lang.RuntimeException: Worker exception.
            at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:72)
            at ml.dmlc.xgboost4j.scala.spark.RabitTrackerRobustnessSuite$$anonfun$1$$anonfun$2.apply(RabitTrackerRobustnessSuite.scala:66)
            at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)


In [1]:
%%sh
sudo apt-get update
sudo apt-get install -y maven git build-essential cmake python-setuptools
git clone --recursive https://github.com/dmlc/xgboost

#builds XGBoost using Maven
cd xgboost/jvm-packages
mvn -DskipTests=true install

#put the compiled packge to shared storage
#put to root folder for simplicity
hadoop fs -put -f xgboost4j-spark/target/xgboost4j-spark-0.7.jar /
hadoop fs -put -f xgboost4j/target/xgboost4j-0.7.jar /
hadoop fs -put -f xgboost4j-example/target/xgboost4j-example-0.7.jar /


#put the sample data to shared storage
hadoop fs -put -f ..//demo/data/agaricus.txt* /

[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Build Order:
[INFO] 
[INFO] xgboost-jvm
[INFO] xgboost4j
[INFO] xgboost4j-spark
[INFO] xgboost4j-flink
[INFO] xgboost4j-example
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building xgboost-jvm 0.7
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- scalastyle-maven-plugin:0.8.0:check (checkstyle) @ xgboost-jvm ---
Processed 0 file(s)
Found 0 errors
Found 0 infos
Finished in 138 ms
[INFO] 
[INFO] --- maven-checkstyle-plugin:2.17:check (checkstyle) @ xgboost-jvm ---
[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:compile (default) @ xgboost-jvm ---
[INFO] No sources to compile
[INFO] 
[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ xgboost-jvm ---
[INFO] No sources to compile
[INF

sudo: no tty present and no askpass program specified
sudo: no tty present and no askpass program specified
fatal: destination path 'xgboost' already exists and is not an empty directory.
Aug 17, 2017 11:39:02 PM ml.dmlc.xgboost4j.java.XGBoost crossValidation
INFO: [0]	cv-test-error:0.014439	cv-train-error:0.014431
Aug 17, 2017 11:39:02 PM ml.dmlc.xgboost4j.java.XGBoost crossValidation
INFO: [1]	cv-test-error:0.001229	cv-train-error:0.001228


### Start a Spark session with XGBoost4J-Spark library loaded
After putting the jars and the files to the Azure Storage, which is shared across all the HDInsight nodes, the next step is to start a Spark session and call the XGBoost libraries. 

In the configure cell below, first we need to load those jar files to the Spark session, so we can use XGBoost APIs in this Jupyter Notebook.

We also need to exclude a few spark jars because there are some conflicts between Livy (which is the REST API used on HDInsight to execute Spark code), and XGBoost.

In [2]:
%%configure -f
{ "jars": ["wasb:///xgboost4j-spark-0.7.jar", "wasb:///xgboost4j-0.7.jar", "wasb:///xgboost4j-example-0.7.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect:2.11.8,org.scala-lang:scala-compiler:2.11.8,org.scala-lang:scala-library:2.11.8"
   }
}

### Import XGBoost and Spark Packages
We then import the XGBoost packages and start a Spark application

In [3]:
import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoost}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1502756750987_0006,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoost}

### Train a simple XGBoost model
We read data from the default BLOB storage account. The data is already put there if you run the %%sh cell above. But you can also get the data from [XGBoost repo](https://github.com/dmlc/xgboost/tree/master/demo/data).

In [4]:
// create training and testing dataframes
val inputTrainPath = "wasb:///agaricus.txt.train"
val inputTestPath = "wasb:///agaricus.txt.test"
val outputModelPath = "wasb:///XGBoostModelOutput"
val numWorkers = 4

// number of iterations
val numRound = 100

// build dataset
val trainDF = spark.sqlContext.read.format("libsvm").load(inputTrainPath)
val testDF = spark.sqlContext.read.format("libsvm").load(inputTestPath)
// start training
val paramMap = List(
  "eta" -> 0.1f,
  "max_depth" -> 6,
  "objective" -> "binary:logistic").toMap

val xgboostModel = XGBoost.trainWithDataFrame(
  trainDF, paramMap, numRound, nWorkers = numWorkers, useExternalMemory = true)

xgboostModel: ml.dmlc.xgboost4j.scala.spark.XGBoostModel = XGBoostClassificationModel_3515d47ea760

In [5]:
// construct the pipeline       
val pipeline = new Pipeline().setStages(Array(new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100))))
// use the transformed dataframe as training dataset
val xgboostModelPipeLine = pipeline.fit(trainDF)

// predict with the trained model
val xgBoostModelPipelineTransform = xgboostModelPipeLine.transform(testDF)


xgBoostModelPipelineTransform.show()

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|(126,[0,8,18,20,2...|  0.350358|
|  1.0|(126,[2,8,18,20,2...|0.64995027|
|  0.0|(126,[0,8,19,20,2...|  0.350358|
|  0.0|(126,[2,8,18,20,2...|  0.350358|
|  0.0|(126,[3,6,10,21,2...|0.35038674|
|  0.0|(126,[2,9,19,20,2...|0.35384613|
|  1.0|(126,[2,8,10,20,2...|0.64995027|
|  0.0|(126,[0,8,19,20,2...|  0.350358|
|  1.0|(126,[2,8,18,20,2...|0.64995027|
|  0.0|(126,[3,8,19,20,2...| 0.3509434|
|  1.0|(126,[2,8,10,20,2...|0.64995027|
|  0.0|(126,[0,8,19,20,2...|  0.350358|
|  0.0|(126,[3,6,13,21,2...|0.35038674|
|  0.0|(126,[0,8,18,20,2...|  0.350358|
|  0.0|(126,[3,9,10,21,2...|0.35038674|
|  0.0|(126,[3,8,19,20,2...| 0.3509434|
|  0.0|(126,[0,9,19,20,2...|  0.350358|
|  1.0|(126,[2,8,10,20,2...|0.64995027|
|  0.0|(126,[5,6,10,21,2...|0.35038674|
|  0.0|(126,[2,9,19,20,2...|  0.350358|
+-----+--------------------+----------+
only showing top 20 rows

### Tune Hyper Parameters for your XGBoost Model using Spark
Transform the test data and look at the results

In [6]:
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation.RegressionEvaluator

val xgboostParam = new mutable.HashMap[String, Any]()
xgboostParam += "eta" -> 0.1
xgboostParam += "max_depth" -> 6
xgboostParam += "silent" -> 1
xgboostParam += "ntreelimit" -> 1000
xgboostParam += "objective" -> "reg:linear"
xgboostParam += "subsample" -> 0.8
xgboostParam += "num_round" -> 100

val xgbEstimator = new XGBoostEstimator(xgboostParam.toMap).setFeaturesCol("features").
  setLabelCol("label")
val paramGrid = new ParamGridBuilder().addGrid(xgbEstimator.round, Array(20, 50)).addGrid(xgbEstimator.eta, Array(0.1, 0.4)).build()
val tv = new TrainValidationSplit().setEstimator(xgbEstimator).setEvaluator(new RegressionEvaluator().setLabelCol("label")).setEstimatorParamMaps(paramGrid).setTrainRatio(0.8)  // Use 3+ in practice


val bestModel = tv.fit(trainDF)


bestModel: org.apache.spark.ml.tuning.TrainValidationSplitModel = tvs_806d5d397a37

In [7]:
// xgboost-spark appends the column containing prediction results
bestModel.transform(testDF).show()

+-----+--------------------+------------+
|label|            features|  prediction|
+-----+--------------------+------------+
|  0.0|(126,[0,8,18,20,2...| 4.887581E-6|
|  1.0|(126,[2,8,18,20,2...|  0.99994767|
|  0.0|(126,[0,8,19,20,2...| 4.887581E-6|
|  0.0|(126,[2,8,18,20,2...| 4.887581E-6|
|  0.0|(126,[3,6,10,21,2...|2.4557114E-5|
|  0.0|(126,[2,9,19,20,2...|2.4852157E-4|
|  1.0|(126,[2,8,10,20,2...|  0.99994767|
|  0.0|(126,[0,8,19,20,2...|1.3649464E-5|
|  1.0|(126,[2,8,18,20,2...|   0.9999572|
|  0.0|(126,[3,8,19,20,2...|5.0514936E-5|
|  1.0|(126,[2,8,10,20,2...|  0.99994767|
|  0.0|(126,[0,8,19,20,2...|1.3649464E-5|
|  0.0|(126,[3,6,13,21,2...|1.3977289E-5|
|  0.0|(126,[0,8,18,20,2...| 4.887581E-6|
|  0.0|(126,[3,9,10,21,2...|1.3977289E-5|
|  0.0|(126,[3,8,19,20,2...|5.9366226E-5|
|  0.0|(126,[0,9,19,20,2...|1.3649464E-5|
|  1.0|(126,[2,8,10,20,2...|  0.99994767|
|  0.0|(126,[5,6,10,21,2...|2.4557114E-5|
|  0.0|(126,[2,9,19,20,2...|1.3649464E-5|
+-----+--------------------+------

### Explain Parameters of the XGBoost Model
Let's also take a look at the parameters of the model we trained

In [9]:
bestModel.booster.getModelDump()

res29: Array[String] =
Array("0:[f28<2] yes=1,no=2,missing=2
	1:[f108<2] yes=3,no=4,missing=4
		3:leaf=0.185965
		4:[f66<2] yes=7,no=8,missing=8
			7:[f38<2] yes=13,no=14,missing=14
				13:leaf=-0.15
				14:leaf=0.177143
			8:[f7<2] yes=15,no=16,missing=16
				15:leaf=0.1
				16:leaf=-0.199117
	2:[f55<2] yes=5,no=6,missing=6
		5:[f20<2] yes=9,no=10,missing=10
			9:leaf=-0.198104
			10:leaf=0.177778
		6:[f59<2] yes=11,no=12,missing=12
			11:leaf=-0.195062
			12:[f22<2] yes=17,no=18,missing=18
				17:leaf=-0.180952
				18:[f23<2] yes=19,no=20,missing=20
					19:leaf=-0.18
					20:leaf=0.199735
", "0:[f28<2] yes=1,no=2,missing=2
	1:[f108<2] yes=3,no=4,missing=4
		3:leaf=0.170083
		4:[f66<2] yes=7,no=8,missing=8
			7:[f38<2] yes=13,no=14,missing=14
				13:leaf=-0.139357
				14:leaf=0.162618...

### Save the model to Azure Storage
XGBoost can save the model to Azure Storage. We need to specify the implicit value sc, which is required by the saveModelAsHadoopFile API. It is the sparkContext type so we need to get it from the default spark (which is of sparkSession type)

In [10]:
//set sc value which is required by the saveModelAsHadoopFile API. It is the sparkContext type so we need to get it from the default spark (which is of sparkSession type)
implicit val sc = spark.sparkContext
xgboostModel.saveModelAsHadoopFile(outputModelPath)

### References
Most of the code above are copied from XGBoost repository, notebly the [SparkWithDataFrame example](https://github.com/dmlc/xgboost/blob/771a95aec6015aa6de2dcf822220aa064e5fe52c/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkWithDataFrame.scala) and [SparkModelTuningTool example](https://github.com/dmlc/xgboost/blob/771a95aec6015aa6de2dcf822220aa064e5fe52c/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/SparkModelTuningTool.scala)

### Acknowlegement
Thanks to Nan Zhu (zhna@microsoft.com), Software Engineer in Microsoft for helping out and identifying the potential scala conflict between Livy and XGBoost.