In [1]:
import findspark

In [2]:
findspark.init('/home/sumith/spark')

In [3]:
import pyspark

In [4]:
import re

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.types import FloatType


In [5]:
spark = SparkSession.Builder().master('local').appName('titanic').getOrCreate()

sc = spark.sparkContext

In [6]:
titanic_data = spark.read.csv('data/titanic.csv',
                              inferSchema=True, header=True)

In [7]:
titanic_data.show()
print(titanic_data.count())
titanic_data.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [8]:

# to find the correlation between different columns (Non string) in dataframe with respect to Survived Column

print(titanic_data.dtypes)
for c in titanic_data.columns:
    if titanic_data.select(c).dtypes[0][1] != 'string':
        print(c + '=' + str(titanic_data.stat.corr(c, 'Survived')))


[('PassengerId', 'int'), ('Survived', 'int'), ('Pclass', 'int'), ('Name', 'string'), ('Sex', 'string'), ('Age', 'double'), ('SibSp', 'int'), ('Parch', 'int'), ('Ticket', 'string'), ('Fare', 'double'), ('Cabin', 'string'), ('Embarked', 'string')]
PassengerId=-0.005006660767066522
Survived=1.0
Pclass=-0.33848103596101514
Age=0.010539215871285685
SibSp=-0.03532249888573558
Parch=0.08162940708348336
Fare=0.2573065223849626


In [9]:

# To find the null value count in each column
for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())



titanic_data.groupBy('Embarked').count().orderBy('count', ascending=False).first()

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2


Row(Embarked='S', count=644)

In [10]:
# To find the mode of embarked column

mode_embarked=titanic_data.groupBy('Embarked').count().orderBy('count', ascending=False).first()[0]


print('mode', mode_embarked)



for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())


age_mean = titanic_data.select(mean('Age')).first()[0]
# age_mean.show()
print(int(age_mean))

mode S
PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2
29


In [11]:

titanic_data = titanic_data.fillna({'Age':int(age_mean) , 'Embarked':mode_embarked})
#
for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())
#
titanic_data = titanic_data.drop('Cabin')
titanic_data.show()
#
#
def title_extract(name):
    return re.findall(r'( [A-Za-z]+)\.',name)[0]


#
#
title_extract_udf = udf(title_extract)
titanic_data = titanic_data.withColumn('Title', title_extract_udf('Name'))
titanic_data.groupBy('Title').count().show()
titanic_data.show()




def string_index_fun(col_name):
    stringIndexer = StringIndexer(inputCol=col_name, outputCol=col_name+'_Index')
    model = stringIndexer.fit(titanic_data)
    indexed = model.transform(titanic_data)

    encoder = OneHotEncoder(inputCol=col_name+'_Index', outputCol=col_name+'_Vector')
    encode_model = encoder.fit(indexed)
    encoded = encode_model.transform(indexed)

    return encoded


indexables = ['Embarked','Sex','Title']

for c in indexables:
    titanic_data=string_index_fun(c)
#
titanic_data.show()
titanic_data = titanic_data.drop('PassengerId','Title','Embarked','Name','Ticket','Sex')
#
#

titanic_data.show()
titanic_data=titanic_data.drop(*[c for c in titanic_data.columns if c.endswith('Index')])
titanic_data.show()
#
input_cols = titanic_data.columns
#

input_cols.remove('Survived')
print(input_cols)

assembler = VectorAssembler(inputCols=input_cols,outputCol='features')
titanic_data = assembler.transform(titanic_data)
# #
titanic_data.show(truncate=False)
#
#
train,test = titanic_data.randomSplit([0.8,0.2],seed=1)
print(train.count(),test.count())
# #

print('-'*40 + 'Logistic Regression' +'_'*40)

lr = LogisticRegression(labelCol='Survived', featuresCol='features')
lrmodel = lr.fit(train)

predict_test=lrmodel.transform(test)
predict_test.select("Survived","prediction").show()

