<img src="http://imgur.com/1ZcRyrc.png" style="float: left; margin: 20px; height: 55px"> 
# Spark MLlib Lab
---

In [13]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
 
plt.style.use('ggplot')
sns.set(font_scale=1.5)
%config InlineBackend.figure_format = 'retina'
%matplotlib inline

## Create the spark context

In [14]:
import pyspark as ps
from pyspark.sql import SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler

In [16]:
sc.stop()
sc = ps.SparkContext('local[4]')
sqlContext = SQLContext(sc)

## Label encoding categorical features

Often we have categorical features with values given as strings which we would like to transform to numerical values. The analogue of sklearn's `LabelEncoder` is the `StringIndexer`.

In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# StringIndexer turns categorical target variable into labels=0,1,2: you need to do this in pyspark,
# unlike sklearn which could handle categorical target variables.

In [5]:
ex_1 = sqlContext.createDataFrame([
    (4, "high"),
    (5, "low"),
    (6, "high"),
    (7, "high"),
    (8,'medium')
], ["id", "label"])

In [6]:
ex_1.show()

+---+------+
| id| label|
+---+------+
|  4|  high|
|  5|   low|
|  6|  high|
|  7|  high|
|  8|medium|
+---+------+



In [7]:
string_indexer = StringIndexer(
        inputCol='label',
        outputCol='label' + "_index"
    )

In [8]:
ex_2 = string_indexer.fit(ex_1).transform(ex_1)
ex_2.show()

+---+------+-----------+
| id| label|label_index|
+---+------+-----------+
|  4|  high|        0.0|
|  5|   low|        1.0|
|  6|  high|        0.0|
|  7|  high|        0.0|
|  8|medium|        2.0|
+---+------+-----------+



In [9]:
onehot = OneHotEncoderEstimator(
        dropLast=True,
        inputCols=['label_index'],
        outputCols=['label' + "_index_1"]
    )

In [11]:
onehot.fit(ex_2).transform(ex_2).show()
# label_index_1 is a sparse vector.
    # First element says how many elements there are in the sparse vector
    # Next element shows you which class appeared and third element is the count of that class
    # Note the last row is empty as this is a sparse vector, and we've already dropped last.

+---+------+-----------+-------------+
| id| label|label_index|label_index_1|
+---+------+-----------+-------------+
|  4|  high|        0.0|(2,[0],[1.0])|
|  5|   low|        1.0|(2,[1],[1.0])|
|  6|  high|        0.0|(2,[0],[1.0])|
|  7|  high|        0.0|(2,[0],[1.0])|
|  8|medium|        2.0|    (2,[],[])|
+---+------+-----------+-------------+



The one-hot-encoded values are given as a sparse vector for each observation. The first number indicates the length of the sparse vector, the second number in brackets indicates the position that is filled with the last value. As you can see from the last shown entry, dropping a redundant label (`drop_last`) is default here.

## Read in the car evaluation dataset 

```python
df = pd.read_csv('../../../../resource-datasets/car_evaluation/car.csv')
```

Use `acceptability` as target.

In [76]:
df = pd.read_csv('../../../../resource-datasets/car_evaluation/car.csv')
df.head()

Unnamed: 0,buying,maint,doors,persons,lug_boot,safety,acceptability
0,vhigh,vhigh,2,2,small,low,unacc
1,vhigh,vhigh,2,2,small,med,unacc
2,vhigh,vhigh,2,2,small,high,unacc
3,vhigh,vhigh,2,2,med,low,unacc
4,vhigh,vhigh,2,2,med,med,unacc


In [None]:
spark_df = sqlContext.createDataFrame(df)
spark_df.first()

In [77]:
spark_df.dtypes

[('buying', 'string'),
 ('maint', 'string'),
 ('doors', 'string'),
 ('persons', 'string'),
 ('lug_boot', 'string'),
 ('safety', 'string'),
 ('acceptability', 'string'),
 ('buying_index', 'double'),
 ('maint_index', 'double'),
 ('doors_index', 'double'),
 ('persons_index', 'double'),
 ('lug_boot_index', 'double'),
 ('safety_index', 'double'),
 ('acceptability_index', 'double')]

In [78]:
spark_df.select('buying').dtypes

[('buying', 'string')]

In [79]:
[spark_df.dtypes[i][0] for i in range(len(spark_df.dtypes)) if spark_df.dtypes[i][1]=='string']

['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'acceptability']

In [80]:
spark_df

DataFrame[buying: string, maint: string, doors: string, persons: string, lug_boot: string, safety: string, acceptability: string, buying_index: double, maint_index: double, doors_index: double, persons_index: double, lug_boot_index: double, safety_index: double, acceptability_index: double]

## Dummify the categorical variables

Use first the `StringIndexer`, then the `OneHotEncoderEstimator` to create the dummified variables. Be careful not to use one-hot encoding on the target variable (`acceptability`).

In [82]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(spark_df) for column in list(spark_df.columns)]

