### Import Libraries

Use Jupyter Notebook as Spark IDE

In [1]:
import findspark
# Make the local Spark installation importable before any pyspark import runs.
findspark.init()

Import required libraries

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

import pandas as pd

In [None]:
# Obtain (or reuse) the Spark context and session.
# The builder's getOrCreate() returns the already-active session when this
# cell is re-run, instead of constructing a second SparkSession object by hand.
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

### Load Data

Load data

In [None]:
# Read the Titanic CSV: the first row supplies the column names and the
# column types are inferred from the values.
data = (
    ss.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/titanic.csv")
)

In [None]:
# data.cache() 

Get data structure

In [None]:
# Print the column names and the types inferred while reading the CSV.
data.printSchema()

Get first 5 records

In [None]:
# Preview the first 5 rows.
data.show(5)

In [None]:
# List the column names.
data.columns

Describe the data

In [None]:
# Summary statistics per column, printed one record per statistic (vertical layout).
data.describe().show(truncate=True, vertical=True)

### Preprocess Data

Rename columns

In [None]:
# Remove the dot from the column name; NOTE(review): presumably because a dot in a
# name is interpreted as nested-field access by column expressions such as col(...).
data=data.withColumnRenamed('home.dest','homedest')

Check for missing values

In [None]:
from pyspark.sql import functions

# Count the nulls of every column in ONE Spark job: each column contributes a
# count(when(isNull, 1)) aggregate to the same select, instead of launching a
# separate filter + count job per column. The printed output is unchanged.
null_counts = data.select(
    [
        functions.count(functions.when(functions.col(c).isNull(), 1)).alias(c)
        for c in data.columns
    ]
).first()
for i in data.columns:
    print(i, null_counts[i])

Get records with missing values in a given column

In [None]:
# Rows in which age is missing.
data.filter(data.age.isNull()).show()

In [None]:
# Rows in which embarked is missing.
data.filter(data.embarked.isNull()).show()

In [None]:
# First 5 rows in which cabin is missing.
data.filter(data.cabin.isNull()).show(5)

Check Data Type

In [None]:
# (column name, type) pairs for every column.
data.dtypes

Get the mean age

In [None]:
from pyspark.sql.functions import mean

# Mean of the age column, pulled back to the driver as a plain Python value.
data.agg(mean('age')).first()[0]

Impute missing age value with mean age

In [None]:
# Replace missing ages with the column mean (computed before filling).
mean_age = data.agg(mean('age')).first()[0]
data = data.fillna(mean_age, subset=['age'])

In [None]:
# Verify the imputation: list any rows that still have a null age.
data.filter(data.age.isNull()).show()

Impute fare with mean fare

In [None]:
# Replace missing fares with the column mean (computed before filling).
mean_fare = data.agg(mean('fare')).first()[0]
data = data.fillna(mean_fare, subset=['fare'])

In [None]:
# Verify the imputation: list any rows that still have a null fare.
data.filter(data.fare.isNull()).show()

Impute Embarked with Mode

In [None]:
# Get the mode of the embarked column: the most frequent NON-NULL value.
# The null group is filtered out first so that missing values can never be
# reported as the mode.
(
    data.where(data['embarked'].isNotNull())
    .groupby("embarked")
    .count()
    .orderBy("count", ascending=False)
    .first()[0]
)

In [None]:
# Impute missing embarked values with the mode (most frequent non-null value).
# The groups must be ranked by their "count", not by the embarked code itself:
# ordering by "embarked" picks the alphabetically greatest code, which is only
# the mode by coincidence. Nulls are excluded so the fill value is never null.
embarked_mode = (
    data.where(data['embarked'].isNotNull())
    .groupby("embarked")
    .count()
    .orderBy("count", ascending=False)
    .first()[0]
)
data = data.na.fill(embarked_mode, subset=['embarked'])

In [None]:
# Verify the imputation: list any rows that still have a null embarked value.
data.filter(data.embarked.isNull()).show()

Impute cabin with Mode

In [None]:
# Impute missing cabin values with the mode (most frequent non-null cabin).
# Rank the groups by "count" rather than by the cabin label (ordering by the
# label selects the alphabetically greatest cabin, not the most common one),
# and drop the null group first so it cannot be chosen as the fill value.
cabin_mode = (
    data.where(data['cabin'].isNotNull())
    .groupby("cabin")
    .count()
    .orderBy("count", ascending=False)
    .first()[0]
)
data = data.na.fill(cabin_mode, subset=['cabin'])

