# PySpark: Machine Learning

## 1. SparkSession

The traditional way to interact with Spark is through a SparkContext. In these notebooks, we get one from the pyspark driver.

Since Spark 2.0, a single SparkSession can replace SparkConf, SparkContext and SQLContext.

```python
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
# Libraries
from pyspark.sql import functions as f
from pyspark.sql import types
# Create a session, or reuse the one that already exists
spark = SparkSession.builder.getOrCreate()
# Inspect the session object
print(spark)
```

```
<pyspark.sql.session.SparkSession object at 0x7f3cb89e5048>
```


```python
spark
```

## 2. Loading the Dataset

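```python
# Path to the Titanic training data
path = '../Master_DataScience/06_Machine_Learning_on_my_own/Files/'
file = 'train.csv'
# Load the CSV, using the first row as header and inferring column types
df = spark.read.csv(path + file, header=True, inferSchema=True)
```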

```python
df.show()
```

```
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|
```

```python
# Keep the label and the candidate features
df2 = df.select('Survived',
                'Pclass',
                'Sex',
                'Age',
                'SibSp',
                'Parch',
                'Fare',
                'Embarked')
df2.show()
```

```
+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     3|  male|null|    0|    0| 8.4583|       Q|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|
```

## 3. Null Values and Data Types

```python
# Number of rows
df2.count()
```

```
891
```

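```python
# Count null values per column
df_agg = df2.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in df2.columns])
df_agg.toPandas().head()
```

|   | Survived | Pclass | Sex | Age | SibSp | Parch | Fare | Embarked |
|---|----------|--------|-----|-----|-------|-------|------|----------|
| 0 | 0        | 0      | 0   | 177 | 0     | 0     | 0    | 2        |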
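The aggregation works because `f.when(f.isnull(c), c)` passes `c` as a string, which PySpark treats as a literal: the expression yields that (non-null) string for rows where the column is null and, lacking an `otherwise`, null for every other row. `f.count` skips nulls, so it counts exactly the null cells. A plain-Python sketch of the same logic, assuming rows are dictionaries with `None` standing in for null (the sample rows are hypothetical, not taken from the dataset):

```python
# Plain-Python sketch of the null-count aggregation (not Spark code).
# Hypothetical sample rows; None plays the role of SQL null.
rows = [
    {"Survived": 0, "Age": 22.0, "Embarked": "S"},
    {"Survived": 0, "Age": None, "Embarked": "Q"},
    {"Survived": 1, "Age": 38.0, "Embarked": None},
    {"Survived": 1, "Age": None, "Embarked": "S"},
]

def when_isnull(value, literal):
    """Mimic f.when(f.isnull(c), c): the literal if value is null, else null."""
    return literal if value is None else None

def count_non_null(values):
    """Mimic f.count: null values are not counted."""
    return sum(1 for v in values if v is not None)

columns = ["Survived", "Age", "Embarked"]
null_counts = {c: count_non_null(when_isnull(r[c], c) for r in rows) for c in columns}
print(null_counts)  # {'Survived': 0, 'Age': 2, 'Embarked': 1}
```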


```python
# Options: impute unknown/missing values, filter rows, or drop them (dropna)

# # Filter
# df2 = df2.filter(condition...)
# # Missing value imputation (the fill value's type must match the column's type)
# df2 = df2.fillna(-1, subset=["Age"])
# df2 = df2.fillna("unknown", subset=["Embarked"])  # "unknown" is a hypothetical placeholder label
# # New columns
# df2 = df2.withColumn('Sex_cat', f.when(f.col('Sex') == 'male', f.lit(1)).otherwise(0))

# This is a basic example: we drop every row that has a missing value
df3 = df2.dropna(how='any')
```
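`dropna(how='any')` removes a row as soon as one of its values is null, while `how='all'` removes it only when every value is null. A plain-Python sketch of both rules on hypothetical rows, followed by the expected size of `df3`, assuming the 177 missing `Age` values and the 2 missing `Embarked` values never fall on the same row (the 577 + 135 = 712 rows in the train/test split are consistent with that assumption):

```python
# Plain-Python sketch of DataFrame.dropna(how=...) semantics (not Spark code).
rows = [
    {"Age": 22.0, "Embarked": "S"},
    {"Age": None, "Embarked": "Q"},
    {"Age": None, "Embarked": None},
]

def dropna(rows, how="any"):
    """Keep rows according to the 'any' / 'all' rule."""
    if how == "any":
        return [r for r in rows if all(v is not None for v in r.values())]
    if how == "all":
        return [r for r in rows if any(v is not None for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")

print(len(dropna(rows, "any")))  # 1
print(len(dropna(rows, "all")))  # 2

# Expected size of df3, assuming no row is missing both Age and Embarked
print(891 - 177 - 2)  # 712
```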

```python
# Null values after handling missing values
df_agg = df3.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in df3.columns])
df_agg.toPandas().head()
```

|   | Survived | Pclass | Sex | Age | SibSp | Parch | Fare | Embarked |
|---|----------|--------|-----|-----|-------|-------|------|----------|
| 0 | 0        | 0      | 0   | 0   | 0     | 0     | 0    | 0        |


## IMPORTANT: Handle different fields in different ways 

We have features of at least three kinds:

* __Numeric continuous fields__, which many algorithms can take as input as they are. In particular, decision trees accept continuous variables on any scale, since they only look for the cutoff point that most increases the homogeneity of the resulting groups. By contrast, if we were using a regularized logistic regression, for example, we would first need to scale the variables to comparable magnitudes, because the penalty acts on the size of the coefficients.

* There are fields that we will treat as __categorical variables__ even though they are already integers (`Pclass`, `SibSp` and `Parch`). These need to be __one-hot encoded__.

* Finally, there are several __categorical variables that are encoded as strings__. These need to be one-hot encoded, but __OneHotEncoder__ requires numeric input. Therefore, we will need to apply a __StringIndexer__ to each of them before one-hot encoding.
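To make the scaling point concrete, here is a plain-Python sketch of z-score standardization (subtract the mean, divide by the standard deviation). The `Age` and `Fare` values are the first five rows of `df2` shown earlier. For comparison, Spark's `StandardScaler` uses the corrected sample standard deviation but does not center by default (`withMean=False`); this sketch does both.

```python
# Plain-Python sketch of z-score standardization (not Spark code).
from statistics import mean, stdev

ages = [22.0, 38.0, 26.0, 35.0, 35.0]       # Age, first rows of df2
fares = [7.25, 71.2833, 7.925, 53.1, 8.05]  # Fare, same rows

def standardize(values):
    """Center on the mean and divide by the sample standard deviation."""
    m, s = mean(values), stdev(values)
    return [(v - m) / s for v in values]

z_ages = standardize(ages)
z_fares = standardize(fares)
# After scaling, both features have mean 0 and sample std 1, so neither
# dominates a regularization penalty just because of its units.
print([round(z, 2) for z in z_ages])
print([round(z, 2) for z in z_fares])
```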

___PySpark Pipeline___

| Field type            | StringIndexer | OneHotEncoder | Used by model |
|-----------------------|---------------|---------------|---------------|
| Categorical (integer) | NO            | YES           | YES           |
| Categorical (string)  | YES           | YES           | YES           |
| Continuous            | NO            | NO            | YES           |


## 4. StringIndexer 

A [StringIndexer](https://spark.apache.org/docs/2.2.0/ml-features.html#stringindexer) is an estimator that takes a single string field and produces a transformer that encodes that field as numeric label indices, ready to be fed to a one-hot encoder. Indices are ordered by label frequency, so the most frequent label gets index 0.

We need to specify an input column, an output column, and how to handle invalid values. Here, invalid values are labels that the indexer did not see during fitting but that the transformer encounters later. The `handleInvalid` options are 'error' (the default), which raises an exception; 'skip', which drops those rows; and 'keep', which is what we want: it assigns all unseen labels to a single extra index.
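A plain-Python sketch of what a fitted StringIndexer does: rank labels by descending frequency, then map unseen labels according to the `handleInvalid` policy. Breaking frequency ties alphabetically is an assumption of this sketch, not a claim about Spark's tie-breaking. The toy `Sex` values reproduce the `SexIndex` encoding in the transformed output later on (male → 0.0, female → 1.0).

```python
# Plain-Python sketch of StringIndexer behaviour (not Spark code).
from collections import Counter

def fit_string_indexer(values):
    """Most frequent label gets index 0; ties broken alphabetically (assumption)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform(mapping, values, handle_invalid="error"):
    """Apply the mapping; unseen labels follow the handleInvalid policy."""
    out = []
    for v in values:
        if v in mapping:
            out.append(mapping[v])
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))  # one extra index shared by all unseen labels
        elif handle_invalid == "skip":
            continue  # drop the value (Spark drops the whole row)
        else:
            raise ValueError(f"Unseen label: {v!r}")
    return out

mapping = fit_string_indexer(["male", "female", "male", "female", "male"])
print(mapping)                                          # {'male': 0.0, 'female': 1.0}
print(transform(mapping, ["female", "other"], "keep"))  # [1.0, 2.0]
print(transform(mapping, ["female", "other"], "skip"))  # [1.0]
```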

```python
df3.printSchema()
```

```
root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
```



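```python
# Integer columns treated as categorical
categorical_fields = ['Pclass', 'SibSp', 'Parch']
# String columns, detected from the schema
string_fields = [field.name for field in df3.schema.fields if field.dataType == types.StringType()]
# Continuous numeric columns
continuous_fields = ['Age', 'Fare']
# Label column
target_field = 'Survived'
```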

```python
string_fields
```

```
['Sex', 'Embarked']
```

```python
from pyspark.ml.feature import StringIndexer
# handleInvalid='keep': labels unseen during fit are mapped to one extra index instead of raising an error
string_indexers = [StringIndexer(inputCol=field, outputCol=field + 'Index', handleInvalid='keep') for field in string_fields]
```

## 5. OneHotEncoder

A [OneHotEncoder](https://spark.apache.org/docs/2.2.0/ml-features.html#onehotencoder) maps a column of category indices to a binary vector column. With the default `dropLast=True`, an n-category column produces a vector of length n-1, and the last category is encoded as the all-zeros vector.

We need to specify an input and an output column.

We need one OneHotEncoder per categorical column, and we are also going to build these stages programmatically.
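A plain-Python sketch of one-hot encoding with `dropLast=True`, returning vectors in Spark's sparse notation `(size, indices, values)`. It assumes, as the `PclassOneHot` column of the transformed data suggests (size 3, with Pclass = 3 shown as `(3,[],[])`), that Pclass values 1 to 3 are read as indices of 4 categories (0 to 3).

```python
# Plain-Python sketch of one-hot encoding with dropLast=True (not Spark code).
# Vectors are returned in sparse notation: (size, indices, values).

def one_hot(index, num_categories, drop_last=True):
    size = num_categories - 1 if drop_last else num_categories
    index = int(index)
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])  # the dropped last category becomes all zeros

# Pclass takes the values 1, 2 and 3; read as indices, that is 4 categories (0..3)
for pclass in [1, 2, 3]:
    print(pclass, one_hot(pclass, num_categories=4))
# 1 (3, [1], [1.0])
# 2 (3, [2], [1.0])
# 3 (3, [], [])
```

Dropping the last category avoids a redundant column: once the other n-1 entries are known, the last one is implied.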

```python
from pyspark.ml.feature import OneHotEncoder

# Integer categorical fields are already indices, so they go straight to the encoder
encoders_cat = [OneHotEncoder(inputCol=field, outputCol=field + 'OneHot') for field in categorical_fields]
# String fields are encoded from the output of their StringIndexer
encoders_str = [OneHotEncoder(inputCol=field + 'Index', outputCol=field + 'OneHot') for field in string_fields]
```

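## 6. VectorAssembler

A VectorAssembler concatenates the listed columns, here the one-hot vectors and the continuous fields, into a single vector column, which is the input format Spark ML estimators expect.

```python
from pyspark.ml.feature import VectorAssembler

cols_to_concatenate = [field + 'OneHot' for field in categorical_fields] + continuous_fields + [field + 'OneHot' for field in string_fields]
cols_to_concatenate
```

```
['PclassOneHot',
 'SibSpOneHot',
 'ParchOneHot',
 'Age',
 'Fare',
 'SexOneHot',
 'EmbarkedOneHot']
```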
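A plain-Python sketch of how the assembled vector is laid out: each input occupies a contiguous block, so a sparse input's indices are shifted by the total size of the blocks before it, and a scalar takes one slot. The inputs below are the first passenger's values from the transformed output (Pclass = 3, SibSp = 1, Parch = 0, male, embarked at S); the result matches the `(21,[4,8,14,15,16...` shown there. This sketch always builds a sparse vector, whereas Spark may choose a dense representation.

```python
# Plain-Python sketch of VectorAssembler concatenation (not Spark code).
# Each input is either a sparse vector (size, indices, values) or a plain number.

def assemble(inputs):
    indices, values, offset = [], [], 0
    for item in inputs:
        if isinstance(item, tuple):  # sparse vector: shift its indices by the offset
            size, idx, vals = item
            indices += [offset + i for i in idx]
            values += list(vals)
            offset += size
        else:                        # scalar column: one slot, stored only if non-zero
            if item != 0:
                indices.append(offset)
                values.append(float(item))
            offset += 1
    return (offset, indices, values)

# First passenger, in the order of cols_to_concatenate
first_row = [
    (3, [], []),      # PclassOneHot (Pclass = 3)
    (5, [1], [1.0]),  # SibSpOneHot (SibSp = 1)
    (6, [0], [1.0]),  # ParchOneHot (Parch = 0)
    22.0,             # Age
    7.25,             # Fare
    (2, [0], [1.0]),  # SexOneHot (male)
    (3, [0], [1.0]),  # EmbarkedOneHot (S)
]
print(assemble(first_row))
# (21, [4, 8, 14, 15, 16, 18], [1.0, 1.0, 22.0, 7.25, 1.0, 1.0])
```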

```python
assembler = VectorAssembler(inputCols=cols_to_concatenate, outputCol='features')
```

## 7. Pipeline

Now that we have all the stages, we are finally ready to put them together into a single Estimator, our Pipeline.

```python
from pyspark.ml.pipeline import Pipeline

# Stage order: index strings, one-hot encode, then assemble the feature vector
pipeline = Pipeline(stages=string_indexers +
                           encoders_str +
                           encoders_cat +
                           [assembler]
                    )
pipeline
```

```
Pipeline_83447b9094f5
```

Now that we have gone to the trouble of building our Pipeline, fitting it and using it to transform the data is as easy as using a single Estimator:

```python
# Fit every stage in order; the result is a PipelineModel
df3_fit = pipeline.fit(df3)
```
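Conceptually, fitting a Pipeline walks through the stages in order: an estimator is fitted on the current data and the resulting model transforms that data before it reaches the next stage, while plain transformers are simply applied. A plain-Python sketch of that loop with hypothetical toy classes (`ToyIndexer`, `ToyIndexerModel`, `fit_pipeline` are illustrations, not Spark API; the toy indexer orders labels alphabetically for simplicity, unlike StringIndexer):

```python
# Plain-Python sketch of Pipeline.fit semantics (not Spark code; all names are hypothetical).

class ToyIndexer:
    """Estimator: learns a label -> index mapping for one column."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        labels = sorted({r[self.col] for r in rows})
        return ToyIndexerModel(self.col, {l: i for i, l in enumerate(labels)})

class ToyIndexerModel:
    """Transformer: adds '<col>Index' using the learned mapping."""
    def __init__(self, col, mapping):
        self.col, self.mapping = col, mapping

    def transform(self, rows):
        return [{**r, self.col + "Index": self.mapping[r[self.col]]} for r in rows]

def fit_pipeline(stages, rows):
    """Fit stages in order; each fitted stage transforms the data for the next one."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted, rows

data = [{"Sex": "male"}, {"Sex": "female"}]
models, out = fit_pipeline([ToyIndexer("Sex")], data)
print(out)  # [{'Sex': 'male', 'SexIndex': 1}, {'Sex': 'female', 'SexIndex': 0}]
```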

```python
# Apply all fitted stages to the data
df3_transform = df3_fit.transform(df3)
df3_transform.select('Age',
                     'Fare',
                     'SexIndex',
                     'EmbarkedIndex',
                     'SexOneHot',
                     'EmbarkedOneHot',
                     'PclassOneHot',
                     'SibSpOneHot',
                     'ParchOneHot',
                     'features',
                     'Survived').show()
```

```
+----+-------+--------+-------------+-------------+--------------+-------------+-------------+-------------+--------------------+--------+
| Age|   Fare|SexIndex|EmbarkedIndex|    SexOneHot|EmbarkedOneHot| PclassOneHot|  SibSpOneHot|  ParchOneHot|            features|Survived|
+----+-------+--------+-------------+-------------+--------------+-------------+-------------+-------------+--------------------+--------+
|22.0|   7.25|     0.0|          0.0|(2,[0],[1.0])| (3,[0],[1.0])|    (3,[],[])|(5,[1],[1.0])|(6,[0],[1.0])|(21,[4,8,14,15,16...|       0|
|38.0|71.2833|     1.0|          1.0|(2,[1],[1.0])| (3,[1],[1.0])|(3,[1],[1.0])|(5,[1],[1.0])|(6,[0],[1.0])|(21,[1,4,8,14,15,...|       1|
|26.0|  7.925|     1.0|          0.0|(2,[1],[1.0])| (3,[0],[1.0])|    (3,[],[])|(5,[0],[1.0])|(6,[0],[1.0])|(21,[3,8,14,15,17...|       1|
|35.0|   53.1|     1.0|          0.0|(2,[1],[1.0])| (3,[0],[1.0])|(3,[1],[1.0])|(5,[1],[1.0])|(6,[0],[1.0])|(21,[1,4,8,14,15,...|       1|
|35.0|   8.05|     0.0|
```

### 7.1. Train/Test Split

```python
# 80/20 random split, with a fixed seed for reproducibility
(trainData, testData) = df3_transform.randomSplit([0.8, 0.2], seed=11)
```

```python
trainData.count(), testData.count()
```

```
(577, 135)
```
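`randomSplit` does not guarantee exact proportions: each row is routed to a split at random, so the 712 rows left after `dropna` (577 + 135) came out as roughly 81% / 19% with weights `[0.8, 0.2]`. A plain-Python sketch of that idea; it uses Python's `random` module rather than Spark's sampler, so it will not reproduce the same 577 / 135 split.

```python
# Plain-Python sketch of weight-based random splitting (not Spark's implementation).
import random

def random_split(rows, weights, seed):
    """Route each row independently by comparing a uniform draw
    with the cumulative, normalized weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(712)), [0.8, 0.2], seed=11)
# Roughly 570 / 142; the exact sizes depend on the random draws
print(len(train), len(test))
```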

## 8. RandomForestClassifier

And we are ready to do some Machine Learning! We'll use a RandomForestClassifier to predict whether a passenger survived, a binary classification task.

```python
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features', labelCol='Survived')
```

```python
# Train the model, then predict on the test set
rfModel = rf.fit(trainData)
rf_prediction = rfModel.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show(5)
```

```
+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(21,[1,4,8,14,15,...|
|       1.0|       0|(21,[1,3,9,14,15,...|
|       1.0|       0|(21,[1,3,10,14,15...|
|       0.0|       0|(21,[1,3,8,14,15,...|
|       0.0|       0|(21,[1,3,9,14,15,...|
+----------+--------+--------------------+
only showing top 5 rows
```



```python
testData.groupby('Survived').count().show()
```

```
+--------+-----+
|Survived|count|
+--------+-----+
|       1|   54|
|       0|   81|
+--------+-----+
```
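The test set is imbalanced: 81 of the 135 passengers did not survive. A model that always predicts 0 would already be right 60% of the time, which is a useful reference point when judging the RandomForest accuracy. The arithmetic, using the counts above:

```python
# Majority-class baseline from the test-set counts above.
counts = {1: 54, 0: 81}
total = sum(counts.values())
majority_label = max(counts, key=counts.get)
baseline_accuracy = counts[majority_label] / total
print(majority_label, total, baseline_accuracy)  # 0 135 0.6
```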



### 8.1. Evaluating RF

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForest = %g" % rf_accuracy)
print("Test Error of RandomForest = %g" % (1.0 - rf_accuracy))
```

```
Accuracy of RandomForest = 0.807407
Test Error of RandomForest = 0.192593
```
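Accuracy is simply the fraction of correct predictions, and the test error is its complement. Working backwards from the reported value, 0.807407 × 135 ≈ 109, so the model classified 109 of the 135 test passengers correctly; that count is inferred, not printed in the notebook. A quick check of the arithmetic in plain Python (`MulticlassClassificationEvaluator` also accepts other `metricName` values, such as `'f1'`, `'weightedPrecision'` and `'weightedRecall'`):

```python
# Accuracy and test error from raw counts (plain Python).
correct, total = 109, 135  # 109 is inferred from the reported accuracy
accuracy = correct / total
test_error = 1.0 - accuracy
print("Accuracy = %g" % accuracy)      # Accuracy = 0.807407
print("Test Error = %g" % test_error)  # Test Error = 0.192593
```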
