# Installing pyspark Module

In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


# Beginning a SparkSession & Building a spark instance

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification').getOrCreate()

In [3]:
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract
from itertools import chain

# Loading and Reading the Dataset

In [4]:
data1 = spark.read.csv('train.csv',\
                     header=True, inferSchema=True)
data2 = spark.read.csv('test.csv', \
                     header=True, inferSchema=True)

# Viewing the Dataframe Schema 

In [5]:
data1.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



# Showing First 10 Rows of the Dataframe 

In [6]:
data1.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

# Transforming Spark Dataframe by Limiting and Converting It to a Pandas Dataframe

In [7]:
data1.limit(4).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S


#  Selecting 4 Columns for Inspect within Spark

In [8]:
data1.select('Pclass', 'Fare', 'Survived', 'Age').show(4)

+------+-------+--------+----+
|Pclass|   Fare|Survived| Age|
+------+-------+--------+----+
|     3|   7.25|       0|22.0|
|     1|71.2833|       1|38.0|
|     3|  7.925|       1|26.0|
|     1|   53.1|       1|35.0|
+------+-------+--------+----+
only showing top 4 rows



# Applying the Summary() Method

In [9]:
data1.select('Pclass', 'Fare', 'Survived', 'Age').summary().show()

+-------+------------------+-----------------+-------------------+------------------+
|summary|            Pclass|             Fare|           Survived|               Age|
+-------+------------------+-----------------+-------------------+------------------+
|  count|               891|              891|                891|               714|
|   mean| 2.308641975308642| 32.2042079685746| 0.3838383838383838| 29.69911764705882|
| stddev|0.8360712409770491|49.69342859718089|0.48659245426485753|14.526497332334035|
|    min|                 1|              0.0|                  0|              0.42|
|    25%|                 2|           7.8958|                  0|              20.0|
|    50%|                 3|          14.4542|                  0|              28.0|
|    75%|                 3|             31.0|                  1|              38.0|
|    max|                 3|         512.3292|                  1|              80.0|
+-------+------------------+-----------------+--------

# Showing Total Number of Rows and Columns present in the Dataframe

In [10]:
print('No. of columns present in the dataframe: \t', len(data1.columns))
print('No. of rows present in the dataframe: \t', data1.count())

No. of columns present in the dataframe: 	 12
No. of rows present in the dataframe: 	 891


# Count of People Who Survived

In [11]:
data1.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



# Finding Average Fare and Age

In [12]:
data1.groupBy('Survived').mean('Age', 'Fare').show()

+--------+------------------+------------------+
|Survived|          avg(Age)|         avg(Fare)|
+--------+------------------+------------------+
|       1|28.343689655172415| 48.39540760233917|
|       0| 30.62617924528302|22.117886885245877|
+--------+------------------+------------------+



# Showing Number of Survival According to Sex