In [None]:
# Verify the imputation: show up to 5 rows that still have a null cabin.
data.filter(data.cabin.isNull()).show(5)

Count Distinct Values in Columns

In [None]:
# NOTE: this import rebinds the names `max` and `min` to the Spark column
# functions for the rest of the notebook (the Python builtins are shadowed).
from pyspark.sql.functions import col,countDistinct,max,min

# Number of distinct values in the sex column.
data.select(countDistinct(col("sex")).alias("sex")).show()

In [None]:
# Row count per sex value.
data.groupBy('sex').count().show()

In [None]:
# Number of distinct values in the embarked column.
data.select(countDistinct(col("embarked")).alias("embarked")).show()

In [None]:
# Row count per embarked value.
data.groupBy('embarked').count().show()

In [None]:
# Number of distinct values in the sibsp column.
data.select(countDistinct(col("sibsp")).alias("sibsp")).show()

In [None]:
# Row count per sibsp value.
data.groupBy('sibsp').count().show()

In [None]:
# Number of distinct values in the parch column.
data.select(countDistinct(col("parch")).alias("parch")).show()

In [None]:
# Row count per parch value.
data.groupBy('parch').count().show()

Drop all records with null target variable

In [None]:
# Discard every row whose target column `survived` is null.
data = data.dropna(how='any', subset=['survived'])

In [None]:
# Re-check the null count of every column after cleaning, using ONE Spark job
# (a single select of per-column aggregates) rather than one job per column.
# The printed output is unchanged.
null_counts = data.select(
    [
        functions.count(functions.when(functions.col(c).isNull(), 1)).alias(c)
        for c in data.columns
    ]
).first()
for i in data.columns:
    print(i, null_counts[i])

In [None]:
# Preview the first 5 rows after imputation and row filtering.
data.show(5)

Convert sex and embarked data to numeric features

In [None]:
# Inspect the column types to see which columns are still strings.
data.dtypes

Bin age into four buckets: [0, 20), [20, 50), [50, 80) and [80, ∞)

In [None]:
from pyspark.ml.feature import Bucketizer

# Bucket edges for age: 0, 20, 50, 80, +inf -> bucket ids written to `age_bin`.
# handleInvalid="keep" retains invalid entries in an extra bucket instead of raising.
age_bucketizer = Bucketizer(
    splits=[0, 20, 50, 80, float('Inf')],
    inputCol="age",
    outputCol="age_bin",
    handleInvalid="keep",
)
data = age_bucketizer.transform(data)

In [None]:
# Preview the first 5 rows, now including the age_bin column.
data.show(5)

Bin fare into four buckets: [0, 200), [200, 400), [400, 6000) and [6000, ∞)

In [None]:
# Largest fare in the data set (helps choose the bucket edges).
data.agg({"fare": "max"}).show()

In [None]:
# Smallest fare in the data set (helps choose the bucket edges).
data.agg({"fare": "min"}).show()

In [None]:
# Bucket edges for fare: 0, 200, 400, 6000, +inf -> bucket ids written to `fare_bin`.
# handleInvalid="keep" retains invalid entries in an extra bucket instead of raising.
fare_bucketizer = Bucketizer(
    splits=[0, 200, 400, 6000, float('Inf')],
    inputCol="fare",
    outputCol="fare_bin",
    handleInvalid="keep",
)
data = fare_bucketizer.transform(data)

In [None]:
# Preview the first 5 rows, now including the fare_bin column.
data.show(5)

Family Size

In [None]:
# family_size = sibsp + parch (the passenger is not counted).
family_size_expr = data.sibsp + data.parch
data = data.withColumn("family_size", family_size_expr)

In [None]:
# Row count per family_size value.
data.groupBy('family_size').count().show()

In [None]:
# Bin family size with edges 0, 1, 4, 30, +inf -> bucket ids written to `family_size_bin`.
# handleInvalid="keep" retains invalid entries in an extra bucket instead of raising.
family_bucketizer = Bucketizer(
    splits=[0, 1, 4, 30, float('Inf')],
    inputCol="family_size",
    outputCol="family_size_bin",
    handleInvalid="keep",
)
data = family_bucketizer.transform(data)

In [None]:
# Preview the first 5 rows, now including family_size and family_size_bin.
data.show(5)

In [None]:
# Column types after adding the binned features.
data.dtypes

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler

