In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, regexp_extract, when, lit, avg
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer


In [2]:
spark = SparkSession\
    .builder\
    .appName("Spark ML on titanic data ")\
    .getOrCreate()


In [3]:
path = "./titanic_full.csv"

In [4]:
titanic_df = spark.read.csv(path,header = 'True',inferSchema='True')

# Basic information about dataset

In [5]:
#See data
display(titanic_df)
titanic_df.printSchema()
titanic_df.describe().show()
#count passenger 
passengers_count = titanic_df.count()
print("total number of passengers in train set is: "+ str(passengers_count))



DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+------------------+------------------+------------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|             Parch|            Ticket|              Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+--------------

# Simple EDA

In [6]:
#Count survived and dead
titanic_df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  494|
|       0|  815|
+--------+-----+



Out of 891 passengers in dataset, only about 342 survived.

In [7]:
#Surival rate by gender
titanic_df.groupBy("Sex","Survived").count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  734|
|female|       1|  385|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



Although the number of males are more than females on ship, the female survivors are twice the number of males saved.

In [8]:
#Surival rate by passenger class
titanic_df.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|  137|
|     3|       1|  191|
|     1|       1|  186|
|     2|       1|  117|
|     2|       0|  160|
|     3|       0|  518|
+------+--------+-----+



We can clearly see that Passenegers Of Pclass 1 were given a very high priority while rescue. Even though the the number of Passengers in Pclass 3 were a lot higher, still the number of survival from them is very low.

# Missing Values treatment

In [9]:
# This function use to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [10]:
# Calling function
null_columns_count_list = null_value_count(titanic_df)


In [11]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|              263|
|                  Fare|                1|
|                 Cabin|             1014|
|              Embarked|                2|
+----------------------+-----------------+



## Age

In [12]:
mean_age = titanic_df.select(mean('Age')).collect()[0][0]
print(mean_age)

29.881137667304014


In [13]:
titanic_df.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



To replace these NaN values, we can assign them the mean age of the dataset.But the problem is, there were many people with many different ages. We just cant assign a 4 year kid with the mean age that is 29 years.
we can check the Name feature. Looking upon the feature, we can see that the names have a salutation like Mr or Mrs. Thus we can assign the mean values of Mr and Mrs to the respective groups

In [14]:
titanic_df = titanic_df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
#Check the new "initial" column with unique values
titanic_df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|    Dona|
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



Using the Regex ""[A-Za-z]+)." we extract the initials from the Name. It looks for strings which lie between A-Z or a-z and followed by a .(dot).
There are some misspelled Initials like Mlle or Mme that stand for Miss. I will replace them with Miss and same thing for other values.

In [15]:
#normalize initial
titanic_df = titanic_df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])


In [16]:
titanic_df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Dona|
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



In [17]:
titanic_df.groupby('Initial').avg('Age').collect()

[Row(Initial='Dona', avg(Age)=39.0),
 Row(Initial='Miss', avg(Age)=21.834532710280374),
 Row(Initial='Other', avg(Age)=44.92307692307692),
 Row(Initial='Master', avg(Age)=5.482641509433963),
 Row(Initial='Mr', avg(Age)=32.545531197301855),
 Row(Initial='Mrs', avg(Age)=37.03488372093023)]

 impute missing values in age feature based on average age of Initials

In [18]:
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Miss") & (titanic_df["Age"].isNull()), 22).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Other") & (titanic_df["Age"].isNull()), 46).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Master") & (titanic_df["Age"].isNull()), 5).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mr") & (titanic_df["Age"].isNull()), 33).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mrs") & (titanic_df["Age"].isNull()), 36).otherwise(titanic_df["Age"]))

Check the imputation

In [19]:
titanic_df.filter(titanic_df.Age==46).select("Initial").show()
titanic_df.select("Age").show()

+-------+
|Initial|
+-------+
|     Mr|
|     Mr|
|     Mr|
|     Mr|
|     Mr|
|     Mr|
+-------+

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|33.0|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|33.0|
|31.0|
|36.0|
+----+
only showing top 20 rows



In [20]:
# Check remaining NAs
null_columns_count_list = null_value_count(titanic_df)
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  Fare|                1|
|                 Cabin|             1014|
|              Embarked|                2|
+----------------------+-----------------+



All NAs in ages is treat. 
Let's treat 2 NAs in Embarked


## Embark

In [21]:
titanic_df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|  123|
|    null|    2|
|       C|  270|
|       S|  914|
+--------+-----+



Since the mode is 'S', we can replace port of embarkment for missing values as S

In [22]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

In [23]:
# Check remaining NAs
null_columns_count_list = null_value_count(titanic_df)
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  Fare|                1|
|                 Cabin|             1014|
+----------------------+-----------------+



## Fare

There is one missing value in fare, we can replace this with mean of fare

In [24]:
titanic_df.agg(avg(col("Fare"))).show()

