# ST446 Distributed Computing for Big Data
## Seminar class 7: Scalable machine learning I

In this notebook, we run a logistic regression from `spark.ml` on data from the file [voice.csv](voice.csv) from https://www.kaggle.com/primaryobjects/voicegender/data.

Please have a look at the Website to understand what data it contains.
**Copy it onto your cluster**.

Reference: 
https://github.com/drabastomek/learningPySpark/blob/master/Chapter06/LearningPySpark_Chapter06.ipynb

While RDDs and MLlib have already hidden a lot of what's going on behind an API, with DataFrames and ML we can use an even higher-level abstraction, called _ML pipeline_.

# 1. Predict the gender of voice with ML

## a. Load and format the data

 I have put the data it into HDFS using `hadoop fs -put voice.csv /tmp/`.

In [2]:
# edit to your bucket location
voice_path = "/tmp/voice.csv"

In [3]:
import pyspark.sql.types as typ

from pyspark.sql.types import *

# metadata / schema for the voice.csv dataset
schema = StructType([
    StructField("meanfreq", DoubleType(), True),    
    StructField("sd", DoubleType(), True),
    StructField("median", DoubleType(), True),
    StructField("Q25", DoubleType(), True),
    StructField("Q75", DoubleType(), True),
    StructField("IQR", DoubleType(), True),
    StructField("skew", DoubleType(), True),
    StructField("kurt", DoubleType(), True),
    StructField("sp_ent", DoubleType(), True),
    StructField("sfm", DoubleType(), True),
    StructField("mode", DoubleType(), True),
    StructField("centroid", DoubleType(), True),
    StructField("meanfun", DoubleType(), True),
    StructField("minfun", DoubleType(), True),
    StructField("maxfun", DoubleType(), True),
    StructField("meandom", DoubleType(), True),
    StructField("mindom", DoubleType(), True),
    StructField("maxdom", DoubleType(), True),
    StructField("dfrange", DoubleType(), True),
    StructField("modindx", DoubleType(), True),
    StructField("label", StringType(), True)    
])

# load the data into a dataframe.
voice = spark.read.csv(voice_path, header=True, schema=schema)

In [4]:
# voice.createGlobalTempView("voice")

# dataset structure and sample instance
voice.show(1)

+------------------+------------------+-----------------+------------------+------------------+------------------+----------------+----------------+-----------------+-----------------+----+------------------+-----------------+------------------+-----------------+---------+---------+---------+-------+-------+-----+
|          meanfreq|                sd|           median|               Q25|               Q75|               IQR|            skew|            kurt|           sp_ent|              sfm|mode|          centroid|          meanfun|            minfun|           maxfun|  meandom|   mindom|   maxdom|dfrange|modindx|label|
+------------------+------------------+-----------------+------------------+------------------+------------------+----------------+----------------+-----------------+-----------------+----+------------------+-----------------+------------------+-----------------+---------+---------+---------+-------+-------+-----+
|0.0597809849598081|0.0642412677031359|0.03202691337

In [6]:
# dataset size
voice.count()

3168

### Convert the data into correct format

We need to cast the labels into integer values (instead of doubles) before fitting the model:

In [7]:
voice = voice.withColumn("label", (voice["label"]=="male").cast(IntegerType()))
voice.printSchema()

root
 |-- meanfreq: double (nullable = true)
 |-- sd: double (nullable = true)
 |-- median: double (nullable = true)
 |-- Q25: double (nullable = true)
 |-- Q75: double (nullable = true)
 |-- IQR: double (nullable = true)
 |-- skew: double (nullable = true)
 |-- kurt: double (nullable = true)
 |-- sp_ent: double (nullable = true)
 |-- sfm: double (nullable = true)
 |-- mode: double (nullable = true)
 |-- centroid: double (nullable = true)
 |-- meanfun: double (nullable = true)
 |-- minfun: double (nullable = true)
 |-- maxfun: double (nullable = true)
 |-- meandom: double (nullable = true)
 |-- mindom: double (nullable = true)
 |-- maxdom: double (nullable = true)
 |-- dfrange: double (nullable = true)
 |-- modindx: double (nullable = true)
 |-- label: integer (nullable = true)



### Split the data into training and testing samples

We use the `.randomSplit(...)` method to split the data set into a training and test data set.