# Map each categorical column to a numeric "<column>_index" column.
# The indexers are handed to the Pipeline unfitted; Pipeline.fit() fits each
# stage, and the fitted pipeline then appends all index columns to `data`.
columns_to_index = ["sex","embarked","family_size_bin","age_bin","fare_bin"]
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index")
    for column in columns_to_index
]
indexers_pipeline = Pipeline(stages=indexers)
indexers_model = indexers_pipeline.fit(data)
data = indexers_model.transform(data)


# data = data.select("pclass","sibsp","parch","sex","embarked","family_size_bin","age_bin","fare_bin","survived")

# cat_columns=["pclass","sibsp","parch","sex","embarked","family_size_bin","age_bin","fare_bin"]
# stages=[]

# for i in cat_columns:
#     stringIndexer = StringIndexer(inputCol = i, outputCol = i + '_Index')
#     encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[i + "_encodecVec"])
#     stages += [stringIndexer, encoder]

# # label_stringIdx = StringIndexer(inputCol = 'survived', outputCol = 'label')
# # stages += [label_stringIdx]
    
# nume_cols = ['fare', 'age','survived']
# assemblerInputs = [c + "classVec" for c in cat_columns] + nume_cols
# assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# stages += [assembler]

In [None]:
# from pyspark.ml import Pipeline

# pipeline = Pipeline(stages = stages)
# pipelineModel = pipeline.fit(data)
# data = pipelineModel.transform(data)
# selectedCols = ['survived', 'features'] 
# data = data.select(selectedCols)

In [None]:
# Show 2 rows in vertical layout to inspect the new *_index columns.
data.show(2,vertical=True)

Cast the indexed columns to integer type

In [None]:
# Cast every *_index column to int, one column at a time.
for index_column in ['sex_index', 'embarked_index', 'family_size_bin_index', 'age_bin_index', 'fare_bin_index']:
    data = data.withColumn(index_column, data[index_column].cast('int'))

In [None]:
# Confirm the *_index columns are now integers.
data.dtypes

Drop unwanted columns

In [None]:
# Keep only the target and the columns that feed the model.
data = data.select(
    'pclass',
    'survived',
    'sibsp',
    'parch',
    'sex_index',
    'embarked_index',
    'family_size_bin_index',
    'age_bin_index',
    'fare_bin_index',
)

In [None]:
# Preview the first 5 rows of the reduced frame.
data.show(5)

Encoding Categorical features

In [None]:
# Categorical feature columns only (the `survived` target is left out).
cat_data = data.select(
    'pclass',
    'sibsp',
    'parch',
    'sex_index',
    'embarked_index',
    'family_size_bin_index',
    'age_bin_index',
    'fare_bin_index',
)

In [None]:
# Fit one one-hot encoder per categorical column; each writes "<column>_enc".
columns_to_encode = [
    "pclass",
    "sibsp",
    "parch",
    "sex_index",
    "embarked_index",
    "family_size_bin_index",
    "age_bin_index",
    "fare_bin_index",
]
encoders = []
for indexed_column in columns_to_encode:
    encoder = OneHotEncoder(inputCol=indexed_column, outputCol=indexed_column+"_enc")
    encoders.append(encoder.fit(data))

In [None]:
# Chain the encoders into one pipeline and append all *_enc columns to `data`.
encoders_pipeline = Pipeline(stages=encoders)
encoders_model = encoders_pipeline.fit(data)
data = encoders_model.transform(data)

In [None]:
# Show one row in vertical layout to inspect the encoded vectors.
data.show(1, vertical=True)

Create Input Vector

In [None]:
# Names of the one-hot encoded columns that make up the feature vector:
# every encoded column is its source column name plus the "_enc" suffix.
inputAssembler = [
    source_column + '_enc'
    for source_column in (
        'pclass',
        'sibsp',
        'parch',
        'sex_index',
        'embarked_index',
        'family_size_bin_index',
        'age_bin_index',
        'fare_bin_index',
    )
]

In [None]:
# Combine the encoded columns into a single `features` vector column.
# Note: `assembler` holds the resulting DataFrame, not the VectorAssembler itself.
feature_assembler = VectorAssembler(inputCols=inputAssembler, outputCol="features")
assembler = feature_assembler.transform(data)

In [None]:
# Show one assembled row in vertical layout (includes the `features` vector).
assembler.show(1, vertical=True)

In [None]:
# Modelling frame: the encoded columns, the assembled feature vector and the target.
ml_data = assembler.select(
    'pclass_enc',
    'sibsp_enc',
    'parch_enc',
    'sex_index_enc',
    'embarked_index_enc',
    'family_size_bin_index_enc',
    'age_bin_index_enc',
    'fare_bin_index_enc',
    'features',
    'survived',
)
ml_data.show(1, vertical=True)

