## [Background](https://cloud.google.com/ml-engine/docs/hyperparameter-tuning-overview)

## [Tuning In Spark](https://spark.apache.org/docs/2.1.0/ml-tuning.html)

In [1]:
import findspark
findspark.init('/home/asif/spark-2.1.0-bin-hadoop2.7')
from pyspark.sql import SparkSession # to load dataframe we need this
spark = SparkSession.builder.appName('titanicRandomForestClassifier').getOrCreate()


In [2]:
rawData = spark.read.csv('TitanicData.csv', inferSchema=True, header=True)
rawData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [3]:
from pyspark.sql.functions import col
totalNumberOfRow = rawData.count()
print('Number of rows in rawData = {0}'.format(totalNumberOfRow))
collectNumRowsInEachCol = rawData.describe().filter(col("summary") == "count").collect()
print("Number of rows in each column:\n",collectNumRowsInEachCol)

Number of rows in rawData = 891
Number of rows in each column:
 [Row(summary='count', PassengerId='891', Survived='891', Pclass='891', Name='891', Sex='891', Age='714', SibSp='891', Parch='891', Ticket='891', Fare='891', Cabin='204', Embarked='889')]


## 'Age', 'Cabin', 'Embarked' have missing values

## Working with 'Embarked'

In [4]:
rawData.select('Embarked').distinct().show()

+--------+
|Embarked|
+--------+
|       Q|
|    null|
|       C|
|       S|
+--------+



In [5]:
rawData.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [6]:
rawData = rawData.na.fill('missing',subset=['Embarked'])

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedInt')
# for multiple column pipeline should be used
rawData = indexer.fit(rawData).transform(rawData)
encoder = OneHotEncoder(inputCol="EmbarkedInt", outputCol="categoryEmbarked")
rawData = encoder.transform(rawData)

In [8]:
rawData.head(1)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S', EmbarkedInt=0.0, categoryEmbarked=SparseVector(3, {0: 1.0}))]

In [9]:
rawData.select('Cabin').distinct().count()

148

In [10]:
rawData = rawData.drop('Cabin')

## Function for mean and standerd dev

In [11]:
def getMean(dataframe, colName):
    from pyspark.sql.functions import mean
    meanVal = dataframe.select(mean(dataframe[colName])).collect()
    meanVal = meanVal[0][0]
    return meanVal

def getStddev(dataframe, colName):
    from pyspark.sql.functions import stddev
    stddevVal = dataframe.select(stddev(dataframe[colName])).collect()
    stddevVal = stddevVal[0][0]
    return stddevVal


In [12]:

meanAge = getMean(rawData, 'Age')
stddevAge = getStddev(rawData, 'Age')
print('meanAge  = {0}   \nstddevAge = {1}'.format(meanAge, stddevAge))

meanAge  = 29.69911764705882   
stddevAge = 14.526497332334035


In [13]:
rawData = rawData.na.fill(meanAge, ['Age'])
rawData.describe('Age').show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               891|
|   mean|29.699117647058763|
| stddev|13.002015226002891|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [14]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='Sex', outputCol='SexInt')
# for multiple column pipeline should be used
rawData = indexer.fit(rawData).transform(rawData)
rawData = rawData.drop('Sex')
rawData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- EmbarkedInt: double (nullable = true)
 |-- categoryEmbarked: vector (nullable = true)
 |-- SexInt: double (nullable = true)



In [15]:
# adding two columns value
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
def findSum(Parch, SibSp):
    sum_ = float(SibSp) + float(Parch)
    return sum_
    
userDefiendFuncForSum =  udf(findSum, DoubleType())
rawData = rawData.withColumn('sumPS', userDefiendFuncForSum(col('Parch'),col('SibSp')))



def FareBysumPS(fare, sumPS):
    ans = float(fare) / (float(sumPS)+1)
    return ans
    
userDefiendFuncForFareBysumPS =  udf(FareBysumPS, DoubleType())
rawData = rawData.withColumn('FareBysumPS', userDefiendFuncForFareBysumPS(col('Fare'),col('sumPS')))




def getFareByPclass(fare,pclass):
    fare = float(fare)/(float(pclass))
    fare = float("{0:.2f}".format(fare))
    return fare
                        
userDefiendFuncForFareByPclass =  udf(getFareByPclass, DoubleType())
rawData = rawData.withColumn('FareByPclass', userDefiendFuncForFareByPclass(col('Fare'),col('Pclass')))
rawData.select('FareByPclass').show()


+------------+
|FareByPclass|
+------------+
|        2.42|
|       71.28|
|        2.64|
|        53.1|
|        2.68|
|        2.82|
|       51.86|
|        7.02|
|        3.71|
|       15.04|
|        5.57|
|       26.55|
|        2.68|
|       10.42|
|        2.62|
|         8.0|
|        9.71|
|         6.5|
|         6.0|
|        2.41|
+------------+
only showing top 20 rows



In [16]:
rawData = rawData.withColumnRenamed("Survived", "label")
print(rawData.columns)