#
tp = predict_test.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_test.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_test.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_test.filter((col('Survived')==0) & (col('prediction')==1)).count()



# prediction , Label , float()
preds_and_labels = predict_test.select(['prediction','Survived']).withColumn('label', col('Survived').cast(FloatType())).orderBy('prediction')
metrics = MulticlassMetrics(preds_and_labels.select('prediction','label').rdd.map(tuple))

print(metrics.confusionMatrix().toArray())
print(tp,fp,fn,tn)

print('-'*55)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 0
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 0
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|
|          6|       0|

+--------+------+----+-----+-----+-------+--------------+---------------+---------+-------------+-----------+--------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Embarked_Index|Embarked_Vector|Sex_Index|   Sex_Vector|Title_Index|  Title_Vector|
+--------+------+----+-----+-----+-------+--------------+---------------+---------+-------------+-----------+--------------+
|       0|     3|22.0|    1|    0|   7.25|           0.0|  (2,[0],[1.0])|      0.0|(1,[0],[1.0])|        0.0|(16,[0],[1.0])|
|       1|     1|38.0|    1|    0|71.2833|           1.0|  (2,[1],[1.0])|      1.0|    (1,[],[])|        2.0|(16,[2],[1.0])|
|       1|     3|26.0|    0|    0|  7.925|           0.0|  (2,[0],[1.0])|      1.0|    (1,[],[])|        1.0|(16,[1],[1.0])|
|       1|     1|35.0|    1|    0|   53.1|           0.0|  (2,[0],[1.0])|      1.0|    (1,[],[])|        2.0|(16,[2],[1.0])|
|       0|     3|35.0|    0|    0|   8.05|           0.0|  (2,[0],[1.0])|      0.0|(1,[0],[1.0])|        0.0|(16,[0],[1.0])|


731 160
----------------------------------------Logistic Regression________________________________________
+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows

[[89. 15.]
 [14. 42.]]
89 14 15 42
-------------------------------------------------------
acc = 0.81875


In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predict_test)
print("Test Error = %g " % (1.0 - accuracy))
print(accuracy)

print(lrmodel.coefficients , lrmodel.intercept)


print('-'*40 + 'Decision Tree' + '_'*40)

from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')
dtmodel = dt.fit(train)
# predict_train=model.transform(train)
predict_testdt=dtmodel.transform(test)
predict_testdt .select("Survived","prediction").show()


tp = predict_testdt.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_testdt.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_testdt.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_testdt.filter((col('Survived')==0) & (col('prediction')==1)).count()
print(tp,fp,fn,tn)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))

accuracy = evaluator.evaluate(predict_testdt)
print("Test Error = %g " % (1.0 - accuracy))


print('-'*40 + 'Random Forest' + '_'*40)


Test Error = 0.18125 
0.81875
[-1.142901161984344,-0.029523807706080152,-0.5303371698047312,-0.3962965534844821,0.0015141692194655138,-0.7113519015308525,-0.22448627091765708,-29.352086684280955,-13.374964352447638,-39.70100103191786,-38.81545671047,-9.864754400540312,-12.493980709377064,-47.99363901626195,-12.510364404113853,-12.543917071426865,197.06562465690737,-35.98322226901333,230.45751339367908,0.0,-49.16671077012835,240.39826442604013,210.04799658027727,174.3739639562326] 45.13896777306313
----------------------------------------Decision Tree________________________________________
+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|      

In [13]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Survived', featuresCol='features')
rfmodel = rf.fit(train)
# predict_train=model.transform(train)
predict_testrf=rfmodel.transform(test)
predict_testrf.select("Survived","prediction").show()

tp = predict_testrf.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_testrf.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_testrf.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_testrf.filter((col('Survived')==0) & (col('prediction')==1)).count()
print(tp,fp,fn,tn)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))


accuracy = evaluator.evaluate(predict_testdt)
print("Test Error = %g " % (1.0 - accuracy))


sc.stop()


+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows

89 16 15 40
acc = 0.80625
Test Error = 0.20625 