Split data into training and testing sets

In [None]:
# 70/30 train/test split. A fixed seed makes the split (and therefore every
# metric reported below) reproducible across Restart & Run All.
(train, test) = ml_data.randomSplit([0.7,0.3], seed=42)

In [None]:
# Preview the feature vector and label of 5 training rows.
train.select('features','survived').show(5)

In [None]:
# Preview the feature vector and label of 5 test rows.
test.select('features','survived').show(5)

# Machine Learning Modeling

## 1.RandomForestClassifier

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Fit a random forest on the training split, with `survived` as the label.
rm_classifier = RandomForestClassifier(labelCol='survived')
rm_model = rm_classifier.fit(train)
rm_model

In [None]:
# Score the held-out TEST split. Scoring `train` here would make every metric
# computed from rm_test_model a training-set metric, not a generalisation estimate.
rm_test_model=rm_model.transform(test)
rm_test_model

Evaluate Random Forest ML Model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Accuracy of the random-forest predictions held in rm_test_model.
print("Accuracy :")
rm_accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='survived', metricName='accuracy')
rm_accuracy_evaluator.evaluate(rm_test_model)

In [None]:
# Weighted precision of the random-forest predictions held in rm_test_model.
print("Precision :")
rm_precision_evaluator = MulticlassClassificationEvaluator(labelCol='survived', metricName='weightedPrecision')
rm_precision_evaluator.evaluate(rm_test_model)

In [None]:
# Weighted recall of the random-forest predictions held in rm_test_model.
# (Printed label corrected from the misspelled "Recal".)
print("Recall :")
MulticlassClassificationEvaluator(labelCol='survived', metricName='weightedRecall').evaluate(rm_test_model)

## 2. LogisticRegression

In [None]:
from pyspark.ml.classification import LogisticRegression

# Fit a logistic regression on the training split, with `survived` as the label.
log_reg = LogisticRegression(labelCol='survived')
log_reg_model = log_reg.fit(dataset=train)

In [None]:
# Score the held-out TEST split. Scoring `train` here would make every metric
# computed from log_reg_model_test a training-set metric.
log_reg_model_test=log_reg_model.transform(test)
log_reg_model_test

Evaluate Logistic Regression ML Model

In [None]:
# Accuracy of the logistic-regression predictions held in log_reg_model_test.
print("Accuracy :")
log_reg_accuracy_evaluator = MulticlassClassificationEvaluator(labelCol='survived', metricName='accuracy')
log_reg_accuracy_evaluator.evaluate(log_reg_model_test)

In [None]:
# Weighted precision of the logistic-regression predictions held in log_reg_model_test.
print("Precision :")
log_reg_precision_evaluator = MulticlassClassificationEvaluator(labelCol='survived', metricName='weightedPrecision')
log_reg_precision_evaluator.evaluate(log_reg_model_test)

In [None]:
# Weighted recall of the logistic-regression predictions held in log_reg_model_test.
# (Printed label corrected from the misspelled "Recal".)
print("Recall :")
MulticlassClassificationEvaluator(labelCol='survived', metricName='weightedRecall').evaluate(log_reg_model_test)

In [None]:
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')

# Materialise the ROC points once as a pandas frame. Selecting and collecting
# FPR and TPR separately ran two Spark jobs and passed matplotlib lists of
# single-field Row objects rather than plain numeric sequences.
# NOTE: `log_reg_model.summary` is the summary produced while fitting, so this
# curve describes the training data.
roc_points = log_reg_model.summary.roc.toPandas()

plt.figure(figsize=(14,7))
plt.title('AUROC - Area Under the Receiver Operating Characteristics')
# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(roc_points['FPR'], roc_points['TPR'])
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

In [None]:
## 3. DecisionTreeClassifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Fit a decision tree on the training split, with `survived` as the label.
decision_tree = DecisionTreeClassifier(labelCol='survived')
decision_tree_model = decision_tree.fit(dataset=train)

In [None]:
# Score the held-out TEST split. Scoring `train` here would only show how well
# the tree reproduces the data it was fitted on.
decision_tree_model_test=decision_tree_model.transform(test)
decision_tree_model_test

# Model Testing

In [None]:
# Preview 5 rows of the random-forest output frame.
rm_test_model.show(5)

In [None]:
# Compare the label with the model outputs (raw scores, predicted class, class probabilities) for 5 rows.
rm_test_model.select('features','rawPrediction','survived','prediction','probability').show(5)