In [13]:
data1.groupBy('Survived').pivot('Sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



# Displaying Number of Survival According to Number of Siblings

In [14]:
data1.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|null|null|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



# Viewing Survival Number According to Class Type

In [15]:
data1.groupBy('Survived').pivot('Pclass').count().show()

+--------+---+---+---+
|Survived|  1|  2|  3|
+--------+---+---+---+
|       1|136| 87|119|
|       0| 80| 97|372|
+--------+---+---+---+



# Showing Survival Number According to Embarked

In [16]:
data1.groupBy('Survived').pivot('Embarked').count().show()

+--------+----+---+---+---+
|Survived|null|  C|  Q|  S|
+--------+----+---+---+---+
|       1|   2| 93| 30|217|
|       0|null| 75| 47|427|
+--------+----+---+---+---+



# Determining the Number of Survival Based on Parch

In [17]:
data1.groupBy('Survived').pivot('Parch').count().show()

+--------+---+---+---+---+----+---+----+
|Survived|  0|  1|  2|  3|   4|  5|   6|
+--------+---+---+---+---+----+---+----+
|       1|233| 65| 40|  3|null|  1|null|
|       0|445| 53| 40|  2|   4|  4|   1|
+--------+---+---+---+---+----+---+----+



# Checking for Null Values

In [18]:
for col in data1.columns:
    print(col.ljust(20), data1.filter(data1[col].isNull()).count())

PassengerId          0
Survived             0
Pclass               0
Name                 0
Sex                  0
Age                  177
SibSp                0
Parch                0
Ticket               0
Fare                 0
Cabin                687
Embarked             2


# Extracting Summary for Embarked and Fare 

In [19]:
data1.select('Embarked', 'Fare').summary('max', '50%', 'mean').show()

+-------+--------+----------------+
|summary|Embarked|            Fare|
+-------+--------+----------------+
|    max|       S|        512.3292|
|    50%|    null|         14.4542|
|   mean|    null|32.2042079685746|
+-------+--------+----------------+



In [20]:
data1 = data1.fillna({'Embarked': 'S', 'Fare':14.45})

# Extracting the Title Using the Regular Expression and Observing the Count and Average Age

In [21]:
data1 = data1.withColumn('Title', regexp_extract(data1['Name'],\
                '([A-Za-z]+)\.', 1))

data1.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Don|         1|              40.0|
|Countess|         1|              33.0|
|    Lady|         1|              48.0|
|     Mme|         1|              24.0|
|    Capt|         1|              70.0|
|     Sir|         1|              49.0|
|Jonkheer|         1|              38.0|
|      Ms|         1|              28.0|
|     Col|         2|              58.0|
|    Mlle|         2|              24.0|
|   Major|         2|              48.5|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



# Keeping Four Titles and Mapping Other With One of the First Three

In [22]:
title_dic = {'Mr':'Mr', 'Miss':'Miss', 'Mrs':'Mrs', 'Master':'Master', \
             'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',\
             'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',\
             'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs', \
             'Dr':'Mr', 'Rev':'Mr'}

mapping = create_map([lit(x) for x in chain(*title_dic.items())])

data1 = data1.withColumn('Title', mapping[data1['Title']])
data1.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+



#  Creating a Function which Imputes column Age with the Average age

In [23]:
def ageimpute(data, title, age):
    return data.withColumn('Age', \
                         when((data['Age'].isNull()) & (data['Title']==title), \
                              age).otherwise(data['Age']))

# Imputing the Age

In [24]:
data1 = ageimpute(data1, 'Mr', 33.02)
data1 = ageimpute(data1, 'Mrs', 35.98)
data1 = ageimpute(data1, 'Miss', 21.86)
data1 = ageimpute(data1, 'Master', 4.75)

# Creating FamilySize column and Dropping columns SibSp and Parch

In [25]:
data1 = data1.withColumn('FamilySize', data1['Parch'] + data1['SibSp']).\
            drop('SibSp', 'Parch')

# Dropping Unwanted Columns

In [26]:
data1 = data1.drop('Name', 'PassengerID', 'Ticket', 'Title', 'Cabin')

# Viewing Trimmed Dataframe

In [27]:
data1.show(6)

+--------+------+------+-----+-------+--------+----------+
|Survived|Pclass|   Sex|  Age|   Fare|Embarked|FamilySize|
+--------+------+------+-----+-------+--------+----------+
|       0|     3|  male| 22.0|   7.25|       S|         1|
|       1|     1|female| 38.0|71.2833|       C|         1|
|       1|     3|female| 26.0|  7.925|       S|         0|
|       1|     1|female| 35.0|   53.1|       S|         1|
|       0|     3|  male| 35.0|   8.05|       S|         0|
|       0|     3|  male|33.02| 8.4583|       Q|         0|
+--------+------+------+-----+-------+--------+----------+
only showing top 6 rows



# Checking for Missing Values

In [28]:
for col in data1.columns:
    print(col.ljust(20), data1.filter(data1[col].isNull()).count())

Survived             0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


# Importing Model building Libraries

In [29]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Changing the Embarked and Sex Column from String to Numeric index

In [30]:
stringIndex = StringIndexer(inputCols=['Embarked', 'Sex'], 
                       outputCols=['EmbNum', 'SexNum'])

stringIndex_model = stringIndex.fit(data1)

data1_ = stringIndex_model.transform(data1).drop('Sex', 'Embarked')
data1_.show(6)

+--------+------+-----+-------+----------+------+------+
|Survived|Pclass|  Age|   Fare|FamilySize|EmbNum|SexNum|
+--------+------+-----+-------+----------+------+------+
|       0|     3| 22.0|   7.25|         1|   0.0|   0.0|
|       1|     1| 38.0|71.2833|         1|   1.0|   1.0|
|       1|     3| 26.0|  7.925|         0|   0.0|   1.0|
|       1|     1| 35.0|   53.1|         1|   0.0|   1.0|
|       0|     3| 35.0|   8.05|         0|   0.0|   0.0|
|       0|     3|33.02| 8.4583|         0|   2.0|   0.0|
+--------+------+-----+-------+----------+------+------+
only showing top 6 rows



# Use of VectorAssembler 

In [31]:
vec_asmbl = VectorAssembler(inputCols=data1_.columns[1:], 
                           outputCol='features')

data1_ = vec_asmbl.transform(data1_).select('Survived', 'features')
data1_.show(6, truncate=False)

+--------+------------------------------+
|Survived|features                      |
+--------+------------------------------+
|0       |[3.0,22.0,7.25,1.0,0.0,0.0]   |
|1       |[1.0,38.0,71.2833,1.0,1.0,1.0]|
|1       |[3.0,26.0,7.925,0.0,0.0,1.0]  |
|1       |[1.0,35.0,53.1,1.0,0.0,1.0]   |
|0       |[3.0,35.0,8.05,0.0,0.0,0.0]   |
|0       |[3.0,33.02,8.4583,0.0,2.0,0.0]|
+--------+------------------------------+
only showing top 6 rows



# Splitting the Data for Training and Testing

In [32]:
train_data, valid_data = data1_.randomSplit([0.8, 0.2])

# Showing the Train Data for Top 6 Rows

In [33]:
train_data.show(6, truncate=False)

+--------+---------------------+
|Survived|features             |
+--------+---------------------+
|0       |(6,[0,1],[1.0,33.02])|
|0       |(6,[0,1],[1.0,33.02])|
|0       |(6,[0,1],[1.0,38.0]) |
|0       |(6,[0,1],[1.0,39.0]) |
|0       |(6,[0,1],[1.0,40.0]) |
|0       |(6,[0,1],[2.0,33.02])|
+--------+---------------------+
only showing top 6 rows



# Using MulticlassClassificationEvaluator

In [34]:
evaluator = MulticlassClassificationEvaluator(labelCol='Survived', 
                                          metricName='accuracy')

# Building Logistic Regression Model 

In [35]:
ridge = LogisticRegression(labelCol='Survived', 
                        maxIter=120, 
                        elasticNetParam=0, 
                        regParam=0.03)

model = ridge.fit(train_data)
pred = model.transform(valid_data)
evaluator.evaluate(pred)

0.8092485549132948

# Developing Random Forest Classifier Model

In [36]:
rf = RandomForestClassifier(labelCol='Survived', 
                           numTrees=200, maxDepth=5)

model = rf.fit(train_data)
pred = model.transform(valid_data)
evaluator.evaluate(pred)

0.8265895953757225

# Building GBT Classifier Model 

In [37]:
gb = GBTClassifier(labelCol='Survived', maxIter=100, maxDepth=3)

model = gb.fit(train_data)
pred = model.transform(valid_data)
evaluator.evaluate(pred)

0.838150289017341

# Showing the Test Data

In [38]:
data2.show(6)

+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| null|       S|
|        897|     3|Svensson, Mr. Joh...|  male|14.0|    0|    0|   7538|  9.225| null|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 6 rows



# Checking for Missing Values inside Test Data

In [39]:
for col in data2.columns:
    print(col.ljust(20), data2.filter(data2[col].isNull()).count())

PassengerId          0
Pclass               0
Name                 0
Sex                  0
Age                  86
SibSp                0
Parch                0
Ticket               0
Fare                 1
Cabin                327
Embarked             0


# Creating a FamilySize Feature and Dropping the Unwanted

In [40]:
data2 = data2.fillna({'Embarked': 'S', 'Fare':14.45})
data2 = data2.withColumn('FamilySize', data2['Parch'] + data2['SibSp']).\
            drop('Parch', 'SibSp')

#  Imputing Missing Age

In [41]:
data2 = data2.withColumn('Title', regexp_extract(data2['Name'],\
                '([A-Za-z]+)\.', 1))

data2 = data2.withColumn('Title', mapping[data2['Title']])

data2.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+------+----------+------------------+
| Title|count(Age)|          avg(Age)|
+------+----------+------------------+
|Master|        17| 7.406470588235294|
|   Mrs|        63|38.904761904761905|
|  Miss|        64|21.774843750000002|
|    Mr|       188|32.340425531914896|
+------+----------+------------------+



# Showing Top 6 Rows after Dropping 4 variables

In [42]:
data2 = ageimpute(data2, 'Mr', 33.02)
data2 = ageimpute(data2, 'Mrs', 35.98)
data2 = ageimpute(data2, 'Miss', 21.86)
data2 = ageimpute(data2, 'Master', 4.75)

data2 = data2.drop('Ticket', 'Cabin', 'Title', 'Name')
data2.show(6)

+-----------+------+------+----+-------+--------+----------+
|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+-----------+------+------+----+-------+--------+----------+
|        892|     3|  male|34.5| 7.8292|       Q|         0|
|        893|     3|female|47.0|    7.0|       S|         1|
|        894|     2|  male|62.0| 9.6875|       Q|         0|
|        895|     3|  male|27.0| 8.6625|       S|         0|
|        896|     3|female|22.0|12.2875|       S|         2|
|        897|     3|  male|14.0|  9.225|       S|         0|
+-----------+------+------+----+-------+--------+----------+
only showing top 6 rows



# Checking for Null Values

In [43]:
for col in data2.columns:
    print(col.ljust(20), data2.filter(data2[col].isNull()).count())

PassengerId          0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


# Grid-search and Cross-validation

In [44]:
pipeline_rf = Pipeline(stages=[stringIndex, vec_asmbl, rf])

paramGrid = ParamGridBuilder().\
            addGrid(rf.maxDepth, [3, 4, 5]).\
            addGrid(rf.minInfoGain, [0., 0.01, 0.1]).\
            addGrid(rf.numTrees, [1000]).\
            build()

selected_model = CrossValidator(estimator=pipeline_rf, 
                                estimatorParamMaps=paramGrid, 
                                evaluator=evaluator, 
                                numFolds=5)

model_final = selected_model.fit(data1)
pred_train = model_final.transform(data1)
evaluator.evaluate(pred_train)

0.8484848484848485

#  In-sample Accuracy

In [45]:
pred_test = model_final.transform(data2)

predictions = pred_test.select('PassengerId', 'prediction')
predictions = predictions.\
                withColumn('Survived', predictions['prediction'].\
                cast('integer')).drop('prediction')
predictions.show(6)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
|        897|       0|
+-----------+--------+
only showing top 6 rows

