In [1]:
import findspark
findspark.init('/home/user/spark-3.0.0-bin-hadoop2.7')
import pyspark

In [2]:
import re

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression

In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.types import FloatType

spark = SparkSession.Builder().master('local').appName('titanic').getOrCreate()

sc = spark.sparkContext

In [4]:
titanic_data = spark.read.csv('/home/user/PROJECTS luminar/spark mlib/titanic.csv',
                              inferSchema=True, header=True)

In [5]:
titanic_data.show()
print(titanic_data.count())
titanic_data.printSchema()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [6]:
# titanic_data_pandas=titanic_data.toPandas()
# to find the correlation between different columns (Non string) in dataframe with respect to Survived Column

# print(titanic_data.select('Age').dtypes[0][1])
print(titanic_data.dtypes)
for c in titanic_data.columns:
    if titanic_data.select(c).dtypes[0][1] != 'string':
        print(c + '=' + str(titanic_data.stat.corr(c, 'Survived')))

[('PassengerId', 'int'), ('Survived', 'int'), ('Pclass', 'int'), ('Name', 'string'), ('Sex', 'string'), ('Age', 'double'), ('SibSp', 'int'), ('Parch', 'int'), ('Ticket', 'string'), ('Fare', 'double'), ('Cabin', 'string'), ('Embarked', 'string')]
PassengerId=-0.005006660767066522
Survived=1.0
Pclass=-0.33848103596101514
Age=0.010539215871285685
SibSp=-0.03532249888573558
Parch=0.08162940708348336
Fare=0.2573065223849626


In [7]:
# To find the null value count in each column
for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2


In [8]:
titanic_data.groupBy('Embarked').count().orderBy('count', ascending=False).first()

Row(Embarked='S', count=644)

In [9]:
# To find the mode of embarked column

mode_embarked=titanic_data.groupBy('Embarked').count().orderBy('count', ascending=False).first()[0]

# print('mode ' , titanic_data.select('Embarked').collect())

print('mode', mode_embarked)

mode S


In [10]:
# To fill embarked column with mode of Embarked Column
# titanic_data = titanic_data.fillna(mode_embarked, subset=['Embarked'])


for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())


age_mean = titanic_data.select(mean('Age')).first()[0]
# age_mean.show()
print(int(age_mean))

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2
29


In [11]:

# titanic_data = titanic_data.fillna(int(age_mean), subset=['Age'])


#
#
titanic_data = titanic_data.fillna({'Age':int(age_mean) , 'Embarked':mode_embarked})
#
for c in titanic_data.columns:
    print(c, titanic_data.filter(col(c).isNull()).count())
#
titanic_data = titanic_data.drop('Cabin')
titanic_data.show()
#
#



PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 0
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 0
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|       S|
|          6|       0|

In [12]:
def title_extract(name):
    return re.findall(r'( [A-Za-z]+)\.',name)[0]

In [13]:
#
#
title_extract_udf = udf(title_extract)
titanic_data = titanic_data.withColumn('Title', title_extract_udf('Name'))
titanic_data.groupBy('Title').count().show()
titanic_data.show()
# # # rare = titanic_data.groupBy('Title').count().filter(col('count')<8).select('Title').collect()
# # # rare = [x[0] for x in rare]
# # # print(rare)
#


+---------+-----+
|    Title|count|
+---------+-----+
|   Master|   40|
|      Rev|    6|
|     Capt|    1|
|      Mrs|  125|
|     Miss|  182|
|     Lady|    1|
| Jonkheer|    1|
|       Mr|  517|
|      Sir|    1|
|      Col|    2|
| Countess|    1|
|     Mlle|    2|
|       Dr|    7|
|    Major|    2|
|      Don|    1|
|       Ms|    1|
|      Mme|    1|
+---------+-----+

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|  Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|
|          3|       1|     3|Heikkin

In [14]:
# stringI = StringIndexer(inputCol='Sex',outputCol='Sex_Index')
# model = stringI.fit(titanic_data)
# indexed_df = model.transform(titanic_data)


# indexed_df.show()
def string_index_fun(col_name):
    stringIndexer = StringIndexer(inputCol=col_name, outputCol=col_name+'_Index')
    model = stringIndexer.fit(titanic_data)
    indexed = model.transform(titanic_data)

    encoder = OneHotEncoder(inputCol=col_name+'_Index', outputCol=col_name+'_Vector')
    encode_model = encoder.fit(indexed)
    encoded = encode_model.transform(indexed)

    return encoded


In [15]:
indexables = ['Embarked','Sex','Title']
for i in indexables:
    titanic_data=string_index_fun(i)



In [16]:
titanic_data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+--------------+---------------+---------+-------------+-----------+--------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|  Title|Embarked_Index|Embarked_Vector|Sex_Index|   Sex_Vector|Title_Index|  Title_Vector|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+--------------+---------------+---------+-------------+-----------+--------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|           0.0|  (2,[0],[1.0])|      0.0|(1,[0],[1.0])|        0.0|(16,[0],[1.0])|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|           1.0|  (2,[1],[1.0])|      1.0|    (1,[],[])|        2.0|(16,[2],[1.0])|
|    

