### Creating a Spark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('titanic').getOrCreate()

In [2]:
# Import the functions used in this notebook

In [3]:
from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

### Read the dataset

In [4]:
data1 = spark.read.csv('train.csv', inferSchema=True, header=True)
data2 = spark.read.csv('test.csv', inferSchema=True, header=True)

In [5]:
# What information does the dataset contain?

In [6]:
data1.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
# Show a sample of 5 records

In [8]:
data1.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Let's see a summary of some important columns**

In [9]:
data1.select('Survived', 'Pclass', 'Age', 'Fare').summary().show()

+-------+-------------------+------------------+------------------+-----------------+
|summary|           Survived|            Pclass|               Age|             Fare|
+-------+-------------------+------------------+------------------+-----------------+
|  count|                891|               891|               714|              891|
|   mean| 0.3838383838383838| 2.308641975308642| 29.69911764705882| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|14.526497332334035|49.69342859718089|
|    min|                  0|                 1|              0.42|              0.0|
|    25%|                  0|                 2|              20.0|           7.8958|
|    50%|                  0|                 3|              28.0|          14.4542|
|    75%|                  1|                 3|              38.0|             31.0|
|    max|                  1|                 3|              80.0|         512.3292|
+-------+-------------------+------------------+------------------+-----------------+

Here we can see that the average passenger age is close to 30 (about 29.7), with a minimum of 0.42 years and a maximum of 80 years. Age has only 714 non-null values out of 891, so some ages are missing.

In [10]:
print('Number of Rows : ', data1.count())
print('Number of Columns : ', len(data1.columns))

Number of Rows :  891
Number of Columns :  12


The dataset contains 891 records with 12 columns.

**How many people survived?**

In [11]:
data1.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



Out of 891 passengers, 342 survived and 549 lost their lives.
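Because Survived is coded as 0/1, the survival rate is simply the mean of that column. The small plain-Python check below, using only the counts above, confirms they agree with the Survived mean of 0.3838 in the summary table; in Spark, `data1.agg(mean('Survived'))` would return the same value directly.

```python
# Counts copied from the groupBy('Survived').count() output above
survived = 342
died = 549
total = survived + died  # 891 passengers in train.csv

# For a 0/1 column, the mean equals the share of 1s, i.e. the survival rate
survival_rate = survived / total
print(f"{survival_rate:.4f}")  # 0.3838
```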

**Average fare and age**

In [12]:
data1.groupBy('Survived').mean('Fare', 'Age').show()

+--------+------------------+------------------+
|Survived|         avg(Fare)|          avg(Age)|
+--------+------------------+------------------+
|       1| 48.39540760233917|28.343689655172415|
|       0|22.117886885245877| 30.62617924528302|
+--------+------------------+------------------+



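Survivors paid a much higher average fare (about 48.4) than those who died (about 22.1), which suggests that passengers with cheaper fares were more likely to die.
Age also seems to matter: survivors were on average about two years younger (28.3 vs 30.6).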

**Number of males and females who survived**

In [13]:
data1.groupBy('Survived').pivot('sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



Far fewer women died than men (81 vs 468), and more women survived than men (233 vs 109), even though there were fewer women aboard. This suggests that women were given priority during the rescue.
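Since there were almost twice as many men as women aboard, survival rates per group make the comparison fairer than raw counts. The plain-Python arithmetic below works them out from the pivot table; in Spark, `data1.groupBy('Sex').mean('Survived')` would give the same rates directly.

```python
# Survived/died counts by sex, copied from the pivot table above
counts = {
    "female": {"survived": 233, "died": 81},
    "male": {"survived": 109, "died": 468},
}

def survival_rate(c):
    """Share of passengers in a group who survived."""
    return c["survived"] / (c["survived"] + c["died"])

rates = {sex: survival_rate(c) for sex, c in counts.items()}
for sex, rate in rates.items():
    print(f"{sex:6s} {rate:.1%}")
# female 74.2%
# male   18.9%
```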

In [14]:
data1.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|null|null|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



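Passengers travelling without siblings or a spouse were more likely to die than to survive (398 vs 210), whereas those with exactly one sibling or spouse aboard survived more often than not (112 vs 97). The number of people travelling together therefore seems to matter. Note that SibSp counts only siblings and spouses, so a value of 0 does not necessarily mean travelling alone.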
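The SibSp groups differ greatly in size, so converting the table into rates makes the pattern easier to read: roughly 35% of passengers with no sibling or spouse aboard survived, compared with about 54% of those with one, while groups of three or more fared worse still. The plain-Python sketch below does that arithmetic; in Spark, `data1.groupBy('SibSp').mean('Survived')` would compute the rates directly.

```python
# (survived, died) counts per SibSp value, copied from the pivot table above;
# a null cell means no passenger fell into that group, so it counts as 0
table = {0: (210, 398), 1: (112, 97), 2: (13, 15), 3: (4, 12),
         4: (3, 15), 5: (0, 5), 8: (0, 7)}

rates = {}
for sibsp, (survived, died) in table.items():
    n = survived + died
    rates[sibsp] = survived / n
    print(f"SibSp={sibsp}: {rates[sibsp]:.0%} survived (n={n})")
```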

In [15]:
data1.groupBy('Survived').pivot('Embarked').count().show()

+--------+----+---+---+---+
|Survived|null|  C|  Q|  S|
+--------+----+---+---+---+
|       1|   2| 93| 30|217|
|       0|null| 75| 47|427|
+--------+----+---+---+---+



**Let's see if we have missing data**

In [16]:
for col in data1.columns:
    print(col.ljust(15), data1.filter(data1[col].isNull()).count())

PassengerId     0
Survived        0
Pclass          0
Name            0
Sex             0
Age             177
SibSp           0
Parch           0
Ticket          0
Fare            0
Cabin           687
Embarked        2


Columns **Age, Cabin and Embarked** have missing values.

Most of the Cabin values are missing (687 of 891), and cabin information is related to Pclass, so we drop this feature. Only 2 Embarked entries are missing; we fill them with the most frequent value, S. The Age of many passengers (177) is missing as well. The simplest way to impute it would be to fill in the average, but we use a more refined, title-based method described below. For now we focus on the training data, but the test data can have different missing features: it actually has one missing Fare. So we also calculate the median fare and use it for Fare imputation. Both fills use Spark's fillna() method.

In [17]:
data1.select('Embarked', 'Fare').summary('mean', '50%', 'max').show()

+-------+--------+----------------+
|summary|Embarked|            Fare|
+-------+--------+----------------+
|   mean|    null|32.2042079685746|
|    50%|    null|         14.4542|
|    max|       S|        512.3292|
+-------+--------+----------------+
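Note that the `max` row reports S for Embarked only because S sorts last alphabetically among C, Q and S; `max` on a string column is not the most frequent value in general. For this dataset the Embarked pivot table above confirms S is also the mode (644 passengers), which Spark can show with `data1.groupBy('Embarked').count().orderBy('count', ascending=False)`. The plain-Python sketch below illustrates the difference on a made-up toy column, not the Titanic data.

```python
from collections import Counter

# Hypothetical toy column: "C" is the most frequent value, but "S" sorts last
embarked = ["C", "C", "C", "Q", "S", None]

non_null = [v for v in embarked if v is not None]
print(max(non_null))                           # S  (alphabetical maximum)
print(Counter(non_null).most_common(1)[0][0])  # C  (mode)
```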



**Let's fill null values based on the above information**

In [18]:
data1 = data1.fillna({'Embarked':'S', 'Fare':14.45})

The basic idea for age imputation is to extract each passenger's title from the Name column and fill in missing ages with the average age of the passengers who share that title. For example, passengers titled Mrs tend to be older than those titled Miss.

First, we extract the title using the regular expression and observe the count and average age with each of the titles.
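The pattern `([A-Za-z]+)\.` captures the first run of letters followed by a dot, which in this dataset's "Surname, Title. Given names" format is the title. The plain-Python sketch below mirrors what `regexp_extract(..., 1)` does with the same pattern; the helper `extract_title` and the sample name are hypothetical, and like `regexp_extract` it returns an empty string when nothing matches.

```python
import re

# Same pattern as the regexp_extract call below: letters followed by a dot
TITLE_RE = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first 'Word.' token in a name, or '' if there is none."""
    m = TITLE_RE.search(name)
    return m.group(1) if m else ""

print(extract_title("Doe, Mrs. Jane"))  # Mrs   (made-up name)
print(repr(extract_title("no title")))  # ''
```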

In [19]:
data1 = data1.withColumn('Title', regexp_extract(data1['Name'], r'([A-Za-z]+)\.', 1))

In [20]:
data1.show(4)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|   Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|  Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S| Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|  Mrs|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
only showing top 4 rows



In [21]:
data1.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Mme|         1|              24.0|
|    Capt|         1|              70.0|
|     Don|         1|              40.0|
|     Sir|         1|              49.0|
|    Lady|         1|              48.0|
|Jonkheer|         1|              38.0|
|      Ms|         1|              28.0|
|Countess|         1|              33.0|
|   Major|         2|              48.5|
|     Col|         2|              58.0|
|    Mlle|         2|              24.0|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



Mr, Miss and Mrs occur far more often than the other titles. Master is less common, but its average age (about 4.6) is much lower than the others. So we keep these four titles and map each remaining title to one of Mr, Miss or Mrs.

In [22]:
title_dic = {'Mr': 'Mr', 'Miss': 'Miss', 'Mrs': 'Mrs', 'Master': 'Master',
             'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',
             'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',
             'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs',
             'Dr': 'Mr', 'Rev': 'Mr'}

In [23]:
mapping = create_map([lit(x) for x in chain(*title_dic.items())])

data1 = data1.withColumn('Title', mapping[data1['Title']])
data1.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+
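Indexing the `create_map` column with `mapping[data1['Title']]` behaves like a dictionary lookup; when ANSI SQL mode is off, a title that is not a key of `title_dic` comes back as null rather than raising an error. The plain-Python analogue below uses `dict.get`, with `None` standing in for null, to show how an unseen title (the made-up "Prof") would silently fall through, which is worth checking before applying the mapping to new data.

```python
# Plain-Python analogue of the create_map lookup; None plays the role of null
title_dic = {"Mr": "Mr", "Miss": "Miss", "Mrs": "Mrs", "Master": "Master",
             "Mlle": "Miss", "Major": "Mr", "Col": "Mr", "Sir": "Mr",
             "Don": "Mr", "Mme": "Miss", "Jonkheer": "Mr", "Lady": "Mrs",
             "Capt": "Mr", "Countess": "Mrs", "Ms": "Miss", "Dona": "Mrs",
             "Dr": "Mr", "Rev": "Mr"}

raw_titles = ["Mr", "Capt", "Mlle", "Dona", "Prof"]  # "Prof" is a made-up unseen title
mapped = [title_dic.get(t) for t in raw_titles]
print(mapped)  # ['Mr', 'Mr', 'Miss', 'Mrs', None]

# Titles that fell through the mapping and would need a default
unmapped = sorted({t for t, m in zip(raw_titles, mapped) if m is None})
print(unmapped)  # ['Prof']
```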



Now we create a function that fills in missing ages with the average age of the passengers sharing the same title, and use it to impute the ages in the next cell.

In [24]:
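def put_age(data, title, req_age):
    # Fill null ages of passengers with the given title with req_age;
    # every other row keeps its original Age
    return data.withColumn('Age',
                           when(data['Age'].isNull() & (data['Title'] == title), req_age)
                           .otherwise(data['Age']))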

In [25]:
data1 = put_age(data1, 'Mr', 33.02)
data1 = put_age(data1, 'Mrs', 35.98)
data1 = put_age(data1, 'Miss', 21.86)
data1 = put_age(data1, 'Master', 4.57)
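The calls above pass group means that were copied by hand from the earlier output, which is easy to get wrong. As a plain-Python sketch of the same idea, the means can be computed from the observed ages of each title and then used to fill the gaps. The helper `impute_by_group` and the rows are hypothetical illustrations; in Spark, a comparable approach would be to join the `groupBy('Title').mean('Age')` result back and combine the columns with `coalesce`.

```python
from collections import defaultdict

def impute_by_group(rows, group_key="Title", value_key="Age"):
    """Fill missing values with the mean of their group (hypothetical helper).

    rows: list of dicts; None marks a missing value.
    Returns new dicts; groups with no observed values stay None.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for r in rows:
        if r[value_key] is not None:
            totals[r[group_key]] += r[value_key]
            counts[r[group_key]] += 1
    means = {g: totals[g] / counts[g] for g in counts}

    filled = []
    for r in rows:
        r = dict(r)  # copy so the input rows are not modified
        if r[value_key] is None:
            r[value_key] = means.get(r[group_key])
        filled.append(r)
    return filled

# Made-up rows, not taken from the Titanic data
rows = [
    {"Title": "Mr", "Age": 30.0}, {"Title": "Mr", "Age": 40.0},
    {"Title": "Mr", "Age": None}, {"Title": "Master", "Age": 4.0},
    {"Title": "Master", "Age": None},
]
filled = impute_by_group(rows)
print([r["Age"] for r in filled])  # [30.0, 40.0, 35.0, 4.0, 4.0]
```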

In [26]:
data1.show()

+-----------+--------+------+--------------------+------+-----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|  Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+-----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35.0|    0|    0|          373450|   8.05| null|       S|    Mr|
...

In [27]:
for col in data1.columns:
    print(col.ljust(15), data1.filter(data1[col].isNull()).count())

PassengerId     0
Survived        0
Pclass          0
Name            0
Sex             0
Age             0
SibSp           0
Parch           0
Ticket          0
Fare            0
Cabin           687
Embarked        0
Title           0


So there are no null values left except in the Cabin column, which we drop below.

Let's create a new column called FamilySize by adding Parch and SibSp. We use the withColumn() method for this: its first argument is the name of the new column and its second is the expression that computes it. The new column is added alongside the old ones, so we drop the Parch and SibSp columns afterward.

In [28]:
data1 = data1.withColumn('FamilySize', data1['SibSp']+data1['Parch']).drop('Parch', 'SibSp')

In [29]:
data1.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked',
 'Title',
 'FamilySize']

In [30]:
data1.show(4)

+-----------+--------+------+--------------------+------+----+----------------+-------+-----+--------+-----+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|          Ticket|   Fare|Cabin|Embarked|Title|FamilySize|
+-----------+--------+------+--------------------+------+----+----------------+-------+-----+--------+-----+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|       A/5 21171|   7.25| null|       S|   Mr|         1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|        PC 17599|71.2833|  C85|       C|  Mrs|         1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|STON/O2. 3101282|  7.925| null|       S| Miss|         0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|          113803|   53.1| C123|       S|  Mrs|         1|
+-----------+--------+------+--------------------+------+----+----------------+-------+-----+--------+-----+----------+
only showing top 4 rows



Let's drop the unwanted columns.

In [31]:
data1 = data1.drop('PassengerId', 'Cabin', 'Name', 'Ticket', 'Title')

In [32]:
data1.show(4)

+--------+------+------+----+-------+--------+----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+--------+------+------+----+-------+--------+----------+
|       0|     3|  male|22.0|   7.25|       S|         1|
|       1|     1|female|38.0|71.2833|       C|         1|
|       1|     3|female|26.0|  7.925|       S|         0|
|       1|     1|female|35.0|   53.1|       S|         1|
+--------+------+------+----+-------+--------+----------+
only showing top 4 rows



Now we have the required data with all the important columns and no missing values.

### Model Building 

So far we have used Spark SQL DataFrames for EDA and feature engineering. Now we will use the Spark ML library for the modelling tasks.
We will use the following components:
- StringIndexer: converts string categories to numeric category indices.
- VectorAssembler: specific to the Spark ML API; combines the feature columns into a single vector column, which Spark ML estimators expect as input.
- Logistic regression with Ridge and Lasso regularization.
- Tree-based ensemble methods: random forest and gradient boosting.
- Pipeline: chains these stages into a single workflow, which is especially valuable for big data.

In [33]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#### String Indexer

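We convert the Sex and Embarked columns from strings to numeric indices. StringIndexer writes the indices to new columns and leaves the original columns intact, so we drop the originals afterward. (Passing several columns at once via inputCols/outputCols requires Spark 3.0 or later.)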

In [34]:
StringIndex = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['SexNum', 'EmbNum'])
StringIndex_model = StringIndex.fit(data1)
data1_ = StringIndex_model.transform(data1).drop('Sex', 'Embarked')

In [35]:
data1_.show(4)

+--------+------+----+-------+----------+------+------+
|Survived|Pclass| Age|   Fare|FamilySize|SexNum|EmbNum|
+--------+------+----+-------+----------+------+------+
|       0|     3|22.0|   7.25|         1|   0.0|   0.0|
|       1|     1|38.0|71.2833|         1|   1.0|   1.0|
|       1|     3|26.0|  7.925|         0|   1.0|   0.0|
|       1|     1|35.0|   53.1|         1|   1.0|   0.0|
+--------+------+----+-------+----------+------+------+
only showing top 4 rows



We can see the newly added columns SexNum and EmbNum.
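With its default `stringOrderType='frequencyDesc'`, StringIndexer gives index 0.0 to the most frequent label, and Spark's documentation states that ties are broken alphabetically. That is why male and S, the majority values in the training data, both map to 0.0 above. Because the mapping depends on the data the indexer was fitted on, reusing the model fitted on the training data keeps the encoding consistent for other datasets. The hypothetical `fit_string_index` below is a plain-Python sketch of that ordering, not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Sketch of frequencyDesc ordering: most frequent label -> 0.0,
    ties broken alphabetically."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up sample; in train.csv 'male' is also the majority label
sex = ["male", "female", "male", "male", "female"]
print(fit_string_index(sex))  # {'male': 0.0, 'female': 1.0}
```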

#### VectorAssembler: combine all features into a single vector column that Spark ML understands

We apply VectorAssembler only to the feature columns, i.e. every column except Survived.

In [36]:
VectAssembler = VectorAssembler(inputCols=['Pclass','Age','Fare','FamilySize','SexNum','EmbNum'], outputCol='features')

In [37]:
data1_ = VectAssembler.transform(data1_).select('features', 'Survived')
data1_.show(3)

+--------------------+--------+
|            features|Survived|
+--------------------+--------+
|[3.0,22.0,7.25,1....|       0|
|[1.0,38.0,71.2833...|       1|
|[3.0,26.0,7.925,0...|       1|
+--------------------+--------+
only showing top 3 rows



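Now we split the training data into training and validation parts in a 7:3 ratio. No seed is passed to randomSplit, so the split, and therefore the accuracies below, will differ between runs; passing a seed (e.g. seed=42) makes it reproducible.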

In [38]:
train_data, valid_data = data1_.randomSplit([0.7,0.3])
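randomSplit samples rows rather than cutting the data at an exact position, so the two parts are only approximately 70% and 30% of the rows. The plain-Python sketch below (a hypothetical `random_split` helper, not Spark's actual implementation) mimics that behaviour by drawing one uniform number per row and comparing it with the normalized cumulative weights; a fixed seed makes the result reproducible.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split by comparing one uniform draw per row
    with the normalized cumulative weights (illustrative only)."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            # The last split also catches any float rounding at the top end
            if u < b or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

train, valid = random_split(range(891), [0.7, 0.3], seed=42)
print(len(train), len(valid))  # close to a 70/30 split, rarely exact
```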

In [39]:
train_data.show(4, truncate=False)

+---------------------+--------+
|features             |Survived|
+---------------------+--------+
|(6,[0,1],[1.0,38.0]) |0       |
|(6,[0,1],[1.0,40.0]) |0       |
|(6,[0,1],[2.0,33.02])|0       |
|(6,[0,1],[2.0,33.02])|0       |
+---------------------+--------+
only showing top 4 rows
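The rows above are printed in Spark's sparse-vector notation `(size,[indices],[values])`: VectorAssembler may store a row sparsely when many entries are zero, and only the non-zero positions are listed. For example, `(6,[0,1],[1.0,38.0])` is the dense vector `[1.0, 38.0, 0.0, 0.0, 0.0, 0.0]`, i.e. Pclass=1, Age=38 and zero for the other four features. The hypothetical helper below expands that notation in plain Python; in Spark, `SparseVector(...).toArray()` from `pyspark.ml.linalg` does the same.

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark's (size, [indices], [values]) notation into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# First row shown above: (6,[0,1],[1.0,38.0])
features = ["Pclass", "Age", "Fare", "FamilySize", "SexNum", "EmbNum"]
row = sparse_to_dense(6, [0, 1], [1.0, 38.0])
print(dict(zip(features, row)))
# {'Pclass': 1.0, 'Age': 38.0, 'Fare': 0.0, 'FamilySize': 0.0, 'SexNum': 0.0, 'EmbNum': 0.0}
```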



### Logistic Regression Model

In the Spark API, 𝛼 is elasticNetParam and 𝜆 is regParam. Choosing 𝛼=0 gives a Ridge (L2) penalty and 𝛼=1 gives a Lasso (L1) penalty.
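Spark's documentation describes the elastic-net penalty as 𝛼(𝜆‖w‖₁) + (1−𝛼)(𝜆/2)‖w‖₂², so 𝛼=0 keeps only the L2 (Ridge) term and 𝛼=1 only the L1 (Lasso) term. The small function below evaluates that penalty for a made-up weight vector to show how the two parameters interact; it is an illustration, not Spark code.

```python
def elastic_net_penalty(w, reg_param, elastic_net_param):
    """alpha * lambda * ||w||_1 + (1 - alpha) * lambda / 2 * ||w||_2^2"""
    l1 = sum(abs(x) for x in w)
    l2_sq = sum(x * x for x in w)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1 - alpha) / 2 * l2_sq)

w = [1.0, -2.0]  # made-up coefficients: ||w||_1 = 3, ||w||_2^2 = 5
print(elastic_net_penalty(w, 0.5, 0.0))  # 1.25  (Ridge: 0.5 * 5 / 2)
print(elastic_net_penalty(w, 0.5, 1.0))  # 1.5   (Lasso: 0.5 * 3)
print(elastic_net_penalty(w, 0.5, 0.5))  # 1.375 (an even mix of both)
```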

**Ridge regression**

In [40]:
ridge = LogisticRegression(labelCol='Survived', maxIter=100, 
                        elasticNetParam=0, # Ridge regression is chosen 
                        regParam=0.03)

lr_model = ridge.fit(train_data)
preds = lr_model.transform(valid_data)


In [41]:
evaluator = MulticlassClassificationEvaluator(labelCol='Survived', 
                                          metricName='accuracy')

In [42]:
evaluator.evaluate(preds)

0.8098859315589354

**Lasso regression**

In [43]:
lasso = LogisticRegression(labelCol='Survived', 
                           maxIter=100,
                           elasticNetParam=1, # Lasso
                           regParam=0.0003)

model = lasso.fit(train_data)
pred = model.transform(valid_data)

evaluator.evaluate(pred)

0.7908745247148289

Ridge performed better than Lasso on the validation set (about 0.81 vs 0.79 accuracy). With a single random split, this comparison is only indicative.

### Test data prediction

In [44]:
data2.columns

['PassengerId',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [45]:
for col in data2.columns:
    print(col.ljust(15), data2.filter(data2[col].isNull()).count())

PassengerId     0
Pclass          0
Name            0
Sex             0
Age             86
SibSp           0
Parch           0
Ticket          0
Fare            1
Cabin           327
Embarked        0


In [46]:
data2 = data2.withColumn('Title', regexp_extract(data2['Name'], r'([A-Za-z]+)\.', 1))
mapping = create_map([lit(x) for x in chain(*title_dic.items())])

data2 = data2.withColumn('Title', mapping[data2['Title']])
data2.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|21.774843750000002|
|Master| 7.406470588235294|
|    Mr|32.340425531914896|
|   Mrs|38.904761904761905|
+------+------------------+



In [47]:
data2 = put_age(data2, 'Mr', 32.34)
data2 = put_age(data2, 'Mrs', 38.90)
data2 = put_age(data2, 'Miss', 21.77)
data2 = put_age(data2, 'Master', 7.41)

# The test data has one missing Fare; fill it with the training median
data2 = data2.fillna({'Fare': 14.45})

data2 = data2.withColumn('FamilySize', data2['SibSp']+data2['Parch']).drop('Parch', 'SibSp')
data2 = data2.drop('PassengerId', 'Cabin', 'Name', 'Ticket', 'Title')

In [48]:
data2.show(4)

+------+------+----+------+--------+----------+
|Pclass|   Sex| Age|  Fare|Embarked|FamilySize|
+------+------+----+------+--------+----------+
|     3|  male|34.5|7.8292|       Q|         0|
|     3|female|47.0|   7.0|       S|         1|
|     2|  male|62.0|9.6875|       Q|         0|
|     3|  male|27.0|8.6625|       S|         0|
+------+------+----+------+--------+----------+
only showing top 4 rows



In [49]:
# Reuse the indexer fitted on the training data so the test set gets the same label-to-index mapping
data2_ = StringIndex_model.transform(data2).drop('Sex', 'Embarked')

data2_ = VectAssembler.transform(data2_).select('features')
data2_.show(3)

+--------------------+
|            features|
+--------------------+
|[3.0,34.5,7.8292,...|
|[3.0,47.0,7.0,1.0...|
|[2.0,62.0,9.6875,...|
+--------------------+
only showing top 3 rows



In [50]:
#Ridge regression
preds = lr_model.transform(data2_)
preds.show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[3.0,34.5,7.8292,...|[1.74525955071606...|[0.85135389466907...|       0.0|
|[3.0,47.0,7.0,1.0...|[0.60075599034216...|[0.64582924585057...|       0.0|
|[2.0,62.0,9.6875,...|[1.79303891301258...|[0.85729945215217...|       0.0|
|[3.0,27.0,8.6625,...|[1.82230471079401...|[0.86084244431059...|       0.0|
|[3.0,22.0,12.2875...|[-0.1646586599166...|[0.45892808987432...|       1.0|
|[3.0,14.0,9.225,0...|[1.35890344900389...|[0.79558142164823...|       0.0|
|[3.0,30.0,7.6292,...|[-0.4902906299574...|[0.37982510505628...|       1.0|
|[2.0,26.0,29.0,2....|[1.07703269074589...|[0.74593203700064...|       0.0|
|[3.0,18.0,7.2292,...|[-0.7420480526552...|[0.32255645347785...|       1.0|
|[3.0,21.0,24.15,2...|[1.83783109502792...|[0.86269199287453...|       0.0|
...
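The three output columns are linked. For binary LogisticRegression, rawPrediction holds [−m, m], where m is the model's linear margin; probability is the logistic (sigmoid) function applied to those values; and prediction is 1.0 when the probability of class 1 exceeds the default threshold of 0.5. The plain-Python check below reproduces the first displayed row from its rawPrediction value, which is truncated in the display.

```python
import math

def sigmoid(z):
    """Logistic function 1 / (1 + e^-z)."""
    return 1.0 / (1.0 + math.exp(-z))

# First test row above: rawPrediction[0] = 1.74525955071606...
raw0 = 1.74525955071606
p0 = sigmoid(raw0)   # probability of class 0 (did not survive)
p1 = 1.0 - p0        # probability of class 1 (survived)
prediction = 1.0 if p1 > 0.5 else 0.0  # default threshold 0.5
print(round(p0, 4), prediction)  # 0.8514 0.0
```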

In [51]:
#Lasso regression
preds = model.transform(data2_)
preds.show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[3.0,34.5,7.8292,...|[2.27050514274402...|[0.90640465047680...|       0.0|
|[3.0,47.0,7.0,1.0...|[0.88320973414331...|[0.70748691504359...|       0.0|
|[2.0,62.0,9.6875,...|[2.41415067311868...|[0.91790001788674...|       0.0|
|[3.0,27.0,8.6625,...|[2.21395951120279...|[0.90149609431627...|       0.0|
|[3.0,22.0,12.2875...|[-0.2476074391040...|[0.43841247723176...|       1.0|
|[3.0,14.0,9.225,0...|[1.51603847443155...|[0.81995438488509...|       0.0|
|[3.0,30.0,7.6292,...|[-0.5938187368037...|[0.35575913925498...|       1.0|
|[2.0,26.0,29.0,2....|[1.23695294487676...|[0.77503318491687...|       0.0|
|[3.0,18.0,7.2292,...|[-1.0632398858213...|[0.25669079529419...|       1.0|
|[3.0,21.0,24.15,2...|[2.30427946180217...|[0.90923084254244...|       0.0|
...