In [9]:
# split data into training (70% of the samples) and testing (30% of the samples)
voice_train, voice_test = voice.randomSplit([0.7, 0.3], seed=1)

## b. Use pipeline to fit the logistic regression

A pipeline consists of `Transformers` and `Estimators` to specify an ML workflow:
* `Transformer`: an abstraction that includes feature transformers and learned models. A `Transformer` implements a method `transform()`, which converts one DataFrame into another, for example by appending one or more columns.
* `Estimator`: abstracts the concept of a learning algorithm or any algorithm that fits or trains a model to/on the data. An `Estimator` implements a method `fit()`, which accepts a DataFrame and produces a model. The model, in turn, is a `Transformer`.
* A `Pipeline` is a chain of multiple `Transformers` and `Estimators`.
* Generally, `fit()` and `transform()` will be stateless.
* A pipeline also has `Parameters`, that can be accessed by the transformer and estimators.

One advantage of this paradigm is that we can use the same transformations for training and testing the data. For training, we need to plug in an estimator in the end. For testing, we need to plug in the model that has been produced by the estimator.


See [here](https://spark.apache.org/docs/2.2.0/ml-pipeline.html) for more about pipelines.
![ml-pipeline](https://mapr.com/blog/fast-data-processing-pipeline-predicting-flight-delays-using-apache-apis-pt-1/assets/ml-pipeline.png)

### i. Create transformers

Create a transformer which creates a single column with all the features collated together.

It turns multiple columns into one column containing a vector. Here, this will be the feature vector.

In [10]:
import pyspark.ml.feature as ft

featuresCreator = ft.VectorAssembler(
    inputCols= voice.columns[:-1],
    outputCol='features'
)

### ii. Create an estimator 
(we instantiate a logistic regression model with specific parameters)

In [11]:
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='label')

### iii. Create a pipeline 
(create an abstract list of transformers and estimators)

In [12]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
        featuresCreator, 
        logistic
    ])

### iv. Run the pipeline to fit the model

Now run our `pipeline` and estimate our model.

In [13]:
model = pipeline.fit(voice_train)
test_model = model.transform(voice_test)

Here's what the `test_model` looks like.

In [14]:
test_model.take(1)

[Row(meanfreq=0.0621823118609672, sd=0.0878894037873831, median=0.0109745762711864, Q25=0.00177966101694915, Q75=0.117457627118644, IQR=0.115677966101695, skew=9.61220808953177, kurt=114.803500510109, sp_ent=0.786650358576641, sfm=0.329569856469261, mode=0.000889830508474576, centroid=0.0621823118609672, meanfun=0.0997761550401998, minfun=0.0171122994652406, maxfun=0.258064516129032, meandom=0.0955528846153846, mindom=0.0078125, maxdom=1.4140625, dfrange=1.40625, modindx=0.105777777777778, label=1, features=DenseVector([0.0622, 0.0879, 0.011, 0.0018, 0.1175, 0.1157, 9.6122, 114.8035, 0.7867, 0.3296, 0.0009, 0.0622, 0.0998, 0.0171, 0.2581, 0.0956, 0.0078, 1.4141, 1.4062, 0.1058]), rawPrediction=DenseVector([-1.1187, 1.1187]), probability=DenseVector([0.2463, 0.7537]), prediction=1.0)]

## c. Model performance

In [15]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='label')

print('areaUnderROC: ', evaluator.evaluate(test_model, 
     {evaluator.metricName: 'areaUnderROC'}))
