In [1]:
%%init_spark
launcher.num_executors = 3
launcher.executor_cores = 3
launcher.driver_memory = '2g'
launcher.executor_memory = '12g'

In this notebook we are going to work with a dataset from _Allstate Insurance Company_. The dataset is made of about 300'000 rows, anonymised and masked, with more than 100 categorical and numerical features related to insurances claims. What we have to do it is to understand whether a customer is likely to make a claim or not based on his insurance features history

**If you are running a notebook in `almond-scala` you have to run this command first to import the right spark libraries**

In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.3`
import $ivy.`org.apache.spark::spark-mllib:2.4.5`

After importing `spark-sql` and `spark-mllib` libraries we can _unzip the input file contained in_ `insurancedata.tar.bz2` . 
To do that from the Scala interface we are importing `sys.process._` and calling the command line `tar -xjf insurancedata.tar.bz2`

In [2]:
import sys.process._

Intitializing Scala interpreter ...

Spark Web UI available at https://datalab-cm-controlnode1.datalab.prod.temp.aws.sb-cloud.net:8090/proxy/application_1584597944032_0001
SparkContext available as 'sc' (version = 2.0.0.cloudera2, master = yarn, app id = application_1584597944032_0001)
SparkSession available as 'spark'


import sys.process._


In [3]:
"tar -xjf insurancedata.tar.bz2"!

res0: Int = 0


At this point we can start to import all the libraries needed for this project.  
First thing to notice is a substantial difference between Scala and Python or Java as regards the wildcard import. Scala makes a wildcard import with the underscore symbol `_` rather than with asterisk `*`. As a matter of fact `*` can be used as a valid identifier, so that we can write something like `val * = 123`. Thus,to import all the classes from a package we write `import org.apache.spark.sql._`

In [4]:
import org.apache.spark.sql._ //import all the sql framework
import org.apache.spark.sql.functions._ //import the sql functions such as expression or dates
import org.apache.spark.sql.types._ //import the sql types, like String, Integer and so on 


import java.text.SimpleDateFormat
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics, RegressionMetrics}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.linalg.{DenseVector, Vector} 
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Dataset
import java.text.SimpleDateFormat
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.{RandomForestClassifier, RandomForestClassificationModel, LogisticRegression}
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics, RegressionMetrics}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.linalg.{DenseVector, Vector}
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.ml.regression.{LinearRegress...

To disable the Spark logging we add the following import statement:

In [5]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.log4j.{Level, Logger}


**if you're running on `almond-scala` access to the spark notebook Session:**

In [None]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}.getOrCreate()
}

At this point we have everything set upand we are ready to start with a deep immersion into the Scala functionalities

First we are going to look at each stage of our pre and processing, then we'll write an object to implement all together 

### Data Preprocessing 

Here we are going to deal with the very first spark command. 
Using the current `spark` session we are going to read the train and test csv file. 
`option` is a huge attribute, where we can specify how to open the file:
- "header": if "true" we are importing the file's header 
- "inferSchema": if "true" spark will try to infer/guess the underlying schema of our file 
- "sep": this specifies the separtor character in our file, it may be a comma "," or ";" or whatever (e.g. (option("sep",":")) 
- "orc.bloom.filter.columns": as a further example we can deal with the orc.columns filtering them depending on our choice

`format` is in charge of communicating the file format to spark:
- "com.databricks.spark.csv": csv file
- "json"
- "parquet"
- "orc" 

then, self-explanatory `load` which takes as an argument the name of the file. 
Few things should be said about `cache` but I'll write something apart about this command and the methods to cache and persist our data in memory

In [11]:
//prepocessing data 
var trainSample = 1.0 
var testSample = 1.0
val train="insurance/train.csv"
val test ="insurance/test.csv"

val trainInput = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .format("com.databricks.spark.csv")
    .load(train)
    .cache

val testInput  = spark.read
    .option("header","true")
    .option("inferSchema","true")
    .format("com.databricks.spark.csv")
    .load(test)
    .cache

trainSample: Double = 1.0
testSample: Double = 1.0
train: String = insurance/train.csv
test: String = insurance/test.csv
trainInput: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 130 more fields]
testInput: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 129 more fields]


Now we are going to do a bit of preprocess and feat transformation onto `trainInput`. 
`withColumnRenamed` is a spark attribute to rename a column into another way and `sample` just select a percentage of the current dataset. In this case `trainSample` and `testSample` are 1.0, which means take the entire dataset.

In [12]:
println("prepare the data for training...")
var data = trainInput.withColumnRenamed("loss","label").sample(false,trainSample)

prepare the data for training...


data: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 130 more fields]


The following is a minuscle data check for `nan` values. There are tons of way to deal with `nan` but this will be describe later on in the series

In [13]:
var DF = data.na.drop()
if (data==DF)
    println("no null values found in the dataframe")
else {
    println("null values exist in the dataframe")
    data = DF
}
val seed = 12345L

//here we are splitting the training into a train and validation dataset, 0.75 and 0.25
val splits = data.randomSplit(Array(0.75,0.25),seed)
val (trainingData, validationData) = (splits(0),splits(1))

null values exist in the dataframe


DF: org.apache.spark.sql.DataFrame = [id: int, cat1: string ... 130 more fields]
seed: Long = 12345
splits: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([id: int, cat1: string ... 130 more fields], [id: int, cat1: string ... 130 more fields])
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 130 more fields]
validationData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 130 more fields]


In [14]:
// Take a sample for test
val testData = testInput.sample(false, testSample).cache

testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, cat1: string ... 129 more fields]


### Feature transformation

In the following we are implementing few feature transformation functions. I wrongly said function due to my Pyhton background, in Scala something like: 
``` 
def ThisIsaMethod(x: Type, y: Type) (var_return: Type): Type = someoperations
``` 
is a Method. On the contrary a function can be something like:
```
val add = (x: Int, y: Int) => x + y
```
The following methods are:
- `isCateg`: checks if the current column is a categorical one. The categorical columns start with "cat" 
- `categNewCol`: this method take a column name as an input. If `isCateg` return "true" then we are creating a string `s"idx_${c}"` where `s` is to start a string, "idx_${c}" is just a string with the cat-column name 
- `removeTooManyCategs`: if the columns is "cat109" or "cat110" or "cat112" or "cat113" or "cat116" and so on we'll remove it from the dataframe 
- `onlyFeatureCols`: if the columnn name is different from "id" or "label" return 1. These are the features we are working on, while "id" is just an identification number for a particular claim and "label" is the target

In [15]:
//identify the categorical data
def isCateg(c:String): Boolean= c.startsWith("cat") //simply our columns start with cat if they're categorical 

def categNewCol(c:String): String = if (isCateg(c)) s"idx_${c}" else c //if the colum is categorical call it "idx_COLNUMB" else COLNUMB
def removeTooManyCategs(c: String): Boolean= !(c matches "cat(109$|110$|112$|113$|116$)") //remove these columns 
//nb in the removeTooManyCategs Boolean=! raise an error, be sure that
//Boolean= since it's the type we are returning from the function
def onlyFeatureCols(c:String): Boolean= !(c matches "id|label")


isCateg: (c: String)Boolean
categNewCol: (c: String)String
removeTooManyCategs: (c: String)Boolean
onlyFeatureCols: (c: String)Boolean


We are creating then a pipeline in order to get the features we want to work with. Starting from the `trainingData` columns we are removing the unwanted features with `removeTooManyCategs`, collecting only the features that we want to preserver with `onlyFeatureCols` and we are performing a map to `categNewCol`.
Making up a pipeline is pivotal in scala programming, as it's a readable and chain way to act which makes our code elegant and reproducible

In [16]:
//Now create a pipeline to clean the columns 
//these are all teh categorical columns
val featureCols = trainingData.columns
    .filter(removeTooManyCategs)
    .filter(onlyFeatureCols)
    .map(categNewCol)

featureCols: Array[String] = Array(idx_cat1, idx_cat2, idx_cat3, idx_cat4, idx_cat5, idx_cat6, idx_cat7, idx_cat8, idx_cat9, idx_cat10, idx_cat11, idx_cat12, idx_cat13, idx_cat14, idx_cat15, idx_cat16, idx_cat17, idx_cat18, idx_cat19, idx_cat20, idx_cat21, idx_cat22, idx_cat23, idx_cat24, idx_cat25, idx_cat26, idx_cat27, idx_cat28, idx_cat29, idx_cat30, idx_cat31, idx_cat32, idx_cat33, idx_cat34, idx_cat35, idx_cat36, idx_cat37, idx_cat38, idx_cat39, idx_cat40, idx_cat41, idx_cat42, idx_cat43, idx_cat44, idx_cat45, idx_cat46, idx_cat47, idx_cat48, idx_cat49, idx_cat50, idx_cat51, idx_cat52, idx_cat53, idx_cat54, idx_cat55, idx_cat56, idx_cat57, idx_cat58, idx_cat59, idx_cat60, idx_cat61, idx_cat62, idx_cat63, idx_cat64, idx_cat65, idx_cat66, idx_cat67, idx_cat68, idx_cat69, idx_cat70, i...

In [17]:
//now pipeline, take the columns and filter if is categorical, 
// map (c=> new StringIndexes())
val stringnIndexerStages = trainingData.columns
    .filter(isCateg)
    .map(c=> new StringIndexer()
    .setInputCol(c)
    .setOutputCol(categNewCol(c))
    .fit(trainInput.select(c).union(testInput.select(c))) 
        )

stringnIndexerStages: Array[org.apache.spark.ml.feature.StringIndexerModel] = Array(strIdx_42a976be3c22, strIdx_aea933f3db11, strIdx_587e76c2dbaf, strIdx_316d27ebd7a0, strIdx_807c37d4885c, strIdx_f3090f7b14f6, strIdx_b6f2e720a374, strIdx_8df18b8bcbb8, strIdx_6b4b1f4c8715, strIdx_cc276b450f8c, strIdx_08b578a611b3, strIdx_42905204bcc9, strIdx_d14caf79b462, strIdx_93891d37be2b, strIdx_c3266a2193d7, strIdx_a474099c197d, strIdx_04e4df3d517d, strIdx_522d83aff814, strIdx_27ba270ee777, strIdx_31e8c46e37fb, strIdx_927b74f1dbbe, strIdx_17260da703aa, strIdx_db67b258b61d, strIdx_63ae1beccb23, strIdx_86f1861c3a79, strIdx_e759731ad184, strIdx_e8967f21624e, strIdx_401c9bf64e8d, strIdx_175f0958b40e, strIdx_0f279d7e0d29, strIdx_6b56e8bb7326, strIdx_3b581007da00, strIdx_dd8b725ce550, strIdx_d17edec38348,...

In [18]:
//now transform the feature columns into a single vector columne
val assembler = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("features")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_b988e7d15d02


### Model: Linear Regression

At this stage things are getting more serious. We are trying to make up an object for the preprocessing which encloses all the previous functionalities we have tested. Then, we are creating a Linear Regression model and chain it with preprocessing in a pipeline

In [19]:
//define a preprocessing object, which encapsulate all the previous 
//commands 
object Preprocessing {
  var trainSample = 1.0
  var testSample = 1.0
  val train = "insurance/train.csv"
  val test = "insurance/test.csv"

  println("Reading data from " + train + " file")

  val trainInput = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("com.databricks.spark.csv")
    .load(train)
    .cache

  val testInput = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("com.databricks.spark.csv")
    .load(test)
    .cache

  println("Preparing data for training model")
  var data = trainInput.withColumnRenamed("loss", "label").sample(false, trainSample)
  var DF = data.na.drop()

  // Null check
  if (data == DF)
    println("No null values in the DataFrame")

  else {
    println("Null values exist in the DataFrame")
    data = DF
  }
  
  val seed = 12345L
  val splits = data.randomSplit(Array(0.75, 0.25), seed)
  val (trainingData, validationData) = (splits(0), splits(1))

  trainingData.cache
  validationData.cache

  val testData = testInput.sample(false, testSample).cache

  def isCateg(c: String): Boolean = c.startsWith("cat")
  def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
  // Function to remove categorical columns with too many categories
  def removeTooManyCategs(c: String): Boolean = !(c matches "cat(109$|110$|112$|113$|116$)")
  // Function to select only feature columns (omit id and label)
  def onlyFeatureCols(c: String): Boolean = !(c matches "id|label")

  // Definitive set of feature columns
  val featureCols = trainingData.columns
    .filter(removeTooManyCategs)
    .filter(onlyFeatureCols)
    .map(categNewCol)

  // StringIndexer for categorical columns (OneHotEncoder should be evaluated as well)
  val stringIndexerStages = trainingData.columns.filter(isCateg)
      .map(c => new StringIndexer()
      .setInputCol(c)
      .setOutputCol(categNewCol(c))
      .fit(trainInput.select(c).union(testInput.select(c))))

  // VectorAssembler for training features
  val assembler = new VectorAssembler()
    .setInputCols(featureCols)
    .setOutputCol("features")
}

defined object Preprocessing


Before running the Linear Regression model we want to set its hyper parameters. In particular, we are defining a sequence of values for each of the following parameters, as we want to run a `GridSearch` over the parameter space, in order to find the best linear regression model and results:

- `numFolds` : we are performing a k-fold validation, in this case I reduced it to 3 but you are free to increase this number (e.g. 5, 8, 10)
- `MaxIter` : this defines the maximum number of iterations the Linear Regression model can run to improve results. If the number of iterations of Linear Regression is greater than `MaxIter` the algorithm will be stopped. Just for sake of computational cost I set `MaxIter` to 100. 
- `RegParam` : this is the regulatory parameter, to constraint the phase space explored by Linear Regression. It can fluctuate from 1e-8 up to 1e8. As a rule it's better always to test this number in advance and tune it. 
- `Tol` : this is the tolerance, namely the maximum error we can accept from our results
- `ElasticNetParam`: this parameter tune the strenght of the elastic-net penalty.

In [20]:
//define the Linear regression parameters 
val numFolds= 3 //exploit the k-fold method 
val MaxIter: Seq[Int] = Seq(100)
val RegParam: Seq[Double] = Seq(0.001)
val Tol: Seq[Double] = Seq(1e-4)
val ElasticNetParam:  Seq[Double] = Seq(0.001)

numFolds: Int = 3
MaxIter: Seq[Int] = List(10)
RegParam: Seq[Double] = List(0.001)
Tol: Seq[Double] = List(1.0E-4)
ElasticNetParam: Seq[Double] = List(0.001)


In [21]:
//define the model estimator
val model = new LinearRegression()
        .setFeaturesCol("features")
        .setLabelCol("label")

model: org.apache.spark.ml.regression.LinearRegression = linReg_6e918775bd2f


In [22]:
//build a pipeline estimator 
val pipeline= new Pipeline()
        .setStages(( Preprocessing.stringIndexerStages:+ Preprocessing.assembler) :+model ) 
// :+ means append

Reading data from insurance/train.csv file
Preparing data for training model
Null values exist in the DataFrame


pipeline: org.apache.spark.ml.Pipeline = pipeline_37b81fa2304f


At this stage we can set up the parameter Grid search. `ParamGridBuilder()` is the module to create a grid where we are exploring all the sequential values defined above. `.build()` will create the final grid

In [23]:
val paramGrid = new ParamGridBuilder()
        .addGrid(model.maxIter, MaxIter)
        .addGrid(model.regParam, RegParam)
        .addGrid(model.tol,Tol)
        .addGrid(model.elasticNetParam, ElasticNetParam)
        .build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	linReg_6e918775bd2f-elasticNetParam: 0.001,
	linReg_6e918775bd2f-maxIter: 10,
	linReg_6e918775bd2f-regParam: 0.001,
	linReg_6e918775bd2f-tol: 1.0E-4
})


In [24]:
//k-fold to validate the grid params 
val cv = new CrossValidator()
        .setEstimator(pipeline)
        .setEvaluator(new RegressionEvaluator)
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(numFolds)

cv: org.apache.spark.ml.tuning.CrossValidator = cv_f2d3c3d16071


Once the grid is set and the k-fold validation - `CrossValidator()`- is set up as well, defining a process pipeling and the `RegressionEvaluator` as model we want to use we can run the Linear Regression model against the `trainingData` 

In [25]:
//now create the linear regression model 
val cvModel = cv.fit(Preprocessing.trainingData)

cvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_f2d3c3d16071


### Metrics and  best model

As a final part we are going to measure the performance of the Logistic Regression against the train dataset in terms of Mean Squared Error and Correlation Coefficient $R^2$. Then, we will be measuring it against the test dataset. 
As you will see results are not great, given the small training parameters we have set up. As a task you could try to improve this model by modifying the hyperpamaters cell:
```
//define the Linear regression parameters 
val numFolds= 3 //exploit the k-fold method 
val MaxIter: Seq[Int] = Seq(100)
val RegParam: Seq[Double] = Seq(0.001)
val Tol: Seq[Double] = Seq(1e-4)
val ElasticNetParam:  Seq[Double] = Seq(0.001)
```

In [34]:
//find the best model out of the 5 fold 
println("Evaluating model on train set")
val trainPredictionsAndLabels =  
    cvModel.transform(Preprocessing.trainingData)
    .select("label","prediction")
    .map{case Row(label: Double, prediction:Double)
    => (label,prediction) }.rdd

Evaluating model on train set


trainPredictionsAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[2873] at rdd at <console>:95


In [35]:
val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels)

trainRegressionMetrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@6deda586


In [36]:
val bestModel = cvModel.asInstanceOf[PipelineModel]

java.lang.ClassCastException:  org.apache.spark.ml.tuning.CrossValidatorModel cannot be cast to org.apache.spark.ml.PipelineModel

In [42]:
//print  the results 
val results = 
"\n=================================================================n" +
s"\nParam trainSample:${Preprocessing.trainSample}\n"+ 
s"\nParam testSample:${Preprocessing.testSample}\n"+
"\n=================================================================n" +
s"\nTraining data MSE =${trainRegressionMetrics.meanSquaredError}\n"+
s"\nTraining data RMSE=${trainRegressionMetrics.rootMeanSquaredError}\n"+
s"\nTraining data R-squared=${trainRegressionMetrics.r2}\n"

results: String =
"
Param trainSample:1.0

Param testSample:1.0

Training data MSE =4541073.635702167

Training data RMSE=2130.9795014739507

Training data R-squared=-0.1674795578921029
"


In [44]:
println("Although results on the train are bad, go on with the test")
cvModel.transform(Preprocessing.testData)
    .select("id","prediction")
    .withColumnRenamed("prediction","loss")
    .coalesce(1) //get all the prediction in a single csv file 
    .write.format("com.databricks.spark.csv")
    .option("header","true")
    .save("results_LR.csv")

Although results on the train are bad, go on with the test


In [49]:
!head results_LR.csv

id,loss
4,1149.2668994674
6,2281.771672371282
9,11473.14281933248
12,4675.344731683625
15,-47.40916887730896
17,2312.3760003813927
21,2622.871263023185
28,248.8846099468567
32,2562.5745424487345
