#### Imports

In [1]:
import os
import sys

In [2]:
# load PySpark
try:
    # Append PySpark to PYTHONPATH / Spark 2.1.0
    sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python"))
    sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python", "lib",
                                 "py4j-0.10.4-src.zip"))
except KeyError as e:
    print("SPARK_HOME is not set", e)
    sys.exit(1)

In [3]:
import pickle
import pandas as pd
import numpy as np
import sklearn as skl
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [4]:
print("scikit-learn: {}".format(skl.__version__))
print("pandas: {}".format(pd.__version__))
print("numpy: {}".format(np.__version__))

scikit-learn: 0.18.1
pandas: 0.19.1
numpy: 1.11.2


---

In [5]:
# load data
train_data = pd.read_csv("data/titanic_dataset_train.csv")

In [6]:
train_data.groupby("Sex")["Survived"].value_counts()

Sex     Survived
female  1           233
        0            81
male    0           468
        1           109
Name: Survived, dtype: int64

In [7]:
train_data.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [8]:
train_data.Survived.value_counts(normalize=True)

0    0.616162
1    0.383838
Name: Survived, dtype: float64

In [9]:
train_data.describe()

Unnamed: 0,PassengerId,Survived,Pclass,Age,SibSp,Parch,Fare
count,891.0,891.0,891.0,714.0,891.0,891.0,891.0
mean,446.0,0.383838,2.308642,29.699118,0.523008,0.381594,32.204208
std,257.353842,0.486592,0.836071,14.526497,1.102743,0.806057,49.693429
min,1.0,0.0,1.0,0.42,0.0,0.0,0.0
25%,223.5,0.0,2.0,20.125,0.0,0.0,7.9104
50%,446.0,0.0,3.0,28.0,0.0,0.0,14.4542
75%,668.5,1.0,3.0,38.0,1.0,0.0,31.0
max,891.0,1.0,3.0,80.0,8.0,6.0,512.3292


In [10]:
train_data.Sex.isnull().value_counts()

False    891
Name: Sex, dtype: int64

In [11]:
train_data.Embarked.isnull().value_counts()

False    889
True       2
Name: Embarked, dtype: int64

In [12]:
continuous_variables = train_data[["Pclass", "Age", "SibSp", "Parch", "Fare"]].copy()
continuous_variables.Age = continuous_variables.Age.fillna(-255)

In [13]:
categorical_variables = train_data[["Sex", "Embarked"]].copy()
categorical_variables.Embarked = categorical_variables.Embarked.fillna(-255)

In [14]:
pd.get_dummies(categorical_variables).head()

Unnamed: 0,Sex_female,Sex_male,Embarked_-255,Embarked_C,Embarked_Q,Embarked_S
0,0,1,0,0,0,1
1,1,0,0,1,0,0
2,1,0,0,0,0,1
3,1,0,0,0,0,1
4,0,1,0,0,0,1


In [15]:
pd.concat([continuous_variables.head(), continuous_variables.head()], axis=1)

Unnamed: 0,Pclass,Age,SibSp,Parch,Fare,Pclass.1,Age.1,SibSp.1,Parch.1,Fare.1
0,3,22.0,1,0,7.25,3,22.0,1,0,7.25
1,1,38.0,1,0,71.2833,1,38.0,1,0,71.2833
2,3,26.0,0,0,7.925,3,26.0,0,0,7.925
3,1,35.0,1,0,53.1,1,35.0,1,0,53.1
4,3,35.0,0,0,8.05,3,35.0,0,0,8.05


In [16]:
continuous_variables = train_data[["Pclass", "Age", "SibSp", "Parch", "Fare"]].copy()
continuous_variables.Age = continuous_variables.Age.fillna(-255)
categorical_variables = train_data[["Sex", "Embarked"]].copy()
categorical_variables.Embarked = categorical_variables.Embarked.fillna(-255)
    
y = train_data.Survived.values
X = pd.concat([continuous_variables, pd.get_dummies(categorical_variables)], axis=1).values
    
lg_model = LogisticRegression(random_state=1)
skf = StratifiedKFold(n_splits=3)
    
score = []
for train, test in skf.split(X, y):
    fitted_model = lg_model.fit(X[train], y[train])
    y_pred = fitted_model.predict(X[test])
    y_true = y[test]
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    score.append((accuracy, precision, recall, pickle.dumps(fitted_model), fitted_model, fitted_model.coef_))

#### Notes
- When saving the model directly, it only stores the coefficient from the last trained model
- Serializing the model and then deserializing the model works though

In [17]:
pickle.loads(score[0][3]).coef_

array([[ -8.61683232e-01,   1.33502542e-03,  -1.97196423e-01,
         -1.85513431e-01,   3.12556318e-03,   1.89302858e+00,
         -8.24340242e-01,   7.47929332e-02,   8.08315494e-01,
          8.92525479e-02,   9.63273672e-02]])

In [18]:
pickle.loads(score[1][3]).coef_