pipeline = Pipeline(stages=indexers)
spark_df_2 = pipeline.fit(spark_df).transform(spark_df)
spark_df_2.show(5)

IllegalArgumentException: 'requirement failed: Output column buying_index already exists.'

In [58]:
onehot = OneHotEncoderEstimator(
        dropLast=True,
        inputCols=spark_df_2.columns[7:-1],
        outputCols=[str(col)+'_index_1' for col in spark_df_2.columns[7:-1]]
    )
spark_df_3 = onehot.fit(spark_df_2).transform(spark_df_2)
spark_df_3.show(5)

Exception ignored in: <function JavaWrapper.__del__ at 0x11517c710>
Traceback (most recent call last):
  File "/Users/Noah/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'OneHotEncoderEstimator' object has no attribute '_java_obj'


+------+-----+-----+-------+--------+------+-------------+------------+-----------+-----------+-------------+--------------+------------+-------------------+---------------------+----------------------+--------------------+--------------------+-------------------+-------------------+
|buying|maint|doors|persons|lug_boot|safety|acceptability|buying_index|maint_index|doors_index|persons_index|lug_boot_index|safety_index|acceptability_index|persons_index_index_1|lug_boot_index_index_1|buying_index_index_1|safety_index_index_1|doors_index_index_1|maint_index_index_1|
+------+-----+-----+-------+--------+------+-------------+------------+-----------+-----------+-------------+--------------+------------+-------------------+---------------------+----------------------+--------------------+--------------------+-------------------+-------------------+
| vhigh|vhigh|    2|      2|   small|   low|        unacc|         2.0|        2.0|        2.0|          2.0|           2.0|         1.0|        

## Prepare your feature columns with `VectorAssembler`

In [1]:
from pyspark.ml.feature import VectorAssembler

In [72]:
vectorAssembler = VectorAssembler(inputCols=spark_df_3.columns[14:],
                                  outputCol="features")
vector_df = vectorAssembler.transform(spark_df_3)
vector_df.persist()

DataFrame[buying: string, maint: string, doors: string, persons: string, lug_boot: string, safety: string, acceptability: string, buying_index: double, maint_index: double, doors_index: double, persons_index: double, lug_boot_index: double, safety_index: double, acceptability_index: double, persons_index_index_1: vector, lug_boot_index_index_1: vector, buying_index_index_1: vector, safety_index_index_1: vector, doors_index_index_1: vector, maint_index_index_1: vector, features: vector]

In [71]:
vector_df.select('features').show(2)

+--------------------+
|            features|
+--------------------+
|(15,[6,8,11,14],[...|
|(15,[6,7,11,14],[...|
+--------------------+
only showing top 2 rows



## Fit and evaluate a spark decision tree model and tune with grid search

Once done, try also other models.

In [73]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [83]:
vector_df

DataFrame[buying: string, maint: string, doors: string, persons: string, lug_boot: string, safety: string, acceptability: string, buying_index: double, maint_index: double, doors_index: double, persons_index: double, lug_boot_index: double, safety_index: double, acceptability_index: double, persons_index_index_1: vector, lug_boot_index_index_1: vector, buying_index_index_1: vector, safety_index_index_1: vector, doors_index_index_1: vector, maint_index_index_1: vector, features: vector]

In [84]:
model = DecisionTreeClassifier(featuresCol='features', labelCol='acceptability_index')

In [86]:
(data_train, data_test) = vector_df.randomSplit([0.7, 0.3], seed=1)
scaler = StandardScaler(withMean=True,
                        inputCol="features",
                        outputCol="scaledfeatures")


pipeline = Pipeline(stages=[scaler, model])

evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='acceptability_index',
    metricName='accuracy'
)

paramGrid = ParamGridBuilder() \
    .addGrid(model.maxDepth, [3, 4, 5]) \
    .build()

# the actual gridsearch
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters.
model_fit = crossval.fit(data_train)

print('Average cv scores:')
print(np.around(np.array(model_fit.avgMetrics), 4))

java_model = model_fit.bestModel.stages[1]._java_obj

print('Best model parameters:')
print({param.name: java_model.getOrDefault(java_model.getParam(param.name))
       for param in paramGrid[0]})
print()
# print(java_model.explainParams())

predictions = model_fit.transform(data_test)

print('Best model test accuracy:')
print(evaluator.evaluate(predictions))

Average cv scores:
[0.7806 0.7961 0.8208]
Best model parameters:
{'maxDepth': 5}

Best model test accuracy:
0.8287937743190662
