In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import SparkContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
import pandas as pd
import numpy as np
from sklearn import metrics 
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Start (or reuse) the Spark session that every later cell reads from and queries through.
spark = (
    SparkSession.builder
    .appName("Titanic data ")
    .getOrCreate()
)


In [3]:
# Load the Titanic CSV: the first row is the header, and column types are inferred from the values.
data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/FileStore/tables/Titanic1.csv")
)
display(data)

last,first,gender,age,Pclass,fare,embarked,survived
Braund,Mr. Owen Harris,M,22.0,3,7.25,Southampton,no
Cumings,Mrs. John Bradley (Florence Briggs Thayer),F,38.0,1,71.2833,Cherbourg,yes
Heikkinen,Miss Laina,F,26.0,3,7.925,Southampton,yes
Futrelle,Mrs. Jacques Heath (Lily May Peel),F,35.0,1,53.1,Southampton,yes
Allen,Mr. William Henry,M,35.0,3,8.05,Southampton,no
Moran,Mr. James,M,,3,8.4583,Queenstown,no
McCarthy,Mr. Timothy J,M,54.0,1,51.8625,Southampton,no
Palsson,Master Gosta Leonard,M,2.0,3,21.075,Southampton,no
Johnson,Mrs. Oscar W (Elisabeth Vilhelmina Berg),F,27.0,3,11.1333,Southampton,yes
Nasser,Mrs. Nicholas (Adele Achem),F,14.0,2,30.0708,Cherbourg,yes


In [4]:
# Register the loaded frame as the temp view "data" so it can be queried with spark.sql.
# NOTE(review): later cells rebind the Python name `data` without re-registering the view,
# so SQL against "data" presumably keeps seeing the frame as it is at this point — confirm.
data.createOrReplaceTempView("data")

In [5]:
# Count the nulls of every column in ONE aggregation instead of launching a separate
# filter+count job per column. F.when(...) without otherwise() yields NULL for non-matching
# rows, and F.count() skips NULLs, so each aggregate is exactly that column's null count.
null_counts = data.select(
    [
        F.count(F.when(F.col(col_name).isNull(), 1)).alias(col_name)
        for col_name in data.columns
    ]
).first()

# Keep only the columns that actually contain nulls, as (column name, null count) pairs.
empty_columns = [
    (col_name, empty_values)
    for col_name, empty_values in zip(data.columns, null_counts)
    if empty_values > 0
]
print(empty_columns)

In [6]:
# Mean of the non-null ages, queried through the "data" temp view and rounded to a whole number.
avg_age_row = spark.sql("SELECT AVG(age) FROM data").first()
round(avg_age_row[0])


In [7]:
# Impute missing ages with the rounded mean age computed from the frame itself rather than a
# hard-coded constant, so the fill value stays in sync with the data if the input file changes.
# F.avg ignores nulls; it returns None only when every age is null, in which case nothing is filled.
mean_age = data.agg(F.avg("age")).first()[0]
if mean_age is not None:
    data = data.fillna(round(mean_age), subset=["age"])


In [8]:
# Encode the categorical string columns as numeric indices; each indexer writes "<column>_new".
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_new")
    for column in ["gender", "embarked", "survived"]
]

# Hand the unfitted estimators to the Pipeline and let it do the fitting, instead of fitting
# each indexer by hand and wrapping the already-fitted models in a Pipeline whose fit() then
# has nothing left to do.
pipeline = Pipeline(stages=indexers)
pipeline_model = pipeline.fit(data)

# The fitted StringIndexerModel stages, kept under the original name `labels`.
labels = pipeline_model.stages
data = pipeline_model.transform(data)

data.show()

In [9]:
# Remove the raw string columns that now have "_new" index counterparts (gender, embarked,
# survived) together with the name columns and fare, which are not used as model inputs.
columns_to_drop = ["gender", "last", "first", "fare", "embarked", "survived"]
data = data.drop(*columns_to_drop)

In [10]:
# Survival with respect to gender.
# NOTE(review): the code only renders the whole `data` frame; no grouping by gender happens
# here, so any gender-vs-survival chart presumably comes from the notebook UI's plot options — confirm.
display(data)

age,Pclass,gender_new,embarked_new,survived_new
22.0,3,0.0,0.0,0.0
38.0,1,1.0,1.0,1.0
26.0,3,1.0,0.0,1.0
35.0,1,1.0,0.0,1.0
35.0,3,0.0,0.0,0.0
30.0,3,0.0,2.0,0.0
54.0,1,0.0,0.0,0.0
2.0,3,0.0,0.0,0.0
27.0,3,1.0,0.0,1.0
14.0,2,1.0,1.0,1.0


In [11]:
# Survival with respect to passenger class (Pclass).
# NOTE(review): the code only renders the whole `data` frame; no grouping by Pclass happens
# here, so any class-vs-survival chart presumably comes from the notebook UI's plot options — confirm.
display(data)