In [17]:

titanic_data = titanic_data.drop('PassengerId','Title','Embarked','Name','Ticket','Sex')

In [18]:
titanic_data.show()


+--------+------+----+-----+-----+-------+--------------+---------------+---------+-------------+-----------+--------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Embarked_Index|Embarked_Vector|Sex_Index|   Sex_Vector|Title_Index|  Title_Vector|
+--------+------+----+-----+-----+-------+--------------+---------------+---------+-------------+-----------+--------------+
|       0|     3|22.0|    1|    0|   7.25|           0.0|  (2,[0],[1.0])|      0.0|(1,[0],[1.0])|        0.0|(16,[0],[1.0])|
|       1|     1|38.0|    1|    0|71.2833|           1.0|  (2,[1],[1.0])|      1.0|    (1,[],[])|        2.0|(16,[2],[1.0])|
|       1|     3|26.0|    0|    0|  7.925|           0.0|  (2,[0],[1.0])|      1.0|    (1,[],[])|        1.0|(16,[1],[1.0])|
|       1|     1|35.0|    1|    0|   53.1|           0.0|  (2,[0],[1.0])|      1.0|    (1,[],[])|        2.0|(16,[2],[1.0])|
|       0|     3|35.0|    0|    0|   8.05|           0.0|  (2,[0],[1.0])|      0.0|(1,[0],[1.0])|        0.0|(16,[0],[1.0])|


In [19]:
titanic_data=titanic_data.drop(*[c for c in titanic_data.columns if c.endswith('Index')])
titanic_data.show()

+--------+------+----+-----+-----+-------+---------------+-------------+--------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Embarked_Vector|   Sex_Vector|  Title_Vector|
+--------+------+----+-----+-----+-------+---------------+-------------+--------------+
|       0|     3|22.0|    1|    0|   7.25|  (2,[0],[1.0])|(1,[0],[1.0])|(16,[0],[1.0])|
|       1|     1|38.0|    1|    0|71.2833|  (2,[1],[1.0])|    (1,[],[])|(16,[2],[1.0])|
|       1|     3|26.0|    0|    0|  7.925|  (2,[0],[1.0])|    (1,[],[])|(16,[1],[1.0])|
|       1|     1|35.0|    1|    0|   53.1|  (2,[0],[1.0])|    (1,[],[])|(16,[2],[1.0])|
|       0|     3|35.0|    0|    0|   8.05|  (2,[0],[1.0])|(1,[0],[1.0])|(16,[0],[1.0])|
|       0|     3|29.0|    0|    0| 8.4583|      (2,[],[])|(1,[0],[1.0])|(16,[0],[1.0])|
|       0|     1|54.0|    0|    0|51.8625|  (2,[0],[1.0])|(1,[0],[1.0])|(16,[0],[1.0])|
|       0|     3| 2.0|    3|    1| 21.075|  (2,[0],[1.0])|(1,[0],[1.0])|(16,[3],[1.0])|
|       1|     3|27.0|    0|    

In [20]:
input_cols = titanic_data.columns

In [21]:
input_cols.remove('Survived')
print(input_cols)



['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked_Vector', 'Sex_Vector', 'Title_Vector']


In [22]:
#vectorization
assembler = VectorAssembler(inputCols=input_cols,outputCol='features')
titanic_data = assembler.transform(titanic_data)

In [23]:
titanic_data.show(truncate=False)