+----------------+
|       avg(Fare)|
+----------------+
|33.2954792813456|
+----------------+



In [25]:
titanic_df = titanic_df.na.fill({"Fare" : 33.29})

## Cabin

Cabin has more than 10% of NA, we will drop this variable

In [26]:
titanic_df = titanic_df.drop("Cabin")

In [27]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Embarked: string (nullable = false)
 |-- Initial: string (nullable = true)



## Add Family_Size And Alone columns to understand the effect of family on survival rate

We can create a new feature called "Family_size" and "Alone" and analyse it. This feature is the summation of Parch(parents/children) and SibSp(siblings/spouses). It gives us a combined data so that we can check if survival rate have anything to do with family size of the passengers

In [28]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [29]:

titanic_df.groupBy("Family_Size").count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  235|
|          6|   16|
|          3|   43|
|          5|   25|
|          4|   22|
|          7|    8|
|         10|   11|
|          2|  159|
|          0|  790|
+-----------+-----+



In [30]:
#Create new column named "alone"
titanic_df = titanic_df.withColumn('Alone',lit(0))
#if that person is alone, marked as 1
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

In [31]:
titanic_df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Embarked',
 'Initial',
 'Family_Size',
 'Alone']

Convert Sex, Embarked & Initial columns from string to number using StringIndexer for machine learning purposes

In [32]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)

In [33]:
titanic_df.limit(5).toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Embarked,Initial,Family_Size,Alone,Sex_index,Embarked_index,Initial_index
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,S,Mr,1,0,0.0,0.0,0.0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,Mrs,1,0,1.0,1.0,2.0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,S,Miss,0,1,1.0,0.0,1.0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,S,Mrs,1,0,1.0,0.0,2.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,S,Mr,0,1,0.0,0.0,0.0


Drop columns that are unnecessary for machine learning

In [34]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [35]:
titanic_df.limit(5).toPandas().head()

Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Family_Size,Alone,Sex_index,Embarked_index,Initial_index
0,0,3,22.0,1,0,7.25,1,0,0.0,0.0,0.0
1,1,1,38.0,1,0,71.2833,1,0,1.0,1.0,2.0
2,1,3,26.0,0,0,7.925,0,1,1.0,0.0,1.0
3,1,1,35.0,1,0,53.1,1,0,1.0,0.0,2.0
4,0,3,35.0,0,0,8.05,0,1,0.0,0.0,0.0


put all features into vector



In [36]:
feature = VectorAssembler(inputCols=titanic_df.columns[1:],outputCol="features")
feature_vector= feature.transform(titanic_df)

In [37]:
feature_vector.limit(5).toPandas().head()

Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Family_Size,Alone,Sex_index,Embarked_index,Initial_index,features
0,0,3,22.0,1,0,7.25,1,0,0.0,0.0,0.0,"(3.0, 22.0, 1.0, 0.0, 7.25, 1.0, 0.0, 0.0, 0.0..."
1,1,1,38.0,1,0,71.2833,1,0,1.0,1.0,2.0,"[1.0, 38.0, 1.0, 0.0, 71.2833, 1.0, 0.0, 1.0, ..."
2,1,3,26.0,0,0,7.925,0,1,1.0,0.0,1.0,"[3.0, 26.0, 0.0, 0.0, 7.925, 0.0, 1.0, 1.0, 0...."
3,1,1,35.0,1,0,53.1,1,0,1.0,0.0,2.0,"[1.0, 35.0, 1.0, 0.0, 53.1, 1.0, 0.0, 1.0, 0.0..."
4,0,3,35.0,0,0,8.05,0,1,0.0,0.0,0.0,"(3.0, 35.0, 0.0, 0.0, 8.05, 0.0, 1.0, 0.0, 0.0..."


# Modeling


In [38]:
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)

## Logistic Regression

In [39]:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show(5)




+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
+----------+--------+--------------------+
only showing top 5 rows



In [40]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: [-0.936211621642326,-0.03742945403867884,-0.31160435916009854,-0.18607357710826647,0.002830435680720263,-0.19000330359121198,-0.5376754482877728,3.28952480250676,0.20019681955416727,0.45366867352908014]
Intercept: 1.5298824346365505


### Evaluating accuracy of LogisticRegression.

In [41]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = lr_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = lr_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


lr_accuracy = evaluator1.evaluate(lr_prediction)
lr_fscore = evaluator2.evaluate(lr_prediction)

print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
print("F-score of LogisticRegression is = %g"% (lr_fscore))


Confusion matrix
[[147  12]
 [ 22  64]]
Accuracy of LogisticRegression is = 0.861224
Test Error of LogisticRegression = 0.138776 
F-score of LogisticRegression is = 0.859057


## DecisionTreeClassifier

In [42]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "Survived", "features").show(5)



+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
+----------+--------+--------------------+
only showing top 5 rows



### Evaluating accuracy of DecisionTreeClassifier.

