## PySpark Setup

In [1]:
# Download Java and Spark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
# Set up the paths

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [3]:
# Create a Spark session

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark.conf.set("spark.sql.caseSensitive", True) # Avoid error "Found duplicate column(s) in the data schema"
spark

## 3. PySpark MLlib first impression

In [4]:
# Import necessary PySpark modules
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix
from pyspark.sql import functions as f
from pyspark.ml.linalg import DenseVector

In [5]:
# First load the data (be sure it is uploaded in the runtime)

titanic_train = spark.read.csv('titanic_train.csv', header = True, inferSchema=True)

In [6]:
# Take a look

titanic_train.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [7]:
# Produce summary statistics
# The "count" row gives the info about null values

titanic_train.describe()

summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
min,1.0,0.0,1.0,"""Andersson, Mr. A...",female,0.42,0.0,0.0,110152,0.0,A10,C
max,891.0,1.0,3.0,"van Melkebeke, Mr...",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [8]:
# Drop columns less informative (Sequencial, ID, Monotonic, Too much missing values)

titanic_train = titanic_train.drop("PassengerId","Name","Ticket","Cabin")

In [9]:
# Explore the target variable "Survived"

titanic_train.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [10]:
# Calculate and display count and proportion of each group in 'Survived'
titanic_train.groupBy("Survived").agg(F.count("*").alias("Count")).withColumn("Proportion"
                                                                            , F.col("Count")
                                                                            / titanic_train.count()).show()


+--------+-----+------------------+
|Survived|Count|        Proportion|
+--------+-----+------------------+
|       1|  342|0.3838383838383838|
|       0|  549|0.6161616161616161|
+--------+-----+------------------+



In [11]:
# Explore correlation of Embarked
titanic_train.groupBy("Embarked","Survived").count().orderBy("Embarked","Survived").show()

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|    null|       1|    2|
|       C|       0|   75|
|       C|       1|   93|
|       Q|       0|   47|
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  217|
+--------+--------+-----+



In [12]:
# Handle missing values - Impute 'Embarked' with mode
mode_embarked = titanic_train.groupBy("Embarked").count().orderBy(F.desc("count")).first()["Embarked"]
titanic_train = titanic_train.fillna({"Embarked": mode_embarked})

In [13]:
#Check again
titanic_train.groupBy("Embarked","Survived").count().orderBy("Embarked","Survived").show()

+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
|       C|       0|   75|
|       C|       1|   93|
|       Q|       0|   47|
|       Q|       1|   30|
|       S|       0|  427|
|       S|       1|  219|
+--------+--------+-----+



In [14]:
## Step 1: StringIndexer to encode 'Embarked' to numerical indices
#indexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
#titanic_train = indexer.fit(titanic_train).transform(titanic_train)

## Step 2: One-Hot Encoding for 'EmbarkedIndex' using OneHotEncoder
#encoder = OneHotEncoder(inputCols=["EmbarkedIndex"], outputCols=["EmbarkedVec"])
#titanic_train = encoder.fit(titanic_train).transform(titanic_train)

# Manually create one-hot encoded columns for 'Embarked'
titanic_train = titanic_train.withColumn("Embarked_S", F.when(titanic_train["Embarked"] == "S", 1).otherwise(0))
titanic_train = titanic_train.withColumn("Embarked_C", F.when(titanic_train["Embarked"] == "C", 1).otherwise(0))

In [15]:
#titanic_train.groupBy("Embarked","EmbarkedIndex","EmbarkedVec").count().orderBy("Embarked","EmbarkedIndex","EmbarkedVec").show()
titanic_train.groupBy("Embarked","Embarked_S","Embarked_C").count().orderBy("Embarked","Embarked_S","Embarked_C").show()

+--------+----------+----------+-----+
|Embarked|Embarked_S|Embarked_C|count|
+--------+----------+----------+-----+
|       C|         0|         1|  168|
|       Q|         0|         0|   77|
|       S|         1|         0|  646|
+--------+----------+----------+-----+



In [16]:
# Explore correlation : Sex

titanic_train.groupBy("Sex","Survived").count().orderBy("Sex","Survived").show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|female|       0|   81|
|female|       1|  233|
|  male|       0|  468|
|  male|       1|  109|
+------+--------+-----+



In [17]:
## Dummy variable for gender
#from pyspark.ml.feature import StringIndexer

## Create indexer for gender
#stringIndexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

## Fit to dataframe
#myFit = stringIndexer.fit(titanic_train)

## Apply the transform
#titanic_train = myFit.transform(titanic_train)

# Manually create one-hot encoded columns for 'Sex'
titanic_train = titanic_train.withColumn("SexIndex", F.when(titanic_train["Sex"] == "male", 1).otherwise(0))

In [18]:
#Check the transformation
titanic_train.groupBy("Sex","SexIndex").count().orderBy("Sex","SexIndex").show()

+------+--------+-----+
|   Sex|SexIndex|count|
+------+--------+-----+
|female|       0|  314|
|  male|       1|  577|
+------+--------+-----+



In [19]:
titanic_train.show(5)

+--------+------+------+----+-----+-----+-------+--------+----------+----------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Embarked_S|Embarked_C|SexIndex|
+--------+------+------+----+-----+-----+-------+--------+----------+----------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|         1|         0|       1|
|       1|     1|female|38.0|    1|    0|71.2833|       C|         0|         1|       0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|         1|         0|       0|
|       1|     1|female|35.0|    1|    0|   53.1|       S|         1|         0|       0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|         1|         0|       1|
+--------+------+------+----+-----+-----+-------+--------+----------+----------+--------+
only showing top 5 rows



In [20]:
## Drop the original "Sex","Embarked","EmbarkedIndex" column
#titanic_train = titanic_train.drop("Sex","Embarked","EmbarkedIndex")
titanic_train = titanic_train.drop("Sex","Embarked")

In [21]:
titanic_train.show(5)

+--------+------+----+-----+-----+-------+----------+----------+--------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Embarked_S|Embarked_C|SexIndex|
+--------+------+----+-----+-----+-------+----------+----------+--------+
|       0|     3|22.0|    1|    0|   7.25|         1|         0|       1|
|       1|     1|38.0|    1|    0|71.2833|         0|         1|       0|
|       1|     3|26.0|    0|    0|  7.925|         1|         0|       0|
|       1|     1|35.0|    1|    0|   53.1|         1|         0|       0|
|       0|     3|35.0|    0|    0|   8.05|         1|         0|       1|
+--------+------+----+-----+-----+-------+----------+----------+--------+
only showing top 5 rows



In [22]:
titanic_train.select('Age').describe().show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764705882|
| stddev|14.526497332334035|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [23]:
# Finally replace null values in "Age" column with average age
# Calculate the average
avg_age = titanic_train.select(f.avg('Age')).collect()[0][0]

# Replace
titanic_train = titanic_train.na.fill({"Age" : avg_age})

In [24]:
# Add up SibSp (Siblings/Spouses) and Parch (Parents/Children) to make a new variable

titanic_train = titanic_train.withColumn("FamilySize",f.col('SibSp')+f.col('Parch'))

# Drop the previous columns
titanic_train = titanic_train.drop("SibSp","Parch")

In [25]:
titanic_train.show(5)

+--------+------+----+-------+----------+----------+--------+----------+
|Survived|Pclass| Age|   Fare|Embarked_S|Embarked_C|SexIndex|FamilySize|
+--------+------+----+-------+----------+----------+--------+----------+
|       0|     3|22.0|   7.25|         1|         0|       1|         1|
|       1|     1|38.0|71.2833|         0|         1|       0|         1|
|       1|     3|26.0|  7.925|         1|         0|       0|         0|
|       1|     1|35.0|   53.1|         1|         0|       0|         1|
|       0|     3|35.0|   8.05|         1|         0|       1|         0|
+--------+------+----+-------+----------+----------+--------+----------+
only showing top 5 rows



In [26]:
titanic_train.describe()

summary,Survived,Pclass,Age,Fare,Embarked_S,Embarked_C,SexIndex,FamilySize
count,891.0,891.0,891.0,891.0,891.0,891.0,891.0,891.0
mean,0.3838383838383838,2.308641975308642,29.699117647058763,32.2042079685746,0.7250280583613917,0.1885521885521885,0.6475869809203143,0.904601571268238
stddev,0.4865924542648575,0.8360712409770491,13.002015226002891,49.69342859718089,0.4467509100341467,0.3913721645054734,0.4779900708960982,1.613458541355087
min,0.0,1.0,0.42,0.0,0.0,0.0,0.0,0.0
max,1.0,3.0,80.0,512.3292,1.0,1.0,1.0,10.0


In [27]:
# Explore another correlation

titanic_train.groupBy("Pclass","Survived").count().orderBy("Pclass","Survived").show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     1|       1|  136|
|     2|       0|   97|
|     2|       1|   87|
|     3|       0|  372|
|     3|       1|  119|
+------+--------+-----+



In [28]:
titanic_train.dtypes

[('Survived', 'int'),
 ('Pclass', 'int'),
 ('Age', 'double'),
 ('Fare', 'double'),
 ('Embarked_S', 'int'),
 ('Embarked_C', 'int'),
 ('SexIndex', 'int'),
 ('FamilySize', 'int')]

In [29]:
# Ensure no null values in numerical columns - Checked in describe
# Including the target variable 'Survived'
numerical_cols = ['Survived', 'Pclass', 'Age', 'FamilySize', 'Fare', 'SexIndex', 'Embarked_S', 'Embarked_C']

# Prepare the feature vector for correlation calculation
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
feature_data = assembler.transform(titanic_train).select("features")

# Convert vectors to dense format to avoid sparse issues (if necessary)
feature_data = feature_data.rdd.map(lambda row: (DenseVector(row['features'].toArray()),)).toDF(['features'])

# Calculate the Pearson correlation matrix
pearson_corr_matrix = Correlation.corr(feature_data, "features", method="pearson").head()[0]

# Convert the correlation matrix to a readable format
corr_array = pearson_corr_matrix.toArray()

# Display the Pearson correlation matrix as a DataFrame
import pandas as pd
corr_df = pd.DataFrame(corr_array, columns=numerical_cols, index=numerical_cols)
print(corr_df)


            Survived    Pclass       Age  FamilySize      Fare  SexIndex  \
Survived    1.000000 -0.338481 -0.069809    0.016639  0.257307 -0.543351   
Pclass     -0.338481  1.000000 -0.331339    0.065997 -0.549500  0.131900   
Age        -0.069809 -0.331339  1.000000   -0.248512  0.091566  0.084153   
FamilySize  0.016639  0.065997 -0.248512    1.000000  0.217138 -0.200988   
Fare        0.257307 -0.549500  0.091566    0.217138  1.000000 -0.182333   
SexIndex   -0.543351  0.131900  0.084153   -0.200988 -0.182333  1.000000   
Embarked_S -0.149683  0.074053 -0.019336    0.077359 -0.162184  0.119224   
Embarked_C  0.168240 -0.243292  0.032024   -0.046215  0.269335 -0.082853   

            Embarked_S  Embarked_C  
Survived     -0.149683    0.168240  
Pclass        0.074053   -0.243292  
Age          -0.019336    0.032024  
FamilySize    0.077359   -0.046215  
Fare         -0.162184    0.269335  
SexIndex      0.119224   -0.082853  
Embarked_S    1.000000   -0.782742  
Embarked_C   -0.782



In [30]:
#Checking the VIF values
numerical_cols = ['Pclass', 'Age', 'FamilySize', 'Fare', 'SexIndex', 'Embarked_S', 'Embarked_C']


# Function to calculate VIF for each feature
def calculate_vif(df, features):
    vif_data = []

    for feature in features:
        # Define the dependent variable (current feature)
        y = feature

        # Define the independent variables (all features except the current one)
        X = [f for f in features if f != y]

        # Assemble independent features into a vector
        assembler = VectorAssembler(inputCols=X, outputCol="features")
        feature_data = assembler.transform(df).select(F.col(y).alias("label"), "features")

        # Fit the Linear Regression model
        lr = LinearRegression(featuresCol="features", labelCol="label")
        lr_model = lr.fit(feature_data)

        # Calculate R-squared value
        r_squared = lr_model.summary.r2

        # Handle R-squared values of 1 to prevent division by zero
        if r_squared >= 1.0:
            vif = float('inf')  # Assign infinity or a very high value if perfect multicollinearity exists
        else:
            vif = 1 / (1 - r_squared)

        # Append the feature name and its VIF value
        vif_data.append({"Feature": y, "VIF": vif})

    # Convert results to a Pandas DataFrame for display
    vif_df = pd.DataFrame(vif_data)
    return vif_df

# Calculate VIF for all features
vif_results = calculate_vif(titanic_train, numerical_cols)

# Display VIF results
print(vif_results)

      Feature       VIF
0      Pclass  1.766262
1         Age  1.210913
2  FamilySize  1.217097
3        Fare  1.645791
4    SexIndex  1.101646
5  Embarked_S  2.760252
6  Embarked_C  2.888039


In [31]:
# Prepare the feature vector

feature = VectorAssembler(inputCols=titanic_train.columns[1:],outputCol="features")
feature_vector= feature.transform(titanic_train)

In [32]:
# Take a look

feature_vector.show()

+--------+------+-----------------+-------+----------+----------+--------+----------+--------------------+
|Survived|Pclass|              Age|   Fare|Embarked_S|Embarked_C|SexIndex|FamilySize|            features|
+--------+------+-----------------+-------+----------+----------+--------+----------+--------------------+
|       0|     3|             22.0|   7.25|         1|         0|       1|         1|[3.0,22.0,7.25,1....|
|       1|     1|             38.0|71.2833|         0|         1|       0|         1|[1.0,38.0,71.2833...|
|       1|     3|             26.0|  7.925|         1|         0|       0|         0|[3.0,26.0,7.925,1...|
|       1|     1|             35.0|   53.1|         1|         0|       0|         1|[1.0,35.0,53.1,1....|
|       0|     3|             35.0|   8.05|         1|         0|       1|         0|[3.0,35.0,8.05,1....|
|       0|     3|29.69911764705882| 8.4583|         0|         0|       1|         0|[3.0,29.699117647...|
|       0|     1|             54.0|51

In [33]:
# Make a split

(training, test) = feature_vector.randomSplit([0.8, 0.2],seed = 6886)

In [34]:
from pyspark.ml.regression import GeneralizedLinearRegression

# Prepare the feature vector
numerical_cols = ['Pclass', 'Age', 'FamilySize', 'Fare', 'SexIndex', 'Embarked_S', 'Embarked_C']
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
data = assembler.transform(titanic_train).select("features", "Survived")

# Fit the Generalized Linear Regression model as logistic regression (family='binomial')
glm = GeneralizedLinearRegression(featuresCol="features", labelCol="Survived", family="binomial", link="logit")
glm_model = glm.fit(data)

# Access the model summary
glm_summary = glm_model.summary

# Extract coefficients and intercepts
coefficients = glm_model.coefficients
intercept = glm_model.intercept

# Convert coefficients to a readable format
coefficients_list = list(coefficients)

# Extract standard errors for the coefficients (excluding intercept)
standard_errors = glm_summary.coefficientStandardErrors[1:]  # Exclude the first element (intercept's standard error)

# Extract p-values (excluding intercept)
p_values = glm_summary.pValues[1:]  # Exclude the intercept p-value

# Perform Wald Test: Coefficient / Standard Error
wald_statistics = [coeff / se if se != 0 else float('inf') for coeff, se in zip(coefficients_list, standard_errors)]

# Display results
results = pd.DataFrame({
    "Feature": numerical_cols,
    "Coefficient": coefficients_list,
    "Standard Error": standard_errors,
    "Wald Statistic": wald_statistics,
    "P-Value": p_values
})

# Display the results to check variable significance
print(results)


      Feature  Coefficient  Standard Error  Wald Statistic       P-Value
0      Pclass    -1.094738        0.007820     -139.989681  5.978836e-07
1         Age    -0.039037        0.068232       -0.572128  1.101224e-03
2  FamilySize    -0.222661        0.002388      -93.233237  3.740475e-01
3        Fare     0.002123        0.199635        0.010634  0.000000e+00
4    SexIndex    -2.746158        0.328415       -8.361861  2.687425e-01
5  Embarked_S    -0.363215        0.380759       -0.953922  8.651122e-01
6  Embarked_C     0.064680        0.646783        0.100002  8.881784e-16


In [35]:
# Use logistic regression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")

# Set up the model
lrModel = lr.fit(training)
lr_prediction = lrModel.transform(test)
lr_prediction.select("prediction", "Survived", "features").orderBy("Survived",ascending=False).show(5)
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

# Report the accuracy

lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of this Logistic Regression model is %g"% (lr_accuracy))

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[1.0,11.0,120.0,1...|
|       1.0|       1|[1.0,29.699117647...|
|       1.0|       1|[1.0,14.0,120.0,1...|
|       1.0|       1|[1.0,23.0,63.3583...|
|       1.0|       1|[1.0,27.0,30.5,1....|
+----------+--------+--------------------+
only showing top 5 rows

Accuracy of this Logistic Regression model is 0.803279


In [41]:
#Below function will capture all the metrics for respective models
import pandas as pd

# Initialize an empty DataFrame to store model evaluation metrics
metrics_df = pd.DataFrame(columns=['Model', 'AUC', 'Accuracy', 'Precision', 'Recall', 'F1'])

# Function to evaluate the model and append results to the DataFrame
def evaluate_model(model_name, predictions, metrics_df):
    # Evaluate the model with AUC, Accuracy, Recall, Precision, etc.
    evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)

    # Extract metrics such as Accuracy, Precision, Recall, F1 Score
    multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    # Create a DataFrame with the new metrics
    new_metrics = pd.DataFrame([{
        'Model': model_name,
        'AUC': auc,
        'Accuracy': accuracy,
        'Precision': precision,
        'Recall': recall,
        'F1': f1
    }])

    # Check if new_metrics is empty or all-NA before concatenating
    if not new_metrics.empty and not new_metrics.isna().all(axis=None):
        metrics_df = pd.concat([metrics_df, new_metrics], ignore_index=True)

    return metrics_df

In [42]:
# Train logistic regression model
lr = LogisticRegression(labelCol="Survived", featuresCol="features", maxIter=10)
lr_model = lr.fit(training)

# Predict on test data
predictions = lr_model.transform(test)

# Show the first few predictions
predictions.select("prediction", "Survived", "probability").show(10, truncate=False)

# Evaluate the model with AUC, Accuracy, Recall, Precision, etc.
evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

# Extract metrics such as TPR, FPR, Recall, Precision, Accuracy
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

# Display metrics
print(f"AUC: {auc}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Evaluate the Logistic Regression model and add metrics to the DataFrame
metrics_df = evaluate_model("Logistic Regression", predictions, metrics_df)

+----------+--------+----------------------------------------+
|prediction|Survived|probability                             |
+----------+--------+----------------------------------------+
|1.0       |0       |[0.02903860682002943,0.9709613931799705]|
|1.0       |0       |[0.3084646035207361,0.6915353964792639] |
|1.0       |0       |[0.06207722261413074,0.9379227773858693]|
|0.0       |0       |[0.5255388381873344,0.4744611618126656] |
|0.0       |0       |[0.5365894384326478,0.46341056156735216]|
|0.0       |0       |[0.5133352831643637,0.4866647168356363] |
|0.0       |0       |[0.5059172825210869,0.4940827174789131] |
|0.0       |0       |[0.5269615595562157,0.4730384404437843] |
|0.0       |0       |[0.6148754461988651,0.38512455380113486]|
|0.0       |0       |[0.6230210218089306,0.3769789781910694] |
+----------+--------+----------------------------------------+
only showing top 10 rows

AUC: 0.869817470664928
Accuracy: 0.8032786885245902
Precision: 0.8085851172609426
Recall: 0.

  metrics_df = pd.concat([metrics_df, new_metrics], ignore_index=True)


In [43]:
print(metrics_df)

                 Model       AUC  Accuracy  Precision    Recall        F1
0  Logistic Regression  0.869817  0.803279   0.808585  0.803279  0.805052


In [44]:
#Decision Trees
#A tree-based algorithm that splits data into nodes based on feature values.
#Handles both continuous and categorical data well.

from pyspark.ml.classification import DecisionTreeClassifier

# Create a Decision Tree classifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")

# Train the model
dt_model = dt.fit(training)

# Make predictions on the test data
dt_predictions = dt_model.transform(test)

# Show the first few predictions
dt_predictions.select("prediction", "Survived", "probability").show(10, truncate=False)

# Initialize the evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")

# Calculate various metrics
auc = binary_evaluator.evaluate(dt_predictions)
accuracy = multi_evaluator.evaluate(dt_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(dt_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(dt_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(dt_predictions, {multi_evaluator.metricName: "f1"})

# Display the metrics
print(f"AUC: {auc}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

## Display the tree structure as a text representation
#print("Decision Tree Structure:")
#print(dt_model.toDebugString)

# Evaluate the Decision Tree model and add metrics to the DataFrame
metrics_df = evaluate_model("Decision Tree", dt_predictions, metrics_df)

+----------+--------+----------------------------------------+
|prediction|Survived|probability                             |
+----------+--------+----------------------------------------+
|1.0       |0       |[0.0,1.0]                               |
|0.0       |0       |[0.6309523809523809,0.36904761904761907]|
|1.0       |0       |[0.0,1.0]                               |
|0.0       |0       |[0.6309523809523809,0.36904761904761907]|
|0.0       |0       |[1.0,0.0]                               |
|0.0       |0       |[0.6309523809523809,0.36904761904761907]|
|0.0       |0       |[0.6309523809523809,0.36904761904761907]|
|0.0       |0       |[0.6309523809523809,0.36904761904761907]|
|0.0       |0       |[1.0,0.0]                               |
|0.0       |0       |[1.0,0.0]                               |
+----------+--------+----------------------------------------+
only showing top 10 rows

AUC: 0.597457627118644
Accuracy: 0.8360655737704918
Precision: 0.8373564624587633
Recall: 0.

In [45]:
#Random Forest :- Random Forest is an ensemble method that builds multiple decision trees and combines their outputs to make predictions.

from pyspark.ml.classification import RandomForestClassifier
# Create a Random Forest classifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10, seed=6886)

# Train the Random Forest model
rf_model = rf.fit(training)

# Make predictions on the test data
rf_predictions = rf_model.transform(test)

# Display the first few predictions
rf_predictions.select("prediction", "Survived", "probability").show(10, truncate=False)

# Evaluate the Random Forest model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")

# Calculate various metrics
auc = binary_evaluator.evaluate(rf_predictions)
accuracy = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "f1"})

# Display metrics
print(f"AUC: {auc}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Evaluate the Random Forest model and add metrics to the DataFrame
metrics_df = evaluate_model("Random Forest", rf_predictions, metrics_df)

+----------+--------+----------------------------------------+
|prediction|Survived|probability                             |
+----------+--------+----------------------------------------+
|1.0       |0       |[0.01963504697151308,0.9803649530284868]|
|0.0       |0       |[0.787225384679192,0.21277461532080794] |
|1.0       |0       |[0.01963504697151308,0.9803649530284868]|
|0.0       |0       |[0.5648960697533572,0.4351039302466429] |
|0.0       |0       |[0.7238127258853608,0.2761872741146392] |
|0.0       |0       |[0.6570825326415062,0.3429174673584939] |
|0.0       |0       |[0.6570825326415062,0.3429174673584939] |
|0.0       |0       |[0.649742243157883,0.350257756842117]   |
|0.0       |0       |[0.7238127258853608,0.2761872741146392] |
|0.0       |0       |[0.7238127258853608,0.2761872741146392] |
+----------+--------+----------------------------------------+
only showing top 10 rows

AUC: 0.8922425032594523
Accuracy: 0.8415300546448088
Precision: 0.84102349956376
Recall: 0.8

In [46]:
#Gradient-Boosted Trees (GBT)

#Gradient-Boosted Trees (GBT) is another powerful ensemble method that builds trees sequentially,
#with each tree correcting the errors of the previous one.
#GBTs often perform better on structured data compared to other ensemble methods like Random Forests
#because they focus on improving the model by optimizing errors at each step.


from pyspark.ml.classification import GBTClassifier

# Create a GBT classifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features", maxIter=100, seed=6886)

# Train the GBT model
gbt_model = gbt.fit(training)

# Make predictions on the test data
gbt_predictions = gbt_model.transform(test)

# Display the first few predictions
gbt_predictions.select("prediction", "Survived", "probability").show(10, truncate=False)

# Evaluate the GBT model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")

# Calculate various metrics
auc = binary_evaluator.evaluate(gbt_predictions)
accuracy = multi_evaluator.evaluate(gbt_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(gbt_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(gbt_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(gbt_predictions, {multi_evaluator.metricName: "f1"})

# Display metrics
print(f"AUC: {auc}")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Evaluate the Gradient-Boosted Trees (GBT) model and add metrics to the DataFrame
metrics_df = evaluate_model("Gradient-Boosted Trees (GBT)", gbt_predictions, metrics_df)

+----------+--------+-----------------------------------------+
|prediction|Survived|probability                              |
+----------+--------+-----------------------------------------+
|1.0       |0       |[0.01162590203314181,0.9883740979668582] |
|0.0       |0       |[0.9884340892330836,0.011565910766916376]|
|1.0       |0       |[0.008414693093656405,0.9915853069063436]|
|0.0       |0       |[0.666746465303691,0.33325353469630903]  |
|0.0       |0       |[0.982780142300344,0.01721985769965595]  |
|0.0       |0       |[0.5183645102315939,0.4816354897684061]  |
|1.0       |0       |[0.35045966511995436,0.6495403348800457] |
|0.0       |0       |[0.6885987876867928,0.31140121231320717] |
|0.0       |0       |[0.9814212906610913,0.018578709338908705]|
|0.0       |0       |[0.9578157151271403,0.042184284872859745]|
+----------+--------+-----------------------------------------+
only showing top 10 rows

AUC: 0.866362451108214
Accuracy: 0.8032786885245902
Precision: 0.8065465397873

In [47]:
#Support Vector Machine (SVM)
#A powerful classification algorithm that finds the optimal hyperplane to separate classes.
#SVM with linear or non-linear kernels can handle complex decision boundaries.

from pyspark.ml.classification import LinearSVC

# Create an SVM classifier
svm = LinearSVC(labelCol="Survived", featuresCol="features", maxIter=100, regParam=0.01)

# Train the SVM model
svm_model = svm.fit(training)

# Make predictions on the test data
svm_predictions = svm_model.transform(test)

# Display the first few predictions
svm_predictions.select("prediction", "Survived").show(10, truncate=False)

# Evaluate the SVM model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")

# Calculate various metrics
auc = binary_evaluator.evaluate(svm_predictions)
accuracy = multi_evaluator.evaluate(svm_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(svm_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(svm_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(svm_predictions, {multi_evaluator.metricName: "f1"})

# Display metrics
print(f"SVM AUC: {auc}")
print(f"SVM Accuracy: {accuracy}")
print(f"SVM Precision: {precision}")
print(f"SVM Recall: {recall}")
print(f"SVM F1 Score: {f1}")

# Evaluate the Support Vector Machine (SVM) model and add metrics to the DataFrame
metrics_df = evaluate_model("Support Vector Machine (SVM)", svm_predictions, metrics_df)

+----------+--------+
|prediction|Survived|
+----------+--------+
|1.0       |0       |
|0.0       |0       |
|1.0       |0       |
|0.0       |0       |
|0.0       |0       |
|0.0       |0       |
|0.0       |0       |
|0.0       |0       |
|0.0       |0       |
|0.0       |0       |
+----------+--------+
only showing top 10 rows

SVM AUC: 0.8545632333767929
SVM Accuracy: 0.7650273224043715
SVM Precision: 0.7677211297919025
SVM Recall: 0.7650273224043715
SVM F1 Score: 0.7661646905556299


In [48]:
#Naive Bayes
#A probabilistic classifier based on Bayes' theorem.
#Assumes independence among predictors, which might not always be realistic.

from pyspark.ml.classification import NaiveBayes

# Create a Naive Bayes classifier
nb = NaiveBayes(labelCol="Survived", featuresCol="features", modelType="multinomial")

# Train the Naive Bayes model
nb_model = nb.fit(training)

# Make predictions on the test data
nb_predictions = nb_model.transform(test)

# Display the first few predictions
nb_predictions.select("prediction", "Survived", "probability").show(10, truncate=False)

# Evaluate the Naive Bayes model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Survived", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction")

# Calculate various metrics
auc = binary_evaluator.evaluate(nb_predictions)
accuracy = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "f1"})

# Display metrics
print(f"Naive Bayes AUC: {auc}")
print(f"Naive Bayes Accuracy: {accuracy}")
print(f"Naive Bayes Precision: {precision}")
print(f"Naive Bayes Recall: {recall}")
print(f"Naive Bayes F1 Score: {f1}")

# Evaluate the Naive Bayes model and add metrics to the DataFrame
metrics_df = evaluate_model("Naive Bayes", nb_predictions, metrics_df)

+----------+--------+------------------------------------------+
|prediction|Survived|probability                               |
+----------+--------+------------------------------------------+
|1.0       |0       |[1.128259445981022E-29,1.0]               |
|1.0       |0       |[7.594338769216139E-45,1.0]               |
|1.0       |0       |[1.6501093114469699E-25,1.0]              |
|1.0       |0       |[2.0083533125622545E-7,0.9999997991646686]|
|0.0       |0       |[0.9999997739206112,2.260793887802203E-7] |
|0.0       |0       |[0.9556734446229252,0.04432655537707473]  |
|1.0       |0       |[0.3053243059704151,0.694675694029585]    |
|1.0       |0       |[3.575316798559091E-23,1.0]               |
|0.0       |0       |[0.9999999953231802,4.676819820050758E-9] |
|0.0       |0       |[0.9999999969178117,3.0821883346062114E-9]|
+----------+--------+------------------------------------------+
only showing top 10 rows

Naive Bayes AUC: 0.5283572359843547
Naive Bayes Accuracy: 0.6994

In [53]:
metrics_df.sort_values(by='AUC', ascending=False).reset_index(drop=True)

Unnamed: 0,Model,AUC,Accuracy,Precision,Recall,F1
0,Random Forest,0.892243,0.84153,0.841023,0.84153,0.84125
1,Logistic Regression,0.869817,0.803279,0.808585,0.803279,0.805052
2,Gradient-Boosted Trees (GBT),0.866362,0.803279,0.806547,0.803279,0.804519
3,Support Vector Machine (SVM),0.854563,0.765027,0.767721,0.765027,0.766165
4,Decision Tree,0.597458,0.836066,0.837356,0.836066,0.836607
5,Naive Bayes,0.528357,0.699454,0.689202,0.699454,0.690602