print('areaUnderPR: ', evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

areaUnderROC:  0.9877969706401062
areaUnderPR:  0.9879752286972516


## d. Saving the model

PySpark allows you to save the `Pipeline` definition for later use.

In [16]:
pipelinePath = './voice'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use straight away to `.fit(...)` and predict.

In [17]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(voice_train)\
    .transform(voice_test)\
    .take(1)

[Row(meanfreq=0.0621823118609672, sd=0.0878894037873831, median=0.0109745762711864, Q25=0.00177966101694915, Q75=0.117457627118644, IQR=0.115677966101695, skew=9.61220808953177, kurt=114.803500510109, sp_ent=0.786650358576641, sfm=0.329569856469261, mode=0.000889830508474576, centroid=0.0621823118609672, meanfun=0.0997761550401998, minfun=0.0171122994652406, maxfun=0.258064516129032, meandom=0.0955528846153846, mindom=0.0078125, maxdom=1.4140625, dfrange=1.40625, modindx=0.105777777777778, label=1, features=DenseVector([0.0622, 0.0879, 0.011, 0.0018, 0.1175, 0.1157, 9.6122, 114.8035, 0.7867, 0.3296, 0.0009, 0.0622, 0.0998, 0.0171, 0.2581, 0.0956, 0.0078, 1.4141, 1.4062, 0.1058]), rawPrediction=DenseVector([-1.1187, 1.1187]), probability=DenseVector([0.2463, 0.7537]), prediction=1.0)]

You can also save the whole model and load it from disk.

In [18]:
from pyspark.ml import PipelineModel

modelPath = './voice_model'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(voice_test)

# 2. Hyper-parameter tuning

## a. Grid search

Now, we would like to figure out what training parameters work well. To this end, we specify our model and the list of parameters we want to loop through. Specifically, we test different maximum numbers of iterations and regularistaion parameters (L2).

In [19]:
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(
    labelCol='label')

grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,  
             [2, 10, 50]) \
    .addGrid(logistic.regParam, 
             [0.01, 0.05, 0.3]) \
    .build()

Next, we need some way of comparing the models.

In [20]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='label')

Create the logic that will do the validation work for us.

In [21]:
cv = tune.CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

Create a purely transforming `Pipeline`.

In [22]:
pipeline = Pipeline(stages=[featuresCreator])
data_transformer = pipeline.fit(voice_train)

Having done this, we are ready to find the optimal combination of parameters for our model.

In [23]:
cvModel = cv.fit(data_transformer.transform(voice_train))

The `cvModel` will return the best model estimated.

## b. Model evaluation

We can now use it to see if it performed better than our previous model.

In [24]:
data_train = data_transformer \
    .transform(voice_test)
results = cvModel.transform(data_train)

print('areaUnderROC', evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print('areaUnderPR', evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

areaUnderROC 0.9916744475568025
areaUnderPR 0.9929399234828469


## c. Extract the parameters

What parameters has the best model? The answer is a little bit convoluted but here's how you can extract it.

In [25]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        cvModel.getEstimatorParamMaps(), 
        cvModel.avgMetrics
    )
]

sorted(results, 
       key=lambda el: el[1], 
       reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.9942463252028225)

## 3. Can we fit a more sophisticated model?
This demonstrates how to re-assemble the pipeline. We are using Gradient Boosted Trees here.

In [26]:
from pyspark.ml.classification import GBTClassifier

# new classifier
gbt = GBTClassifier(
    labelCol='label', maxIter=20)

# put into pipeline
pipelineGBT = Pipeline(stages=[
        featuresCreator, 
        gbt
    ])

# train and test
modelGBT = pipelineGBT.fit(voice_train)
test_modelGBT = modelGBT.transform(voice_test)

# tell me the performance
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='label')

print('areaUnderROC: ', evaluator.evaluate(test_modelGBT, 
     {evaluator.metricName: 'areaUnderROC'}))
print('areaUnderPR:  ', evaluator.evaluate(test_modelGBT, {evaluator.metricName: 'areaUnderPR'}))

areaUnderROC:  0.9932133347166023
areaUnderPR:   0.9930471486509133


In [27]:
test_modelGBT.take(1)

[Row(meanfreq=0.0621823118609672, sd=0.0878894037873831, median=0.0109745762711864, Q25=0.00177966101694915, Q75=0.117457627118644, IQR=0.115677966101695, skew=9.61220808953177, kurt=114.803500510109, sp_ent=0.786650358576641, sfm=0.329569856469261, mode=0.000889830508474576, centroid=0.0621823118609672, meanfun=0.0997761550401998, minfun=0.0171122994652406, maxfun=0.258064516129032, meandom=0.0955528846153846, mindom=0.0078125, maxdom=1.4140625, dfrange=1.40625, modindx=0.105777777777778, label=1, features=DenseVector([0.0622, 0.0879, 0.011, 0.0018, 0.1175, 0.1157, 9.6122, 114.8035, 0.7867, 0.3296, 0.0009, 0.0622, 0.0998, 0.0171, 0.2581, 0.0956, 0.0078, 1.4141, 1.4062, 0.1058]), rawPrediction=DenseVector([-1.0799, 1.0799]), probability=DenseVector([0.1034, 0.8966]), prediction=1.0)]

The performance looks similar.