# A simple Spark ML example

A simple Spark ML example using the Titanic dataset.

In [None]:
import findspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Spark Titanic")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext and silence everything below ERROR.
sc = spark.sparkContext
sc.setLogLevel('ERROR')

# Final expression of the cell: whatever findspark.find() returns is shown as the cell output.
findspark.find()

In [None]:
# Read the training CSV; the 'header' option makes the first line supply the column names.
csv_reader = spark.read.format("csv").option('header', True)
df = csv_reader.load('./titanicData/train.csv')

In [None]:
# Preview the first 5 rows of the raw DataFrame.
df.show(5)

In [None]:
# Convert the whole Spark DataFrame to pandas for a richer notebook display.
# Memory expensive: toPandas() loads every row into the driver's memory,
# so only do this on data that is known to be small.
df.toPandas()

In [None]:
# Number of rows in the raw DataFrame.
df.count()

In [None]:
# Column names of the raw DataFrame.
df.columns

In [None]:
# (column name, type) pairs of the raw DataFrame.
df.dtypes

In [None]:
# Print the schema of the raw DataFrame.
df.printSchema()

In [None]:
# Summary statistics from describe(), converted to pandas for display.
df.describe().toPandas()

All the columns are read as string types, which is not correct.<br>
Therefore, cast the numeric columns to a numeric type.

In [None]:
from pyspark.sql.functions import col

# Columns kept for modelling, in output order; the ones listed in
# float_columns are cast from string to float, the rest are kept as-is.
kept_columns = ['Survived', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked']
float_columns = {'Survived', 'Pclass', 'Age', 'Fare'}

dataset = df.select([
    col(name).cast('float') if name in float_columns else col(name)
    for name in kept_columns
])

In [None]:
# Preview the first 5 rows after selecting and casting the columns.
dataset.show(5)

In [None]:
# Print the schema of the selected/cast DataFrame.
dataset.printSchema()

In [None]:
# Count the null values in each column.
from pyspark.sql.functions import isnull, when, count, col

# when(...) yields a non-null value only for null cells, and count(...) counts
# non-null values, so each aggregate is the number of nulls in that column.
null_count_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in dataset.columns
]
dataset.select(null_count_exprs).show()

In [None]:
# Number of rows whose Age is missing.
rows_without_age = dataset.filter('Age is null')
rows_without_age.count()

In [None]:
# Turn '?' placeholders into nulls, then drop every row that has a null in any column.
dataset = dataset.replace('?', None)
dataset = dataset.dropna(how='any')

In [None]:
# Number of rows remaining in dataset at this point.
dataset.count()

# Encoding categorical columns with StringIndexer

Index the columns one at a time

In [None]:
from pyspark.ml.feature import StringIndexer

# stringOrderType options: frequencyDesc, frequencyAsc, alphabetDesc, alphabetAsc
# handleInvalid options: skip, error, keep
# Categorical input column -> name of the numeric index column to create.
index_columns = {'Sex': 'Gender', 'Embarked': 'Boarded'}

for input_col, output_col in index_columns.items():
    indexer = StringIndexer(
        inputCol=input_col,
        outputCol=output_col,
        handleInvalid='keep',
        stringOrderType='frequencyDesc',
    )
    # StringIndexer refuses to run when its output column already exists, so
    # drop any stale copy first (a no-op when the column is absent). This keeps
    # the cell re-runnable.
    dataset = dataset.drop(output_col)
    dataset = indexer.fit(dataset).transform(dataset)

dataset.show()

Alternatively, index multiple columns at once

In [None]:
from pyspark.ml.feature import StringIndexer

# StringIndexer also accepts lists of input/output columns.
categorical_columns = ['Sex', 'Embarked']
indexed_columns = ['Gender', 'Boarded']

# The output columns may already exist (e.g. from indexing the columns one at
# a time, or from re-running this cell). StringIndexer raises in that case, so
# remove them first; drop() ignores names that are not present.
dataset = dataset.drop(*indexed_columns)

dataset = StringIndexer(
    inputCols=categorical_columns,
    outputCols=indexed_columns,
    handleInvalid='keep',
    stringOrderType='frequencyDesc',
).fit(dataset).transform(dataset)

dataset.show()

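How can the mapping produced by StringIndexer be retrieved?<br>
The fitted `StringIndexerModel` exposes it through its `labels` / `labelsArray` attributes (the position of a label in the list is its index). It can also be inferred from the data, as done below, because the index follows the chosen ordering type (here, descending frequency).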

In [None]:
# Row count per (index, original value) pair, most frequent first.
gender_counts = dataset.groupBy('gender', 'sex').count()
gender_counts.orderBy('count', ascending=False).show()

In [None]:
# Distinct (index, original value) pairs, sorted by index.
gender_pairs = dataset.select('gender', 'sex').distinct()
gender_pairs.orderBy('gender', ascending=True).show()

In [None]:
# Display the dataset (show() with its default row limit).
dataset.show()

In [None]:
# Distinct (original value, index) pairs for the port of embarkation, sorted by original value.
boarded_pairs = dataset.select('embarked', 'boarded').distinct()
boarded_pairs.orderBy('Embarked', ascending=True).show()

In [None]:
# Row count per (original value, index) pair, most frequent first.
boarded_counts = dataset.groupby('embarked', 'boarded').count()
boarded_counts.orderBy('count', ascending=False).show()

In [None]:
# Drop the original string columns; only their indexed versions are used from here on.
dataset = dataset.drop('Sex', 'Embarked')

dataset.show(5)

## Column features

Combine all the feature columns (excluding the target column) into a single vector column with <strong>VectorAssembler</strong>

In [None]:
from pyspark.ml.feature import VectorAssembler

# Input columns that are packed into the single 'features' vector column.
required_features = [
    'Pclass',
    'Age',
    'Fare',
    'Gender',
    'Boarded',
]

assembler = VectorAssembler(
    inputCols=required_features,
    outputCol='features',
)

# transform() returns a new DataFrame with the extra 'features' column appended.
transformed_data = assembler.transform(dataset)
transformed_data.show(5)

# Modelling

In [None]:
# Fixed seed so the 80/20 split, and every metric computed from it, is
# reproducible across kernel restarts.
SPLIT_SEED = 42
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
print(training_data.count())
print(test_data.count())

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forests sample rows and features at random; pin the seed so the
# trained model and its predictions are reproducible.
rf = RandomForestClassifier(
    labelCol='Survived',
    featuresCol='features',
    maxDepth=5,
    seed=42,
)
model = rf.fit(training_data)

# transform() returns test_data plus the model's output columns (including 'prediction').
predictions = model.transform(test_data)

## Evaluate the model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the predictions made on the held-out test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(predictions)
print(f'Test accuracy: {accuracy}')

In [None]:
import matplotlib.pyplot as plt

# The ROC points come from the model's summary object (model.summary),
# converted to pandas so matplotlib can plot them.
trainingSummary = model.summary
roc = trainingSummary.roc.toPandas()

fig, ax = plt.subplots()
ax.plot(roc['FPR'], roc['TPR'])
ax.set_ylabel('True Positive Rate')
ax.set_xlabel('False Positive Rate')
ax.set_title('ROC Curve')
plt.show()

In [None]:
# Precision-recall points from the same summary object, plotted recall (x) vs precision (y).
pr = trainingSummary.pr.toPandas()

fig, ax = plt.subplots()
ax.plot(pr['recall'], pr['precision'])
ax.set_ylabel('Precision')
ax.set_xlabel('Recall')
ax.set_title('Precision-Recall Curve')
plt.show()

In [None]:
# metricName options: accuracy, f1, precisionByLabel, recallByLabel
# precisionByLabel / recallByLabel are computed for ONE class, selected with
# metricLabel. Its default is 0.0 (the "did not survive" class), so the class
# of interest is set explicitly here and named in the printed output.
POSITIVE_LABEL = 1.0  # Survived == 1

eval_acc = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='accuracy')
eval_pre = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='precisionByLabel', metricLabel=POSITIVE_LABEL)
eval_rec = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='recallByLabel', metricLabel=POSITIVE_LABEL)
eval_f1 = MulticlassClassificationEvaluator(labelCol='Survived', predictionCol='prediction', metricName='f1')
accuracy = eval_acc.evaluate(predictions)
precision = eval_pre.evaluate(predictions)
recall = eval_rec.evaluate(predictions)
f1 = eval_f1.evaluate(predictions)

print(f'Accuracy: {accuracy:.2f}')
print(f'Precision (Survived=1): {precision:.2f}')
print(f'Recall (Survived=1): {recall:.2f}')
print(f'f1 score: {f1:.2f}')

In [None]:
# Confusion matrix computed with Spark's RDD-based MulticlassMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
# ConfusionMatrixDisplay and plt are imported here so the cell does not depend
# on imports made in other cells (ConfusionMatrixDisplay used to be imported
# only in a LATER cell, which raises NameError on a fresh kernel).
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Keep only (prediction, label), in that order, as MulticlassMetrics expects.
# Important (per the original author): the label must be cast to float and the
# rows ordered by prediction, otherwise this does not work.
preds_and_labels = (
    predictions
    .select('prediction', F.col('Survived').cast(FloatType()).alias('Survived'))
    .orderBy('prediction')
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
cm2 = metrics.confusionMatrix().toArray()
disp2 = ConfusionMatrixDisplay(cm2)
disp2.plot()
plt.title('Confusion Matrix from PySpark')
plt.show()

# NOTE(review): this cell was reported to fail in Jupyter while the script
# version (sparkTitanic.py) worked. The missing ConfusionMatrixDisplay import
# is the likely cause -- confirm after Restart & Run All.

In [None]:
# Alternative: using sklearn's confusion matrix.
# Everything is collected to the driver, so this only suits data that fits in
# driver memory; for huge data sets sklearn may not be able to handle it.
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

# Collect both columns in ONE action so each label stays paired with its own
# prediction, then unpack the Row objects into flat lists for sklearn.
label_prediction_rows = predictions.select('Survived', 'prediction').collect()
y_true = [row['Survived'] for row in label_prediction_rows]
y_pred = [row['prediction'] for row in label_prediction_rows]

print(classification_report(y_true, y_pred))
cm = confusion_matrix(y_true, y_pred)
# Optional: ConfusionMatrixDisplay(cm, display_labels=['Not Survived', 'Survived'])
disp = ConfusionMatrixDisplay(cm)
disp.plot()


In [None]:
# Small hand-written RDD of (prediction, label) pairs, used to inspect what collect() returns.
sample_pairs = [
    (0.0, 0.0),
    (0.0, 1.0),
    (0.0, 0.0),
    (1.0, 0.0),
    (1.0, 1.0),
    (1.0, 1.0),
]
predictionAndLabels = sc.parallelize(sample_pairs)
print(type(predictionAndLabels.collect()))
# The same RDD could be fed to MulticlassMetrics:
# metrics = MulticlassMetrics(predictionAndLabels)
# metrics.confusionMatrix().toArray()