age,Pclass,gender_new,embarked_new,survived_new
22.0,3,0.0,0.0,0.0
38.0,1,1.0,1.0,1.0
26.0,3,1.0,0.0,1.0
35.0,1,1.0,0.0,1.0
35.0,3,0.0,0.0,0.0
30.0,3,0.0,2.0,0.0
54.0,1,0.0,0.0,0.0
2.0,3,0.0,0.0,0.0
27.0,3,1.0,0.0,1.0
14.0,2,1.0,1.0,1.0


In [12]:
# Survival with respect to age.
# NOTE(review): the code only renders the whole `data` frame; no grouping by age happens
# here, so any age-vs-survival chart presumably comes from the notebook UI's plot options — confirm.
display(data)

age,Pclass,gender_new,embarked_new,survived_new
22.0,3,0.0,0.0,0.0
38.0,1,1.0,1.0,1.0
26.0,3,1.0,0.0,1.0
35.0,1,1.0,0.0,1.0
35.0,3,0.0,0.0,0.0
30.0,3,0.0,2.0,0.0
54.0,1,0.0,0.0,0.0
2.0,3,0.0,0.0,0.0
27.0,3,1.0,0.0,1.0
14.0,2,1.0,1.0,1.0


In [13]:
# Survival with respect to port of embarkation.
# NOTE(review): the code only renders the whole `data` frame; no grouping by embarked happens
# here, so any port-vs-survival chart presumably comes from the notebook UI's plot options — confirm.
display(data)

age,Pclass,gender_new,embarked_new,survived_new
22.0,3,0.0,0.0,0.0
38.0,1,1.0,1.0,1.0
26.0,3,1.0,0.0,1.0
35.0,1,1.0,0.0,1.0
35.0,3,0.0,0.0,0.0
30.0,3,0.0,2.0,0.0
54.0,1,0.0,0.0,0.0
2.0,3,0.0,0.0,0.0
27.0,3,1.0,0.0,1.0
14.0,2,1.0,1.0,1.0


In [14]:
# Pack the four model inputs into a single vector column named "features".
feature_columns = ["age", "Pclass", "gender_new", "embarked_new"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
X_assembler = assembler.transform(data)
X_assembler.show()

In [15]:
# 80/20 train/test split. A fixed seed makes the split, and therefore every downstream
# prediction and metric, reproducible across kernel restarts and re-runs.
X_train, X_test = X_assembler.randomSplit([0.8, 0.2], seed=42)

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor

# A regression tree is trained on the 0/1 `survived_new` index, so its predictions are
# continuous values rather than hard class labels.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="survived_new",
)

In [17]:
# Fit the tree on the training split, then score the held-out test split.
dt_model = dt.fit(X_train)
dt_predictions = dt_model.transform(X_test)

# Show each prediction next to its true label and the feature vector it was computed from.
(
    dt_predictions
    .select("prediction", "survived_new", "features")
    .show()
)

In [18]:
# Root-mean-squared error between the tree's prediction and the 0/1 survival index on the test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="survived_new",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [19]:
# Number of rows in the scored test split.
dt_predictions.count()

In [20]:
# Render the scored test rows: the input columns, the assembled feature vector and the prediction.
display(dt_predictions)

age,Pclass,gender_new,embarked_new,survived_new,features,prediction
1.0,2,0.0,1.0,1.0,"List(1, 4, List(), List(1.0, 2.0, 0.0, 1.0))",1.0
1.0,3,1.0,0.0,1.0,"List(1, 4, List(), List(1.0, 3.0, 1.0, 0.0))",0.2857142857142857
2.0,1,1.0,0.0,0.0,"List(1, 4, List(), List(2.0, 1.0, 1.0, 0.0))",0.9565217391304348
2.0,2,1.0,0.0,1.0,"List(1, 4, List(), List(2.0, 2.0, 1.0, 0.0))",1.0
2.0,3,0.0,0.0,0.0,"List(1, 4, List(), List(2.0, 3.0, 0.0, 0.0))",0.5
2.0,3,1.0,0.0,0.0,"List(1, 4, List(), List(2.0, 3.0, 1.0, 0.0))",0.2857142857142857
2.0,3,1.0,0.0,1.0,"List(1, 4, List(), List(2.0, 3.0, 1.0, 0.0))",0.2857142857142857
3.0,3,0.0,0.0,1.0,"List(1, 4, List(), List(3.0, 3.0, 0.0, 0.0))",0.5
4.0,3,1.0,0.0,1.0,"List(1, 4, List(), List(4.0, 3.0, 1.0, 0.0))",0.2857142857142857
4.0,3,1.0,0.0,1.0,"List(1, 4, List(), List(4.0, 3.0, 1.0, 0.0))",0.2857142857142857