In [43]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = dt_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = dt_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


dt_accuracy = evaluator1.evaluate(lr_prediction)
dt_fscore = evaluator2.evaluate(lr_prediction)

print("Accuracy of DecisionTree is = %g"% (dt_accuracy))
print("Test Error of DecisionTree = %g " % (1.0 - dt_accuracy))
print("F-score of DecisionTree is = %g"% (dt_fscore))


Confusion matrix
[[144  15]
 [ 23  63]]
Accuracy of DecisionTree is = 0.861224
Test Error of DecisionTree = 0.138776 
F-score of DecisionTree is = 0.859057


## RandomForestClassifier

In [44]:

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,6],[1.0,...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|[1.0,51.0,0.0,1.0...|
|       0.0|       0|[1.0,53.0,0.0,0.0...|
+----------

### Evaluating accuracy of RandomForestClassifier.

In [45]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = rf_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = rf_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


rf_accuracy = evaluator1.evaluate(rf_prediction)
rf_fscore = evaluator2.evaluate(rf_prediction)

print("Accuracy of RandomForest is = %g"% (rf_accuracy))
print("Test Error of RandomForest = %g " % (1.0 - rf_accuracy))
print("F-score of RandomForest is = %g"% (rf_fscore))

Confusion matrix
[[149  10]
 [ 22  64]]
Accuracy of RandomForest is = 0.869388
Test Error of RandomForest = 0.130612 
F-score of RandomForest is = 0.866865


## Gradient-boosted tree classifier

In [46]:

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()



+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,2,4,5],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,6],[1.0,...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|[1.0,51.0,0.0,1.0...|
|       0.0|       0|[1.0,53.0,0.0,0.0...|
+----------

### Evaluate accuracy of Gradient-boosted.

In [47]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = gbt_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = gbt_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


gbt_accuracy = evaluator1.evaluate(gbt_prediction)
gbt_fscore = evaluator2.evaluate(gbt_prediction)

print("Accuracy of  Gradient-boosted DecisionTree is = %g"% (gbt_accuracy))
print("Test Error of  Gradient-boosted DecisionTree = %g " % (1.0 - gbt_accuracy))
print("F-score of  Gradient-boosted DecisionTree is = %g"% (gbt_fscore))

Confusion matrix
[[145  14]
 [ 22  64]]
Accuracy of  Gradient-boosted DecisionTree is = 0.853061
Test Error of  Gradient-boosted DecisionTree = 0.146939 
F-score of  Gradient-boosted DecisionTree is = 0.85128


## NaiveBayes

In [48]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="Survived", featuresCol="features")
nb_model = nb.fit(trainingData)
nb_prediction = nb_model.transform(testData)
nb_prediction.select("prediction", "Survived", "features").show(5)


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|(10,[0,1,4,6],[1....|
|       1.0|       0|(10,[0,1,4,6,8],[...|
|       1.0|       0|(10,[0,1,4,6],[1....|
|       1.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
+----------+--------+--------------------+
only showing top 5 rows



### Evaluating accuracy of NaiveBayes.

In [49]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = nb_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = nb_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


nb_accuracy = evaluator1.evaluate(nb_prediction)
nb_fscore = evaluator2.evaluate(nb_prediction)

print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))
print("Test Error of NaiveBayes = %g " % (1.0 - nb_accuracy))
print("F-score of NaiveBayes is = %g"% (nb_fscore))

Confusion matrix
[[132  27]
 [ 49  37]]
Accuracy of NaiveBayes is = 0.689796
Test Error of NaiveBayes = 0.310204 
F-score of NaiveBayes is = 0.677084


## Support Vector Machine

In [50]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="Survived", featuresCol="features")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "Survived", "features").show(5)


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6],[1....|
|       0.0|       0|(10,[0,1,4,6,8],[...|
|       0.0|       0|(10,[0,1,4,6,8],[...|
+----------+--------+--------------------+
only showing top 5 rows



### Evaluating the accuracy of Support Vector Machine

In [51]:
print("Confusion matrix")
from sklearn.metrics import confusion_matrix
y_true = svm_prediction.select("Survived")
y_true = y_true.toPandas()

y_pred = svm_prediction.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred)
print(cnf_matrix)

#Accuracy, precision
evaluator1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", \
                                               metricName = "accuracy")
evaluator2 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")


svm_accuracy = evaluator1.evaluate(svm_prediction)
svm_fscore = evaluator2.evaluate(svm_prediction)

print("Accuracy of SupportVectorMachine is = %g"% (svm_accuracy))
print("Test Error of SupportVectorMachine = %g " % (1.0 - svm_accuracy))
print("F-score of SupportVectorMachine is = %g"% (svm_fscore))

Confusion matrix
[[146  13]
 [ 23  63]]
Accuracy of SupportVectorMachine is = 0.853061
Test Error of SupportVectorMachine = 0.146939 
F-score of SupportVectorMachine is = 0.850766
