In [1]:
# Attempt to import PySpark
try:
    import pyspark
    print("PySpark already installed")
except ImportError:
    print("Installing Pyspark...")
    !pip install pyspark --quiet
    import pyspark

Installing Pyspark...
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# Import necessary libraries
import pandas as pd
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
# Function to extract titles from names
def extract_title(name):
    title_search = re.search(' ([A-Za-z]+)\.', name)
    if title_search:
        return title_search.group(1)
    return ""

In [4]:
# Load csv from lin source
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
data = pd.read_csv(url)
data.to_csv('titanic.csv', index=False)

In [5]:
# Start Spark session
spark = SparkSession.builder.appName('Titanic').getOrCreate()

# Load the CSV file into a Spark DataFrame
df = spark.read.csv('titanic.csv', inferSchema=True, header=True)

# Register a User Defined Function (UDF) to extract titles
extract_title_udf = udf(extract_title, StringType())

# Add a new column 'Gelar' (title) to the DataFrame
df = df.withColumn('Gelar', extract_title_udf(df['Name']))

# Display the DataFrame
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Gelar|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|    Mr|
|          6|       0|  

In [6]:
# Columns selection
rm_columns = df.select(['Survived','Pclass','Sex','Age','SibSp','Parch','Fare','Embarked','Gelar'])

# Drop rows with null values
result = rm_columns.na.drop()

# Showing the data again
result.show()

+--------+------+------+----+-----+-----+-------+--------+------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked| Gelar|
+--------+------+------+----+-----+-----+-------+--------+------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|    Mr|
|       1|     1|female|38.0|    1|    0|71.2833|       C|   Mrs|
|       1|     3|female|26.0|    0|    0|  7.925|       S|  Miss|
|       1|     1|female|35.0|    1|    0|   53.1|       S|   Mrs|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|    Mr|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|    Mr|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|Master|
|       1|     3|female|27.0|    0|    2|11.1333|       S|   Mrs|
|       1|     2|female|14.0|    1|    0|30.0708|       C|   Mrs|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|  Miss|
|       1|     1|female|58.0|    0|    0|  26.55|       S|  Miss|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|    Mr|
|       0|

In [7]:
# String indexing and one-hot encoding for categorical columns
sex_indexer = StringIndexer(inputCol='Sex', outputCol='SexIndex', handleInvalid='keep')
sex_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex', handleInvalid='keep')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

title_indexer = StringIndexer(inputCol='Gelar', outputCol='GelarIndex', handleInvalid='keep')
title_encoder = OneHotEncoder(inputCol='GelarIndex', outputCol='GelarVec')

In [8]:
# Assemble feature columns into a single vector
assembler = VectorAssembler(
    inputCols=['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVec', 'GelarVec'],
    outputCol='features',
    handleInvalid='keep'
)

In [25]:
# Define Logistic Regression
log_reg = LogisticRegression(featuresCol='features', labelCol='Survived')

# Create a Pipeline with the defined stages
pipeline = Pipeline(stages=[sex_indexer, embark_indexer, title_indexer, sex_encoder, embark_encoder, title_encoder, assembler, log_reg])

In [26]:
# Split the data into training and testing sets
train_data, test_data = result.randomSplit([0.7, 0.3])

In [27]:
# Train the model using the pipeline
fit_model = pipeline.fit(train_data)

In [28]:
# Testing dataset
results = fit_model.transform(test_data)

In [29]:
# Display the predictions
results.show()

+--------+------+------+----+-----+-----+--------+--------+-----+--------+-----------+----------+-------------+-------------+--------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|Gelar|SexIndex|EmbarkIndex|GelarIndex|       SexVec|    EmbarkVec|      GelarVec|            features|       rawPrediction|         probability|prediction|
+--------+------+------+----+-----+-----+--------+--------+-----+--------+-----------+----------+-------------+-------------+--------------+--------------------+--------------------+--------------------+----------+
|       0|     1|female| 2.0|    1|    2|  151.55|       S| Miss|     1.0|        0.0|       1.0|(2,[1],[1.0])|(3,[0],[1.0])|(15,[1],[1.0])|(25,[0,2,3,4,5,6,...|[-2.9632569912178...|[0.04911367606714...|       1.0|
|       0|     1|  male|22.0|    0|    0|135.6333|       C|   Mr|     0.0|        1.0|       0.0|(2,[0],[1.0])|(3,[1],[1.0])|(15,[0],[1.0])|

In [30]:
# Select and display the relevant prediction columns
results.select('Survived', 'prediction', 'probability').show()

+--------+----------+--------------------+
|Survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       1.0|[0.04911367606714...|
|       0|       1.0|[0.24682183840674...|
|       0|       1.0|[0.26914216683097...|
|       0|       0.0|[0.61194555032234...|
|       0|       1.0|[0.46351800019210...|
|       0|       0.0|[0.66047755387780...|
|       0|       1.0|[0.49147537968822...|
|       0|       0.0|[0.77930494392704...|
|       0|       0.0|[0.66267872691368...|
|       0|       0.0|[0.66798271784710...|
|       0|       0.0|[0.82523166124157...|
|       0|       0.0|[0.63787343141217...|
|       0|       1.0|[6.43624665902153...|
|       0|       0.0|[0.74858845227065...|
|       0|       0.0|[0.77817360995309...|
|       0|       0.0|[0.99998053049225...|
|       0|       0.0|[0.73066174633695...|
|       0|       1.0|[0.15171167944894...|
|       0|       1.0|[0.17308490653438...|
|       0|       0.0|[0.72375829723486...|
+--------+-

In [31]:
# Define Binary Classification Evaluator
evaluator  = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [32]:
# Calculate and display the ROC AUC metric
ROC_AUC = evaluator.evaluate(results)
print(f"ROC AUC: {ROC_AUC}")

ROC AUC: 0.8068456966762051
