1. Setup and Data Loading


In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, isnan, regexp_extract
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline


# Databricks notebooks provide a ready-made `spark` session; reuse it when it exists and
# otherwise build one, so this cell also runs unchanged in a local PySpark environment.
if "spark" not in globals():
    spark = SparkSession.builder.appName("TitanicML").getOrCreate()

# Location of the Titanic CSV (here a Unity Catalog volume). To use your own copy, upload
# titanic.csv via the Databricks 'Data' tab -> 'Create Table' and point this path at it.
file_path = "/Volumes/workspace/default/titanic/titanic.csv"

# header=True takes column names from the first row; inferSchema=True makes Spark scan the
# file so numeric columns (e.g. Age, Fare) are typed as numbers rather than strings.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display schema and initial data
df.printSchema()
display(df.limit(10))

2. Data Cleaning and Handling Missing Values


In [0]:
def missing_counts(frame):
    """Return a one-row DataFrame holding the number of missing values in each column.

    A value counts as missing when it is null; for floating-point columns (double/float)
    NaN is counted as missing too. Float columns are read from the schema rather than a
    hand-maintained list, so the check stays valid if columns are added or retyped.
    """
    float_cols = {name for name, dtype in frame.dtypes if dtype in ("double", "float")}
    exprs = []
    for c in frame.columns:
        is_missing = col(c).isNull()
        if c in float_cols:
            is_missing = is_missing | isnan(col(c))
        # count() ignores nulls, so counting when(is_missing, 1) counts the missing rows.
        exprs.append(count(when(is_missing, 1)).alias(c))
    return frame.select(exprs)


print("Missing values per column:")
missing_counts(df).show()

# --- Cleaning Strategy ---
# 1. 'Age': fill with the column mean.
# 2. 'Embarked': fill with the most frequent value (the mode).
# 3. 'Fare': fill any nulls with 0.
# 4. 'Cabin': mostly missing; left as-is and not among the model features selected later.
#
# NOTE(review): the mean and mode are computed on the full dataset before the train/test
# split, so some information from the test rows leaks into training.

# Impute Age with its mean; the output column overwrites the input column.
imputer = Imputer(inputCols=["Age"], outputCols=["Age"]).setStrategy("mean")
df = imputer.fit(df).transform(df)

# Most frequent non-null Embarked value; ties are broken alphabetically so the pick is
# deterministic across runs.
embarked_mode_row = (
    df.filter(col("Embarked").isNotNull())
    .groupBy("Embarked")
    .count()
    .orderBy(col("count").desc(), col("Embarked"))
    .first()
)
# Fall back to 'S' (the previously hard-coded value) if Embarked has no non-null values.
embarked_mode = embarked_mode_row["Embarked"] if embarked_mode_row else "S"

df = df.fillna({'Embarked': embarked_mode, 'Fare': 0})

print("Missing values after cleaning:")
missing_counts(df).show()

3. Feature Engineering


In [0]:
# Create "FamilySize" (SibSp + Parch + 1, counting the passenger themself)
df = df.withColumn("FamilySize", col("SibSp") + col("Parch") + 1)

# Create "IsAlone" (1 if FamilySize == 1, else 0)
df = df.withColumn("IsAlone", when(col("FamilySize") == 1, 1).otherwise(0))

# Extract the title from Name: the first run of letters immediately followed by a period
# (e.g. "Smith, Mr. John" -> "Mr"). A raw string is used so "\." reaches the regex engine
# as an escaped dot without tripping Python's invalid-escape-sequence warning.
df = df.withColumn("Title", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))

# Group uncommon titles into a single "Rare" category. "Dona" (the feminine form of "Don")
# is listed so it is grouped the same way if it appears in the data.
rare_titles = ["Dr", "Rev", "Major", "Col", "Mlle", "Mme", "Ms", "Capt", "Lady", "Jonkheer", "Don", "Dona", "Countess", "Sir"]
df = df.withColumn("Title", when(col("Title").isin(rare_titles), "Rare").otherwise(col("Title")))

print("Data Cleaning and Engineering Completed in Spark.")

4. Categorical Encoding & Conversion to pandas


In [0]:
cat_cols = ["Sex", "Embarked", "Title"]

# One StringIndexer per categorical column, writing a numeric index to "<col>_Index".
indexers = [StringIndexer(inputCol=c, outputCol=c+"_Index") for c in cat_cols]

# Run the indexers using a Pipeline (cleaner way to run multiple steps)
pipeline = Pipeline(stages=indexers)
df_encoded = pipeline.fit(df).transform(df)

# Keep only the label and the numeric feature columns that scikit-learn will consume.
final_cols = ["Survived", "Pclass", "Age", "Fare", "FamilySize", "IsAlone", "Sex_Index", "Embarked_Index", "Title_Index"]
df_final = df_encoded.select(final_cols)

display(df_final.limit(5))

# Convert PySpark DataFrame to Pandas DataFrame (collects all rows to the driver).
pdf = df_final.toPandas()

# Check the dtypes are float/int for sklearn. DataFrame.info() prints its own report and
# returns None, so it is called directly (print() around it would emit a stray "None").
pdf.info()

# Fail fast if any missing values survived the cleaning steps.
assert not pdf.isnull().values.any(), "unexpected missing values in model input"

# Separate X (Features) and y (Target)
X = pdf.drop("Survived", axis=1)
y = pdf["Survived"]

5. Train/Test Split & Model Building


In [0]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

SEED = 42        # shared by the split and the forest so results are reproducible
TEST_SIZE = 0.2  # fraction of rows held out for evaluation

# Split data 80/20. stratify=y keeps the Survived class ratio the same in train and test,
# so the held-out metrics are not skewed by an unlucky class mix.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=SEED, stratify=y
)

# Random Forest with 100 trees, each limited to a maximum depth of 5.
rf_model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=SEED)

# Train the model
rf_model.fit(X_train, y_train)

print("Model Trained using Scikit-Learn.")

6. Model Evaluation


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Score the trained forest on the held-out rows.
y_pred = rf_model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {acc:.2f}")

# Per-class precision, recall and F1.
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Confusion-matrix heatmap drawn on an explicit Axes: rows = actual, columns = predicted.
cm = confusion_matrix(y_test, y_pred)
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()  # rendered inline in Databricks

In [0]:
import sqlite3
from contextlib import closing

db_path = "/Volumes/workspace/digital_twin/digital-twin/digital_twin.db"

# Open read-only via a URI so a wrong path raises an error instead of silently creating an
# empty database file; closing() guarantees the connection is released even if the query
# fails (sqlite3's own context manager only commits/rolls back, it does not close).
with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as conn:
    table_rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    ).fetchall()

# An explicit schema lets Spark build the DataFrame even when the database has no tables
# (schema inference fails on an empty list).
tables_df = spark.createDataFrame(table_rows, "table_name string")
display(tables_df)
 