array([[ -6.52150734e-01,  -7.02098628e-04,  -2.84662818e-01,
         -8.76863787e-02,   8.35325971e-03,   1.66824332e+00,
         -9.08737509e-01,   1.43698892e-01,   4.48824665e-01,
          1.62178989e-01,   4.80327030e-03]])

In [19]:
pickle.loads(score[2][3]).coef_

array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [20]:
score[0][4].coef_

array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [21]:
score[1][4].coef_

array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

In [22]:
score[2][4].coef_

array([[ -7.17141076e-01,   8.90771172e-04,  -2.39548127e-01,
          7.91689887e-02,  -2.11482503e-04,   1.78695222e+00,
         -8.48641674e-01,   1.05869373e-01,   2.27665357e-01,
          6.12079781e-01,  -7.30396392e-03]])

---
#### Spark

In [23]:
# Create a SparkSession
spark_session = SparkSession.builder.getOrCreate()

In [24]:
# Usually read from database / could also define schema here and change the type
train_data_df = spark_session.read.csv("data/titanic_dataset_train.csv", header=True)

In [25]:
train_data_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [26]:
train_data_df.dtypes

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [27]:
def train_and_cv_model(data):
    columns = ["Survived", "Pclass", "Age", "Name",
              "Sibsp", "Parch", "Ticket", "Fare",
              "Cabin", "Embarked"]
    data_df = pd.DataFrame(data, columns=columns)
    
    # has to be converted to a number i.e. Survived is of type string due to Spark
    y = data_df.Survived.values.astype(np.float32) 
    
    continuous_variables = data_df[["Pclass", "Age", "Sibsp", "Parch", "Fare"]].copy()
    continuous_variables.Age = continuous_variables.Age.fillna(-255)
    categorical_variables = data_df[["Embarked"]].copy()
    categorical_variables.Embarked = categorical_variables.Embarked.fillna(-255)
    
    X = pd.concat([continuous_variables, pd.get_dummies(categorical_variables)], axis=1).values
    
    lg_model = LogisticRegression(random_state=1)
    skf = StratifiedKFold(n_splits=3)
    
    score = []
    for train, test in skf.split(X, y):
        fitted_model = lg_model.fit(X[train], y[train])
        y_pred = fitted_model.predict(X[test])
        y_true = y[test]
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred)
        recall = recall_score(y_true, y_pred)
        score.append((accuracy, precision, recall, pickle.dumps(fitted_model)))
    return score

In [28]:
train_data_grouped = (train_data_df
 .rdd
 .map(lambda x: (x.Sex, (
                x.Survived,
                x.Pclass,
                x.Age,
                x.Name,
                x.SibSp,
                x.Parch,
                x.Ticket,
                x.Fare,
                x.Cabin,
                x.Embarked
            )))
 .groupByKey()
 )

In [29]:
train_data_grouped.map(lambda x: (x[0], train_and_cv_model(list(x[1])))).take(2)

[('female',
  [(0.73333333333333328,
    0.79761904761904767,
    0.85897435897435892,
    b'\x80\x03csklearn.linear_model.logistic\nLogisticRegression\nq\x00)\x81q\x01}q\x02(X\x11\x00\x00\x00intercept_scalingq\x03K\x01X\x0b\x00\x00\x00multi_classq\x04X\x03\x00\x00\x00ovrq\x05X\x10\x00\x00\x00_sklearn_versionq\x06X\x06\x00\x00\x000.18.1q\x07X\x07\x00\x00\x00verboseq\x08K\x00X\x0c\x00\x00\x00random_stateq\tK\x01X\n\x00\x00\x00warm_startq\n\x89X\x08\x00\x00\x00classes_q\x0bcnumpy.core.multiarray\n_reconstruct\nq\x0ccnumpy\nndarray\nq\rK\x00\x85q\x0eC\x01bq\x0f\x87q\x10Rq\x11(K\x01K\x02\x85q\x12cnumpy\ndtype\nq\x13X\x02\x00\x00\x00f4q\x14K\x00K\x01\x87q\x15Rq\x16(K\x03X\x01\x00\x00\x00<q\x17NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x18b\x89C\x08\x00\x00\x00\x00\x00\x00\x80?q\x19tq\x1abX\x03\x00\x00\x00tolq\x1bG?\x1a6\xe2\xeb\x1cC-X\x07\x00\x00\x00n_iter_q\x1ch\x0ch\rK\x00\x85q\x1dh\x0f\x87q\x1eRq\x1f(K\x01K\x01\x85q h\x13X\x02\x00\x00\x00i4q!K\x00K\x01\x87q"Rq#(K\x03h\x17NNNJ\xff\xff\x

##### Different solution if there are many columns 

In [30]:
train_data_df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [31]:
columns = train_data_df.columns
def create_df(row):
    return pd.DataFrame(row, columns=columns)

In [32]:
train_data_grouped = (train_data_df
 .rdd
 .map(lambda x: (x.Sex, tuple(x))) # converts rows explicty to list -> trick
 .groupByKey()
 )

In [33]:
train_data_grouped.mapValues(lambda x: create_df(list(x))).take(1)[0][1].head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38,1,0,PC 17599,71.2833,C85,C
1,3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
2,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
3,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S
4,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C