['PassengerId', 'label', 'Pclass', 'Name', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Embarked', 'EmbarkedInt', 'categoryEmbarked', 'SexInt', 'sumPS', 'FareBysumPS', 'FareByPclass']


In [17]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols =[ 'Pclass','SibSp', 'Parch', 'Fare','categoryEmbarked',
                                         'SexInt','sumPS','FareBysumPS'],
                            outputCol = 'features')


In [18]:
dataFromAssembler = assembler.transform(rawData)
dataFromAssembler.head(1)

[Row(PassengerId=1, label=0, Pclass=3, Name='Braund, Mr. Owen Harris', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Embarked='S', EmbarkedInt=0.0, categoryEmbarked=SparseVector(3, {0: 1.0}), SexInt=0.0, sumPS=1.0, FareBysumPS=3.625, FareByPclass=2.42, features=DenseVector([3.0, 1.0, 0.0, 7.25, 1.0, 0.0, 0.0, 0.0, 1.0, 3.625]))]

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
rfc = RandomForestClassifier(labelCol="label", featuresCol="features",numTrees=500,maxDepth=15, maxBins=32, minInstancesPerNode=3)

In [20]:
finalData = dataFromAssembler.select('features','label')
trainData, testData = finalData.randomSplit([0.7,0.3])
rfc_model = rfc.fit(trainData)

In [21]:
rfc_preds = rfc_model.transform(testData)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
rfc_eval = BinaryClassificationEvaluator(labelCol='label')
print('RFC area under the curve=',rfc_eval.evaluate(rfc_preds))
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
rfc_acc = acc_eval.evaluate(rfc_preds)
print('rfc acc= ',rfc_acc)

RFC area under the curve= 0.8759426847662144
rfc acc=  0.8081180811808119


In [22]:
rfc_preds.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [23]:
rfc_preds.select('label','prediction').show()

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       1.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    1|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



# <span style="color:red"><h1>Now we try hypermeter tuning</h1></span>
# <span style="color:blue"><h2><a href="https://spark.apache.org/docs/2.1.0/ml-tuning.html#cross-validation">Cross-Validation</a></h2></span>

In [24]:
from pyspark.ml import Pipeline

rfcNew = RandomForestClassifier(labelCol="label", featuresCol="features", minInstancesPerNode=3)

In [25]:
from pyspark.ml.tuning import  ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(rfcNew.numTrees, [40, 100, 200,300,500]) \
    .addGrid(rfcNew.maxDepth, [10,15,20,28]) \
    .addGrid(rfcNew.maxBins, [8,16,32]) \
    .build()


In [26]:
from pyspark.ml.tuning import CrossValidator
crossval = CrossValidator(estimator=rfcNew,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  # use 3+ folds in practice
cvModel = crossval.fit(trainData)

In [27]:
# Make predictions on test documents. cvModel uses the best model found (
prediction = cvModel.transform(testData)

In [28]:
prediction.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [29]:
rfc_eval_new = BinaryClassificationEvaluator(labelCol='label')
print('RFC area under the curve=',rfc_eval_new.evaluate(prediction))
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval_new = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
rfc_acc = acc_eval_new.evaluate(prediction)
print('rfc acc after tuning= ',rfc_acc)

RFC area under the curve= 0.873912286808215
rfc acc after tuning=  0.8228782287822878


In [30]:
best_model = cvModel.bestModel

In [31]:
best_model

RandomForestClassificationModel (uid=rfc_edf5710b2ff4) with 100 trees

# <span style="color:blue"><h2><a href="https://spark.apache.org/docs/2.1.0/ml-tuning.html#train-validation-split">Train-Validation Split</a></h2></span>

In [32]:
rfcTrainValidationSplit = RandomForestClassifier(labelCol="label", featuresCol="features",maxBins=16, minInstancesPerNode=5)

In [33]:
paramGridTrainValidationSplit = ParamGridBuilder() \
    .addGrid(rfcNew.numTrees, [20,32,64, 100, 200,300,500]) \
    .addGrid(rfcNew.maxDepth, [10,15,20,28]) \
    .build()

from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit(estimator=rfcTrainValidationSplit,
                           estimatorParamMaps=paramGridTrainValidationSplit,
                           evaluator=BinaryClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)


In [34]:
tvsModel = tvs.fit(trainData)
tvsPrediction = tvsModel.transform(testData)

In [35]:
tvsPrediction.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [36]:
rfc_eval_tvs = BinaryClassificationEvaluator(labelCol='label')
print('RFC area under the curve=',rfc_eval_tvs.evaluate(tvsPrediction))
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval_tvs = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
rfc_acc_tvs = acc_eval_tvs.evaluate(tvsPrediction)
print('rfc acc after tuning= ',rfc_acc_tvs)

RFC area under the curve= 0.8934331128901266
rfc acc after tuning=  0.8154981549815498


In [37]:
best_model_tvs = tvsModel.bestModel
best_model_tvs

RandomForestClassificationModel (uid=rfc_93b4f3d007f5) with 20 trees