+--------+------+----+-----+-----+-------+---------------+-------------+--------------+------------------------------------------------------------+
|Survived|Pclass|Age |SibSp|Parch|Fare   |Embarked_Vector|Sex_Vector   |Title_Vector  |features                                                    |
+--------+------+----+-----+-----+-------+---------------+-------------+--------------+------------------------------------------------------------+
|0       |3     |22.0|1    |0    |7.25   |(2,[0],[1.0])  |(1,[0],[1.0])|(16,[0],[1.0])|(24,[0,1,2,4,5,7,8],[3.0,22.0,1.0,7.25,1.0,1.0,1.0])        |
|1       |1     |38.0|1    |0    |71.2833|(2,[1],[1.0])  |(1,[],[])    |(16,[2],[1.0])|(24,[0,1,2,4,6,10],[1.0,38.0,1.0,71.2833,1.0,1.0])          |
|1       |3     |26.0|0    |0    |7.925  |(2,[0],[1.0])  |(1,[],[])    |(16,[1],[1.0])|(24,[0,1,4,5,9],[3.0,26.0,7.925,1.0,1.0])                   |
|1       |1     |35.0|1    |0    |53.1   |(2,[0],[1.0])  |(1,[],[])    |(16,[2],[1.0])|(24,[0,1,2,4,5,10],

In [24]:
#spliting
train,test = titanic_data.randomSplit([0.8,0.2],seed=1)
print(train.count(),test.count())

731 160


# modeling

In [25]:
print('-'*40 + 'Logistic Regression' +'_'*40)

lr = LogisticRegression(labelCol='Survived', featuresCol='features')
lrmodel = lr.fit(train)

----------------------------------------Logistic Regression________________________________________


In [26]:
# predict_train=model.transform(train)
predict_test=lrmodel.transform(test)
predict_test.select("Survived","prediction").show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [27]:
#predict_test.show()

+--------+------+----+-----+-----+-------+---------------+-------------+---------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Embarked_Vector|   Sex_Vector|   Title_Vector|            features|       rawPrediction|         probability|prediction|
+--------+------+----+-----+-----+-------+---------------+-------------+---------------+--------------------+--------------------+--------------------+----------+
|       0|     1|21.0|    0|    1|77.2875|  (2,[0],[1.0])|(1,[0],[1.0])| (16,[0],[1.0])|(24,[0,1,3,4,5,7,...|[0.34160648894338...|[0.58458070545014...|       0.0|
|       0|     1|29.0|    0|    0|    0.0|  (2,[0],[1.0])|(1,[0],[1.0])| (16,[0],[1.0])|(24,[0,1,5,7,8],[...|[0.29852675065698...|[0.57408232932709...|       0.0|
|       0|     1|29.0|    0|    0| 25.925|  (2,[0],[1.0])|(1,[0],[1.0])| (16,[0],[1.0])|(24,[0,1,4,5,7,8]...|[0.25927191364233...|[0.56445730363394...|       0.0|
|       0|     1|29.0|

In [28]:
tp = predict_test.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_test.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_test.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_test.filter((col('Survived')==0) & (col('prediction')==1)).count()

In [29]:
# prediction , Label , float()
preds_and_labels = predict_test.select(['prediction','Survived']).withColumn('label', col('Survived').cast(FloatType())).orderBy('prediction')
metrics = MulticlassMetrics(preds_and_labels.select('prediction','label').rdd.map(tuple))

print(metrics.confusionMatrix().toArray())
print(tp,fp,fn,tn)

print('-'*55)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))


[[89. 15.]
 [14. 42.]]
89 14 15 42
-------------------------------------------------------
acc = 0.81875


In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predict_test)
print("Test Error = %g " % (1.0 - accuracy))
print(accuracy)
# print(tp/(tp+fp))
# #
#
print(lrmodel.coefficients , lrmodel.intercept)

Test Error = 0.18125 
0.81875
[-1.142901161984344,-0.029523807706080152,-0.5303371698047312,-0.3962965534844821,0.0015141692194655138,-0.7113519015308525,-0.22448627091765708,-29.352086684280955,-13.374964352447638,-39.70100103191786,-38.81545671047,-9.864754400540312,-12.493980709377064,-47.99363901626195,-12.510364404113853,-12.543917071426865,197.06562465690737,-35.98322226901333,230.45751339367908,0.0,-49.16671077012835,240.39826442604013,210.04799658027727,174.3739639562326] 45.13896777306313


In [31]:
print('-'*40 + 'Decision Tree' + '_'*40)

----------------------------------------Decision Tree________________________________________


In [32]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')
dtmodel = dt.fit(train)
# predict_train=model.transform(train)
predict_testdt=dtmodel.transform(test)
predict_testdt .select("Survived","prediction").show()



+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [33]:
tp = predict_testdt.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_testdt.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_testdt.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_testdt.filter((col('Survived')==0) & (col('prediction')==1)).count()
print(tp,fp,fn,tn)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))

89 18 15 38
acc = 0.79375


In [34]:
accuracy = evaluator.evaluate(predict_testdt)
print("Test Error = %g " % (1.0 - accuracy))


print('-'*40 + 'Random Forest' + '_'*40)

Test Error = 0.20625 
----------------------------------------Random Forest________________________________________


In [35]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Survived', featuresCol='features')
rfmodel = rf.fit(train)
# predict_train=model.transform(train)
predict_testrf=rfmodel.transform(test)
predict_testrf.select("Survived","prediction").show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [36]:
tp = predict_testrf.filter((col('Survived')==0) & (col('prediction')==0)).count()
tn = predict_testrf.filter((col('Survived')==1) & (col('prediction')==1)).count()
fp = predict_testrf.filter((col('Survived')==1) & (col('prediction')==0)).count()
fn = predict_testrf.filter((col('Survived')==0) & (col('prediction')==1)).count()
print(tp,fp,fn,tn)

print('acc = '+ str((tp+tn)/(tp+tn+fp+fn)))


accuracy = evaluator.evaluate(predict_testdt)
print("Test Error = %g " % (1.0 - accuracy))

88 16 16 40
acc = 0.8
Test Error = 0.20625 


In [37]:
sc.stop()