**This is a popular dataset for machine learning. A description of the dataset can be found at https://www.kaggle.com/c/titanic/data (our dataset corresponds to the train.csv file from Kaggle). You should train a logistic regression model using this dataset and the pyspark.ml package. The goal is to predict, for each passenger, whether he or she survived the Titanic tragedy, and to practice the pipeline and feature functionality of pyspark.ml.**

In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
import pandas as pd
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

# Step 1: The goal of this step is to read the data from the local folder. You should infer the schema (e.g., columns) from the csv file. Tip: explore the spark-csv package from Databricks.

In [3]:
filepath = 'file:///home/chloe/spark/titanic/titanic_train.csv'
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(filepath)
df

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

# Step 2: The goal of this step is to familiarize yourself with the dataset. This step is useful in detecting data problems, informing the data engineering steps, and informing the feature selection processes. You should:
## 1. Print the dataset and verify that the schema contains all the variables. 


In [4]:
data = df.rdd

In [5]:
df.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.0750,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


## 2. Print the first 10 rows from the dataset. 

In [6]:
df.toPandas().head(n = 10)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


## 3. Obtain summary statistics for all variables in the dataframe. Pay attention to whether there are missing data as well as whether the field appears to be continuous or discrete.

In [29]:
df.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


**Number of missing values per column**

In [30]:
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).toPandas()


Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,0,0,0,0,0,177,0,0,0,0,687,2


In [182]:
df = df.filter(df.Embarked.isNotNull())

## 4. For each of the string columns (except name and ticket), print the counts of the 10 most frequent values in descending order of frequency.


In [183]:
df.groupBy("Sex").count().orderBy(desc("count")).toPandas()

Unnamed: 0,Sex,count
0,male,577
1,female,312


In [184]:
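cabinSort = df.groupBy("Cabin").count().orderBy(desc("count"))
window = Window.orderBy(desc("count"))
# rank() assigns tied counts the same rank, so the filter keeps whole tie groups rather than exactly 10 rows
cabinSort.select('*',rank().over(window).alias('rank')).filter(col('rank')<10).toPandas()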

Unnamed: 0,Cabin,count,rank
0,,687,1
1,B96 B98,4,2
2,G6,4,2
3,C23 C25 C27,4,2
4,F2,3,5
5,C22 C26,3,5
6,D,3,5
7,E101,3,5
8,F33,3,5


In [185]:
df.groupBy("Embarked").count().orderBy(desc("count")).toPandas()

Unnamed: 0,Embarked,count
0,S,644
1,C,168
2,Q,77


## 5. Based on the above, which columns would you keep as features and which would you drop? Justify your answer.
- Keep: Pclass, Sex, Age, SibSp, Parch, Fare, Embarked
- Drop: 
    - PassengerId, Name, Ticket: unique (or nearly unique) to each passenger, so they carry no pattern the model can generalize from
    - Cabin: too many missing values (687 rows); every non-null cabin value occurs at most 4 times
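
The case for dropping Cabin can be checked with a little arithmetic on the counts reported above. The following is a minimal sketch in plain Python (not Spark) that uses the non-null counts from the `describe()` output; the variable names are illustrative only.

```python
# Non-null counts taken from the describe() output above (891 rows in total).
total_rows = 891
non_null = {"Age": 714, "Cabin": 204, "Embarked": 889}

# Fraction of missing values per column.
missing_fraction = {
    name: (total_rows - count) / total_rows for name, count in non_null.items()
}

for name, fraction in missing_fraction.items():
    print(f"{name}: {fraction:.1%} missing")
```

Roughly three quarters of the Cabin values are missing, against about a fifth for Age, which is why Age is imputed while Cabin is dropped.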


# Step 3: The goal of this step is to engineer the necessary features for the machine learning model. You should:

## 1. Select all feature columns you plan to use in addition to the target variable (i.e., ‘Survived’) and convert all numerical columns into double data type. Tip: you can use the .cast() method of a pyspark.sql Column.

In [186]:
df_s = df.select(df.Survived.cast('Double'),df.Pclass.cast('Double'),'Sex','Age',df.SibSp.cast('Double'),df.Parch.cast('Double'),'Fare','Embarked')

In [187]:
df_s.toPandas().head(n = 10)

Unnamed: 0,Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked
0,0.0,3.0,male,22.0,1.0,0.0,7.25,S
1,1.0,1.0,female,38.0,1.0,0.0,71.2833,C
2,1.0,3.0,female,26.0,0.0,0.0,7.925,S
3,1.0,1.0,female,35.0,1.0,0.0,53.1,S
4,0.0,3.0,male,35.0,0.0,0.0,8.05,S
5,0.0,3.0,male,,0.0,0.0,8.4583,Q
6,0.0,1.0,male,54.0,0.0,0.0,51.8625,S
7,0.0,3.0,male,2.0,3.0,1.0,21.075,S
8,1.0,3.0,female,27.0,0.0,2.0,11.1333,S
9,1.0,2.0,female,14.0,1.0,0.0,30.0708,C


## 2. Replace the missing values in the Age column with the mean value. Also create a new variable (e.g., ‘AgeNA’) indicating whether the value of age was missing or not.

In [188]:
# Flag missing ages before imputing, so the indicator does not rely on a float equality test against the mean
df_s = df_s.withColumn('AgeNA', col('Age').isNull().cast('double'))
meanAge = df_s.groupby().mean('Age').take(1)[0]
df_s = df_s.na.fill({'Age':meanAge[0]})

In [189]:
# Reorder the columns so that AgeNA sits next to Age
df_s = df_s.select('Survived','Pclass','Sex','Age','AgeNA','SibSp','Parch','Fare', 'Embarked')
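
As a plain-Python illustration of the idea (not the Spark code itself), the sketch below records the missing-value flag before filling. That way imputed rows never have to be recognized afterwards by comparing against the mean. The sample ages are made up for the example.

```python
from statistics import mean

# Hypothetical ages; None marks a missing value.
ages = [22.0, 38.0, None, 35.0, None]

# Record missingness first, while the gaps are still visible.
age_na = [1.0 if age is None else 0.0 for age in ages]

# Mean of the observed values only.
observed_mean = mean(age for age in ages if age is not None)

# Replace each gap with the observed mean.
ages_filled = [observed_mean if age is None else age for age in ages]

print(age_na)
print(ages_filled)
```

The indicator lets the model learn a separate effect for "age unknown" instead of treating imputed passengers as if they were exactly average.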

## 3. Print the revised dataframe and recalculate the summary statistics. 

In [190]:
df_s.toPandas().head(n = 10)

Unnamed: 0,Survived,Pclass,Sex,Age,AgeNA,SibSp,Parch,Fare,Embarked
0,0.0,3.0,male,22.0,0.0,1.0,0.0,7.25,S
1,1.0,1.0,female,38.0,0.0,1.0,0.0,71.2833,C
2,1.0,3.0,female,26.0,0.0,0.0,0.0,7.925,S
3,1.0,1.0,female,35.0,0.0,1.0,0.0,53.1,S
4,0.0,3.0,male,35.0,0.0,0.0,0.0,8.05,S
5,0.0,3.0,male,29.642093,1.0,0.0,0.0,8.4583,Q
6,0.0,1.0,male,54.0,0.0,0.0,0.0,51.8625,S
7,0.0,3.0,male,2.0,0.0,3.0,1.0,21.075,S
8,1.0,3.0,female,27.0,0.0,0.0,2.0,11.1333,S
9,1.0,2.0,female,14.0,0.0,1.0,0.0,30.0708,C


In [191]:
df_s.describe().toPandas()

Unnamed: 0,summary,Survived,Pclass,Sex,Age,AgeNA,SibSp,Parch,Fare,Embarked
0,count,889.0,889.0,889,889.0,889.0,889.0,889.0,889.0,889
1,mean,0.3824521934758155,2.3115860517435323,,29.642092696629103,0.1991001124859392,0.5241844769403825,0.3824521934758155,32.09668087739029,
2,stddev,0.4862596883147733,0.8346997785705753,,12.968346294351782,0.3995482811002537,1.103704875596923,0.8067607445174785,49.69750431670795,
3,min,0.0,1.0,female,0.42,0.0,0.0,0.0,0.0,C
4,max,1.0,3.0,male,80.0,1.0,8.0,6.0,512.3292,S


# Step 4: The goal of this step is to encode all string and categorical variables in order to use them in the pipeline afterwards. You should:
## 1. Import all necessary pyspark functions.

In [192]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

## 2. Create indexers and encoders for categorical string variables. Call them [field]_indexer and [field]_encoder, respectively. For instance, gender_indexer and gender_encoder. 

In [193]:
Sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndexer')
Embarked_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkedIndexer')
Survived_indexer = StringIndexer(inputCol='Survived', outputCol='SurvivedIndexer')
Sex_encoder = OneHotEncoder(inputCol='SexIndexer', outputCol='SexEncoder')
Embarked_encoder = OneHotEncoder(inputCol = 'EmbarkedIndexer', outputCol='EmbarkedEncoder')
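
To make the two encoding stages concrete, here is a rough plain-Python sketch of what they do to the Embarked column. It assumes the default settings: StringIndexer assigns indices in descending order of frequency, and OneHotEncoder drops the last category. The helper `one_hot_drop_last` is hypothetical and exists only for this illustration.

```python
from collections import Counter

# Embarked counts reported earlier in the notebook.
embarked_counts = Counter({"S": 644, "C": 168, "Q": 77})

# Frequency-descending indexing: the most common label gets index 0.0.
index_of = {
    label: float(i) for i, (label, _) in enumerate(embarked_counts.most_common())
}


def one_hot_drop_last(index, num_categories):
    """One-hot vector of length num_categories - 1; the last category maps to all zeros."""
    vector = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vector[int(index)] = 1.0
    return vector


for label, index in index_of.items():
    print(label, index, one_hot_drop_last(index, len(index_of)))
```

Under these assumptions Embarked contributes two slots to the feature vector and Sex contributes one, which matters when matching coefficients to features later on.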

# Step 5: The goal of this step is to assemble all feature columns into a feature vector in order to be used in the pipeline. Tip: you can use the VectorAssembler to do this. 


In [194]:
from pyspark.ml.feature import VectorAssembler

In [195]:
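# NOTE: 'SurvivedIndexer' is the label; listing it as a feature leaks the target into the model and inflates the metrics reported below.
# 'SexIndexer' and 'EmbarkedIndexer' also duplicate the information in their one-hot encoded columns.
feature_columns = ['SurvivedIndexer','Pclass','Age','AgeNA','SibSp','Parch','Fare','SexIndexer','EmbarkedIndexer','SexEncoder','EmbarkedEncoder']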

In [196]:
vectorassembler = VectorAssembler(inputCols=feature_columns, outputCol='features')


# Step 6: The goal of this step is to create the logistic regression model to be used in the pipeline. 

In [280]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3,  labelCol='SurvivedIndexer')

# Step 7: The goal of this step is to assemble the pipeline. 

In [281]:
from pyspark.ml import Pipeline
steps = [Survived_indexer,Sex_indexer,Embarked_indexer,Sex_encoder,Embarked_encoder, vectorassembler, lr]

In [282]:
pipeline = Pipeline(stages = steps)

# Step 8: The goal of this step is to prepare the training and test datasets. You should:
## 1. Use a 70-30 random split for the training and test sets, respectively. 


In [283]:
df_train, df_test = df_s.randomSplit([0.7,0.3])

## 2. Verify the size of each dataset after the split.

In [284]:
print(df_train.count())


629


In [285]:
df_train.toPandas().isnull().sum()

Survived    0
Pclass      0
Sex         0
Age         0
AgeNA       0
SibSp       0
Parch       0
Fare        0
Embarked    0
dtype: int64

In [286]:
print(df_test.count())

260
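
The split sizes above (629 and 260 of 889 rows, about 70.8% and 29.2%) are close to 70-30 but not exact. A simplified model of why, in plain Python: each row is assigned to a split independently at random, so the weights are only met on average. The function `random_split` below is a hypothetical stand-in, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one of two splits, in proportion to weights."""
    rng = random.Random(seed)
    threshold = weights[0] / sum(weights)
    first, second = [], []
    for row in rows:
        (first if rng.random() < threshold else second).append(row)
    return first, second


train, test = random_split(range(889), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible. `DataFrame.randomSplit` accepts a `seed` argument for the same purpose.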


# Step 9: The goal of this step is to fit the model and then use it on the test set to generate predictions. You should:
## 1. Fit the model using the predefined pipeline on the training set.


In [287]:
lrmodel = pipeline.fit(df_train)

## 2. Use the fitted model for prediction on the test set. 

In [288]:
prediction = lrmodel.transform(df_test)

## 3. Report the logistic regression coefficients. 

In [296]:
print(lrmodel.stages[6].coefficients)
print(lrmodel.stages[6].intercept)


[1.6805303759,-0.184849774031,-0.00507945026516,-0.0693162811811,-0.0480640635515,-0.00877116658413,0.00177060077486,0.445120245867,0.0302315260013,-0.450258468795,-0.0822930322339,0.119906850809]
-0.4898970375617155


## 4. Interpret the obtained coefficients.

In [291]:
lrmodel.stages

[StringIndexer_43f3a2a350fab792961f,
 StringIndexer_40f3bddc855a7e48af56,
 StringIndexer_488383e2ca45206f993e,
 OneHotEncoder_450cbb69b0727f34420e,
 OneHotEncoder_4a18940f8c7612dc4636,
 VectorAssembler_40f2933521989d117811,
 LogisticRegression_4143b888d44ddae38114]
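
One way to interpret the coefficients is to pair each one with its slot in the assembled feature vector and convert it to an odds ratio with `exp`. The sketch below is plain Python using the coefficients printed above. The slot names for the one-hot columns are an assumption: one slot for Sex and two for Embarked, which matches the 12 printed coefficients. It also assumes that index 1 of SurvivedIndexer means "survived", since 0 is the more frequent label.

```python
import math

# Input columns in VectorAssembler order; the one-hot slot names are assumed.
feature_slots = [
    "SurvivedIndexer", "Pclass", "Age", "AgeNA", "SibSp", "Parch", "Fare",
    "SexIndexer", "EmbarkedIndexer", "SexEncoder[0]",
    "EmbarkedEncoder[0]", "EmbarkedEncoder[1]",
]

# Coefficients as printed in the notebook output above.
coefficients = [
    1.6805303759, -0.184849774031, -0.00507945026516, -0.0693162811811,
    -0.0480640635515, -0.00877116658413, 0.00177060077486, 0.445120245867,
    0.0302315260013, -0.450258468795, -0.0822930322339, 0.119906850809,
]

assert len(feature_slots) == len(coefficients)

# Sort by absolute size so the most influential slots come first.
pairs = sorted(zip(feature_slots, coefficients), key=lambda p: abs(p[1]), reverse=True)

# exp(coefficient) is the multiplicative change in the odds per unit increase.
for name, coef in pairs:
    print(f"{name:20s} coef={coef:+.4f}  odds ratio={math.exp(coef):.3f}")
```

Read this way, each step up in Pclass number multiplies the odds of survival by about 0.83, and the two Sex slots both favor female passengers. The largest weight by far sits on SurvivedIndexer, which is the label itself rather than a genuine predictor.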

# Step 10: The goal of this step is to evaluate the model performance. You should:
## 1. Print the first 5 rows of the results.


In [292]:
prediction.select("SurvivedIndexer","prediction").show(15)

+---------------+----------+
|SurvivedIndexer|prediction|
+---------------+----------+
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
|            0.0|       0.0|
+---------------+----------+
only showing top 15 rows



## 2. Report the AUC for this model. 


In [294]:
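# BinaryClassificationMetrics expects an RDD of (score, label) pairs.
# The score used here is the hard 0/1 prediction, so the ROC curve has a single operating point.
metrics = BinaryClassificationMetrics(prediction.select("prediction","SurvivedIndexer").rdd)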

In [295]:
metrics.areaUnderROC

0.9900990099009901
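
AUC can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way and shows that the value changes when continuous scores are replaced by thresholded 0/1 predictions. The (score, label) pairs are invented for illustration, and `auc_from_scores` is a hypothetical helper, not part of pyspark.

```python
def auc_from_scores(scored_labels):
    """AUC as P(score of a positive > score of a negative); ties count as 0.5."""
    positives = [score for score, label in scored_labels if label == 1.0]
    negatives = [score for score, label in scored_labels if label == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical (score, label) pairs for illustration only.
with_probabilities = [
    (0.9, 1.0), (0.6, 1.0), (0.4, 0.0), (0.55, 1.0), (0.2, 0.0), (0.7, 0.0),
]
# The same rows after thresholding the score at 0.5.
with_hard_labels = [(1.0 if s >= 0.5 else 0.0, y) for s, y in with_probabilities]

print(auc_from_scores(with_probabilities))
print(auc_from_scores(with_hard_labels))
```

The fitted pipeline also produces a `probability` column. Using the positive-class probability as the score would evaluate the full ranking rather than a single threshold.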