Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [1]:
NAME = "Vincent Kim"
COLLABORATORS = ""

---

# Machine Learning in Spark

Following the evolution of Spark, there are two ways to do machine learning on Spark:

* MLlib, or `spark.mllib`, was the first ML library implemented in the core Spark library and runs on RDDs. The library is now in maintenance mode, but, as we did for RDDs vs DataFrames, it is important that we cover some aspects of the older library. MLlib is also the only library that supports training models for Spark Streaming.
* ML, or `spark.ml`, is now the primary ML library on Spark and runs on DataFrames. Its API is close to those of other mainstream libraries like scikit-learn.

We will dive into both APIs in this notebook, using the `titanic.csv` file for classification purposes on the `Survived` column.

_I think at this point of your career, you all know what the [Titanic dataset](https://www.kaggle.com/c/titanic/data) is..._

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('lecture-lyon2').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.mllib.linalg import VectorUDT

---
## Data preparation

Even though MLlib is designed with RDDs and DStreams in focus, DataFrames make the data easier to transform, so we will read the data into a DataFrame. Afterwards we will build RDDs for training in MLlib, or stay with DataFrames for training in ML.

In [4]:
data = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('../data/titanic.csv')
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [5]:
data.describe().toPandas()

| | summary | PassengerId | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Ticket | Fare | Cabin | Embarked |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 891.0 | 891.0 | 891.0 | 891 | 891 | 714.0 | 891.0 | 891.0 | 891 | 891.0 | 204 | 889 |
| 1 | mean | 446.0 | 0.3838383838383838 | 2.308641975308642 | | | 29.69911764705882 | 0.5230078563411896 | 0.3815937149270482 | 260318.54916792738 | 32.2042079685746 | | |
| 2 | stddev | 257.3538420152301 | 0.4865924542648575 | 0.8360712409770491 | | | 14.526497332334037 | 1.1027434322934315 | 0.8060572211299488 | 471609.26868834975 | 49.69342859718089 | | |
| 3 | min | 1.0 | 0.0 | 1.0 | "Andersson, Mr. August Edvard (""Wennerstrom"")" | female | 0.42 | 0.0 | 0.0 | 110152 | 0.0 | A10 | C |
| 4 | max | 891.0 | 1.0 | 3.0 | van Melkebeke, Mr. Philemon | male | 80.0 | 8.0 | 6.0 | WE/P 5735 | 512.3292 | T | S |


From these summary statistics, we see that the `Age`, `Cabin` and `Embarked` variables have null values: their counts are 714, 204 and 889, against 891 rows. Also, `PassengerId` and `Ticket` look useless for future predictions.

# Question
  
* Drop `Cabin`, `Ticket` and `PassengerId`
* Using the `.na.fill` function on a DataFrame:
    * For `Age`, replace `None` with the mean value of the column.
    * For `Embarked`, replace `None` with the most frequent value of the column.
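
To make the two statistics concrete, here is a minimal plain-Python sketch (not Spark code) of mean and most-frequent-value imputation. The sample values and variable names are made up for illustration; only the logic mirrors what the question asks for. It also shows a side effect worth knowing: filling nulls with the mean leaves the mean unchanged but shrinks the standard deviation.

```python
from collections import Counter
from statistics import mean, stdev

# Made-up samples; None plays the role of a null value
ages = [22.0, 38.0, None, 35.0, None, 54.0]
embarked = ["S", "C", "S", None, "Q", "S"]

# Both statistics are computed on the non-null values only
known_ages = [a for a in ages if a is not None]
avg_age = mean(known_ages)
mode_embarked = Counter(e for e in embarked if e is not None).most_common(1)[0][0]

# Replace each null with the statistic of its column
filled_ages = [avg_age if a is None else a for a in ages]
filled_embarked = [mode_embarked if e is None else e for e in embarked]

print(avg_age, mode_embarked)
print(stdev(known_ages), stdev(filled_ages))
```

The second `print` shows the spread before and after imputation: the filled values sit exactly on the mean, so they add no variance while increasing the row count.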

In [6]:
df = data
columns_to_drop = ['Cabin', 'Ticket', 'PassengerId']
df = df.drop(*columns_to_drop)

# Compute the imputation statistics on the driver through pandas
# (acceptable here because the dataset is tiny)
dfp = df.toPandas()
avg_age = dfp['Age'].mean()
print(avg_age)
# mode() returns a Series; take its first entry in case of ties
mode_embark = dfp['Embarked'].mode()[0]
print(mode_embark)

# Fill the null values with the statistics computed above
df = df.na.fill(avg_age, 'Age')
df = df.na.fill(mode_embark, 'Embarked')
df.toPandas().head(20)

29.69911764705882
S


| | Survived | Pclass | Name | Sex | Age | SibSp | Parch | Fare | Embarked |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 3 | Braund, Mr. Owen Harris | male | 22.0 | 1 | 0 | 7.25 | S |
| 1 | 1 | 1 | Cumings, Mrs. John Bradley (Florence Briggs Th... | female | 38.0 | 1 | 0 | 71.2833 | C |
| 2 | 1 | 3 | Heikkinen, Miss. Laina | female | 26.0 | 0 | 0 | 7.925 | S |
| 3 | 1 | 1 | Futrelle, Mrs. Jacques Heath (Lily May Peel) | female | 35.0 | 1 | 0 | 53.1 | S |
| 4 | 0 | 3 | Allen, Mr. William Henry | male | 35.0 | 0 | 0 | 8.05 | S |
| 5 | 0 | 3 | Moran, Mr. James | male | 29.699118 | 0 | 0 | 8.4583 | Q |
| 6 | 0 | 1 | McCarthy, Mr. Timothy J | male | 54.0 | 0 | 0 | 51.8625 | S |
| 7 | 0 | 3 | Palsson, Master. Gosta Leonard | male | 2.0 | 3 | 1 | 21.075 | S |
| 8 | 1 | 3 | Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg) | female | 27.0 | 0 | 2 | 11.1333 | S |
| 9 | 1 | 2 | Nasser, Mrs. Nicholas (Adele Achem) | female | 14.0 | 1 | 0 | 30.0708 | C |


In [7]:
def replace_na(df):
    """
    Deal with na values, and drop selected columns    
    """
    columns_to_drop = ['Cabin', 'Ticket', 'PassengerId']
    df = df.drop(*columns_to_drop)
    
    #Calculate the statistics
    dfp = df.toPandas()
    avg_age = dfp['Age'].mean()
    mode_embark = dfp['Embarked'].mode().item()
    
    #Impute the null values
    df = df.na.fill(avg_age,'Age')
    df = df.na.fill(mode_embark,'Embarked')
    return df

In [8]:
"""
Graded cell

3 points
"""
result = replace_na(data)
assert abs(float(result.describe().toPandas().loc[2]['Age']) - 13) < 0.1
assert int(result.describe().toPandas().loc[0]['Embarked']) == 891
assert list(result.toPandas().columns.values) == ['Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

For the following two questions, we will use [Transformers](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#transformers). Technically, a Transformer implements a method `transform()`, which converts one DataFrame into another, generally by appending one or more columns.

Example: 

```python
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])
continuousDataFrame.show()

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
```

Result:

```
+---+-------+
| id|feature|
+---+-------+
|  0|    0.1|
|  1|    0.8|
|  2|    0.2|
+---+-------+

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
```

**Note:** contrary to previous notebooks, I have not imported all of the libraries needed to solve the remaining exercises. When you want to import a library, please import it in the same notebook cell as where you implement your code, otherwise it may impact the automatic grading.

# Question

Using a regular expression, the [`regexp_extract` function](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) and [SQLTransformer](https://spark.apache.org/docs/2.2.0/ml-features.html#sqltransformer), extract the title of each person from the `Name` column into a `Civility` column. Drop the `Name` column afterwards.

Example

```
Braund, Mr. Owen       --> Mr
Andria, Doctor. Steve  --> Doctor
```

_Not a hint: while it is perfectly possible to write a custom UDF to solve this question, doing so defeats the purpose of using DataFrames for cleaning: Python UDFs are opaque to Spark SQL's optimizer, and every row has to be serialized between the JVM and a Python worker for processing. Spark's built-in functions don't share this problem._
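
Before embedding a pattern in a SQL statement, it can help to prototype it in plain Python. The sketch below uses the standard `re` module; the pattern itself is one possible choice (an assumption, not the required answer), and the helper name `extract_title` and the last sample name are made up for illustration. For a pattern this simple, Python and Java regex syntax agree, so the same pattern can be passed to `regexp_extract`.

```python
import re

# Skip the surname up to the first comma, then capture the letters and
# spaces that precede the first period. [.] matches a literal period
# without needing a backslash escape.
TITLE_PATTERN = re.compile(r"^[^,]+, ([A-Za-z ]+)[.]")

def extract_title(name):
    """Return the title found in a 'Surname, Title. Given names' string, or ''."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Andria, Doctor. Steve",
    "Palsson, Master. Gosta Leonard",
    "Doe, the Countess. Jane",  # made-up name with a multi-word title
]
for name in names:
    print(name, "-->", extract_title(name))
```

Returning an empty string when nothing matches mirrors how `regexp_extract` behaves on a non-matching input.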

In [9]:
from pyspark.ml.feature import SQLTransformer

def extract_civility(df):
    """
    Return dataframe with the Name column replaced by a Civility column
    """
    # SQLTransformer substitutes __THIS__ with the input DataFrame, so no temp view is needed.
    # The pattern skips the surname up to the first comma, then captures the letters and
    # spaces that precede the first period. [.] matches a literal period without relying
    # on backslash escaping inside the SQL string literal.
    sqlTrans = SQLTransformer(
        statement="SELECT *, regexp_extract(Name, '^[^,]+, ([A-Za-z ]+)[.]', 1) AS Civility FROM __THIS__")
    df = sqlTrans.transform(df)
    df = df.drop('Name')
    return df

# Quick sanity check before the graded cell
result = extract_civility(data)

In [10]:
"""
Graded cell

4 points
"""
result = extract_civility(data)
resultCols = result.columns
assert 'Name' not in resultCols
assert 'Civility' in resultCols
assert list(result.select('Civility').distinct().toPandas()['Civility'].sort_values().values) == [
    'Capt',
    'Col',
    'Don',
    'Dr',
    'Jonkheer',
    'Lady',
    'Major',
    'Master',
    'Miss',
    'Mlle',
    'Mme',
    'Mr',
    'Mrs',
    'Ms',
    'Rev',
    'Sir',
    'the Countess'
]

# Question 

[One hot encode](https://spark.apache.org/docs/2.2.0/ml-features.html#onehotencoder) `Sex`, `Civility` and `Embarked` columns into `SexVec`, `CivilityVec` and `EmbarkedVec`. Don't forget to drop the original columns.
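
One-hot encoding in Spark takes two stages: `StringIndexer` maps each string to a numeric index (most frequent label first), then `OneHotEncoder` turns that index into a sparse vector, by default dropping the last category. The plain-Python sketch below imitates that behaviour to explain the vector sizes you will see in the output; it is an illustration, not Spark's implementation. The helper names and the sample list are made up, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index, most frequent label first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a sparse (size, indices, values) triple for one category index."""
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [int(index)], [1.0])
    # The dropped category is encoded as the all-zeros vector
    return (size, [], [])

sex = ["male", "female", "female", "female", "male", "male", "male"]
mapping = string_index(sex)
print(mapping)
print(one_hot(mapping["male"], len(mapping)))
print(one_hot(mapping["female"], len(mapping)))
```

With two categories and the last one dropped, the vector has size 1: the most frequent label gets a single `1.0` and the other label is the empty vector, which is the same shape as `(1,[0],[1.0])` and `(1,[],[])` in a Spark `show()` output.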

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def one_hot_encode(df):
    """
    Return dataframe one hot encoding selected columns    
    """
    inputCol = ["Sex", "Civility", "Embarked"]

    # Stage 1: map each string column to a numeric category index
    indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in inputCol]
    # Stage 2: one-hot encode each index column into a sparse vector
    encoders = [OneHotEncoder(inputCol=column + "_index", outputCol=column + "Vec") for column in inputCol]

    # Chain both stages in a single pipeline
    pipeline = Pipeline(stages=indexers + encoders)
    df2 = pipeline.fit(df).transform(df)

    # Drop the original columns and the intermediate index columns.
    columns_to_drop = inputCol + [column + "_index" for column in inputCol]
    df3 = df2.drop(*columns_to_drop)

    return df3

result = one_hot_encode(extract_civility(data))
resultCols = result.columns
print(resultCols)
print(len(resultCols))
result.show()

['PassengerId', 'Survived', 'Pclass', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'SexVec', 'CivilityVec', 'EmbarkedVec']
12
+-----------+--------+------+----+-----+-----+----------------+-------+-----+-------------+--------------+-------------+
|PassengerId|Survived|Pclass| Age|SibSp|Parch|          Ticket|   Fare|Cabin|       SexVec|   CivilityVec|  EmbarkedVec|
+-----------+--------+------+----+-----+-----+----------------+-------+-----+-------------+--------------+-------------+
|          1|       0|     3|22.0|    1|    0|       A/5 21171|   7.25| null|(1,[0],[1.0])|(16,[0],[1.0])|(2,[0],[1.0])|
|          2|       1|     1|38.0|    1|    0|        PC 17599|71.2833|  C85|    (1,[],[])|(16,[2],[1.0])|(2,[1],[1.0])|
|          3|       1|     3|26.0|    0|    0|STON/O2. 3101282|  7.925| null|    (1,[],[])|(16,[1],[1.0])|(2,[0],[1.0])|
|          4|       1|     1|35.0|    1|    0|          113803|   53.1| C123|    (1,[],[])|(16,[2],[1.0])|(2,[0],[1.0])|
|          5|       

In [12]:
"""
Graded cell

4 points
"""
result = one_hot_encode(extract_civility(data))
resultCols = result.columns
assert len(resultCols) == 12

assert 'SexVec' in resultCols
assert 'CivilityVec' in resultCols
assert 'EmbarkedVec' in resultCols

assert 'Sex' not in resultCols
assert 'Civility' not in resultCols
assert 'Embarked' not in resultCols

assert result.schema['SexVec'].simpleString() == 'SexVec:vector'
assert result.schema['CivilityVec'].simpleString() == 'CivilityVec:vector'
assert result.schema['EmbarkedVec'].simpleString() == 'EmbarkedVec:vector'

# Question

Now that we have created all of our numeric features, we need to assemble them into a single vector column. This is the goal of the [VectorAssembler](https://spark.apache.org/docs/2.2.0/ml-features.html#vectorassembler) transformer.
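
Conceptually, assembling concatenates scalar columns and vector columns into one flat feature vector. Here is a plain-Python sketch of that idea (not Spark code; the `assemble` helper is made up, and vectors are written as dense lists for readability). The row uses the values of the first passenger, and the vector sizes 1, 16 and 2 are those shown in the one-hot encoding output above.

```python
def assemble(row, feature_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in feature_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(value)      # vector column: append every entry
        else:
            features.append(float(value))  # scalar column: append one entry
    return features

row = {
    "Pclass": 3, "Age": 22.0, "SibSp": 1, "Parch": 0, "Fare": 7.25,
    "SexVec": [1.0],
    "CivilityVec": [1.0] + [0.0] * 15,
    "EmbarkedVec": [1.0, 0.0],
}
feature_cols = ["Pclass", "Age", "SibSp", "Parch", "Fare",
                "SexVec", "CivilityVec", "EmbarkedVec"]

features = assemble(row, feature_cols)
print(len(features))
```

Five scalar columns plus vectors of size 1, 16 and 2 give 24 features, which matches the size of the coefficient vector printed at the end of this notebook.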

In [13]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

def feature_assemble(df, featureCols):
    """
    Assemble all features in the featureCols list into one column called 'features'.
    """
    assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
    return assembler.transform(df)

In [14]:
"""
Graded cell

2 points
"""
result = feature_assemble(data, ['Pclass', 'SibSp', 'Parch'])
assert 'features' in result.columns
assert result.schema['features'].simpleString() == 'features:vector'

---
#### All the data preparation is done. After running the following cell, we can concentrate on ML modelling.

For comparison purposes, let's try a Logistic Regression from MLlib and ML on the dataset.

In [15]:
# prepare the data !
features = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'SexVec', 'CivilityVec', 'EmbarkedVec']
prepared_data = feature_assemble(one_hot_encode(extract_civility(replace_na(data))), features)
prepared_data = prepared_data.withColumnRenamed("Survived", "label").select(['label', 'features'])
train, test = prepared_data.randomSplit([0.75, 0.25], 0)

train.cache()
test.cache()

DataFrame[label: int, features: vector]

---
## MLlib - RDD based API

We will first use the RDD-based [Logistic Regression](https://spark.apache.org/docs/2.2.0/mllib-linear-methods.html#logistic-regression). The exercise comes in two steps:

1. First, you must create an RDD of LabeledPoint(label, features). Be careful: our features are `pyspark.ml.linalg.SparseVector` objects, but the RDD-based API expects `pyspark.mllib.linalg.SparseVector`, so we need to [convert them](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html?highlight=linearregressionwithsgd#pyspark.mllib.linalg.Vectors.fromML).
2. Then you can apply LogisticRegression on it.

# Question

Train a logistic regression model on the train dataset.

In [16]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

def dataframe_to_labeledpoints(df):
    """
    Convert a DataFrame with columns [label, features] to an RDD of LabeledPoint.
    """
    # Local import: this is the mllib Vectors class, not the pyspark.ml one imported earlier
    from pyspark.mllib.linalg import Vectors

    # Vectors.fromML converts a pyspark.ml vector to its pyspark.mllib equivalent
    return df.rdd.map(lambda row: LabeledPoint(row[0], Vectors.fromML(row[1])))

def train_mllib_logistic(train):
    """
    Return an MLlib logistic regression model trained on an RDD of LabeledPoint.
    """
    # Build the model
    model = LogisticRegressionWithLBFGS.train(train)

    # Evaluate the model on the training data
    labelsAndPreds = train.map(lambda p: (p.label, model.predict(p.features)))
    trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(train.count())
    print("Training Error = " + str(trainErr))
    return model

In [17]:
"""
Graded cell

4 points
"""
from pyspark.mllib.evaluation import BinaryClassificationMetrics

train_rdd = dataframe_to_labeledpoints(train)
test_rdd = dataframe_to_labeledpoints(test)
model = train_mllib_logistic(train_rdd)

predictionAndLabels = test_rdd.map(lambda lp: (float(model.predict(lp.features)), lp.label))

metrics = BinaryClassificationMetrics(predictionAndLabels)
assert metrics.areaUnderROC > 0.75  # I managed ~0.8 on my first try

Training Error = 0.1744186046511628


---
## ML - DataFrame based API

We now compare with the ML [Logistic regression](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#binomial-logistic-regression), which should work directly on our DataFrame.
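
Both graded cells compare models through the area under the ROC curve. One thing to keep in mind when reading the two numbers: the MLlib graded cell feeds hard 0/1 predictions to the metric, whereas the ML evaluator works on continuous scores. The plain-Python sketch below (made-up scores and labels, and a hypothetical `auc` helper using the usual rank interpretation of AUC) shows how thresholding the same scores can lower the AUC, because all examples on the same side of the threshold become ties.

```python
def auc(scores, labels):
    """AUC as the probability that a positive outranks a negative (ties count 0.5)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Made-up example: three positives, three negatives
labels = [1, 1, 1, 0, 0, 0]
scores = [0.9, 0.6, 0.4, 0.55, 0.3, 0.1]

# Hard predictions obtained by thresholding the scores at 0.5
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]

print(auc(scores, labels))
print(auc(hard, labels))
```

On this toy data the continuous scores rank 8 of the 9 positive/negative pairs correctly, while the thresholded predictions only reach the average of the true positive rate and the true negative rate.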

In [18]:
from pyspark.ml.classification import LogisticRegression

def train_ml_logistic(train):
    """
    Return an ML logistic regression model trained on a DataFrame with label and features columns.
    """
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Fit the model
    lrModel = lr.fit(train)

    # Print the coefficients and intercept for logistic regression
    print("Coefficients: " + str(lrModel.coefficients))
    print("Intercept: " + str(lrModel.intercept))

    # We can also use the multinomial family for binary classification
    mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

    # Fit the model
    mlrModel = mlr.fit(train)

    # Print the coefficients and intercepts for logistic regression with multinomial family
    print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
    print("Multinomial intercepts: " + str(mlrModel.interceptVector))

    # Return the binomial model; the multinomial one is fitted for comparison only
    return lrModel

In [19]:
"""
Graded cell

3 points
"""
from pyspark.ml.evaluation import BinaryClassificationEvaluator

model = train_ml_logistic(train)
assert model.summary.areaUnderROC > 0.75 # managed 0.87 on my first try

predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
assert evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"}) > 0.75 # managed 0.88 on my first try

Coefficients: (24,[5,6],[-0.025454610373626316,-0.12952547089307428])
Intercept: -0.40193996728772446
Multinomial coefficients: 2 X 24 CSRMatrix
(0,5) 0.0129
(0,6) 0.0711
(1,5) -0.0129
(1,6) -0.0711
Multinomial intercepts: [0.19674892392626103,-0.19674892392626103]


In [20]:
spark.stop()