## Model Building

### Import Required Libraries 

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import (
    LogisticRegression, DecisionTreeClassifier, RandomForestClassifier,
    GBTClassifier, NaiveBayes, LinearSVC
)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, f1_score, roc_curve, auc
# `builtins` is imported after the wildcard imports above so that nothing they
# export can shadow it; it gives access to Python's own `round`, which
# `from pyspark.sql.functions import *` replaces with a Column function.
import builtins
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

In [3]:
# Obtain the Spark session for this notebook (created on first use).
spark = (
    SparkSession.builder
    .appName("diabetes_readmission")
    .getOrCreate()
)

In [4]:
# Load the cleaned, one-hot encoded CSV into a Spark DataFrame.
# The first row supplies the column names and column types are inferred.
df = spark.read.csv(
    "datasets/diabetes_cleaned_onehot.csv",
    header=True,
    inferSchema=True,
)

### Prepare data

In [5]:
# Verify schema: print every column with its inferred type and nullability
df.printSchema()

root
 |-- admission_type_id: integer (nullable = true)
 |-- discharge_disposition_id: integer (nullable = true)
 |-- admission_source_id: integer (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-- readmitted: string (nullable = true)
 |-- age_num: integer (nullable = true)
 |-- race_AfricanAmerican: boolean (nullable = true)
 |-- race_Asian: boolean (nullable = true)
 |-- race_Caucasian: boolean (nullable = true)
 |-- race_Hispanic: boolean (nullable = true)
 |-- race_Other: boolean (nullable = true)
 |-- race_Unknown: boolean (nullable = true)
 |-- gender_Female: boolean (nullable = true)
 |-- gender_Male: boolean (nullable = t

In [6]:
# Preview the first five rows of the loaded data
df.show(n=5)

+-----------------+------------------------+-------------------+----------------+------------------+--------------+---------------+-----------------+----------------+----------------+----------------+----------+-------+--------------------+----------+--------------+-------------+----------+------------+-------------+-----------+----------------------+-----------+-----------+-----------+-----------+-----------+-----------+------------------+------------------+------------------+---------------------+------------+------------+--------------+-----------------+--------------+------------+----------------+------------+----------------+--------------+------------------+--------------+----------------+--------------+------------------+--------------+-----------------+---------------------+-----------------+----------------+--------------+------------------+--------------+----------------+--------------------+--------------+------------+----------------+------------+--------------+------------

In [16]:
# Name of the label column the classifiers are trained on.
target_col = "readmitted_label"

# Fail fast with a readable message if the label column is not present,
# instead of a harder-to-read error later when the models are fitted.
assert target_col in df.columns, f"label column {target_col!r} not found in df"

# Outcome-derived columns must never be used as features (label leakage).
# Besides the label itself this covers the raw `readmitted` column (a string,
# which VectorAssembler cannot assemble) and `readmitted_binary`, which later
# cells reference as a label. Names that are absent from df are simply ignored.
label_like_cols = {target_col, "readmitted", "readmitted_binary"}

# `name` rather than `col`, so the loop variable does not reuse the name of
# pyspark's `col` function pulled in by the wildcard import.
features = [name for name in df.columns if name not in label_like_cols]

### Assemble Features

In [17]:
# Combine all feature columns into the single vector column the models read
assembler = VectorAssembler(
    outputCol="features_vector",
    inputCols=features,
)

### Split Data into Training and Test sets

In [18]:
# 80/20 train/test split; the fixed seed keeps the split repeatable
train_data, test_data = df.randomSplit(
    weights=[0.8, 0.2],
    seed=42,
)

### Initialize and Configure Models

In [20]:
# Define models
# All estimators read the assembled feature vector and the same label column.
# The label comes from `target_col` so it cannot drift from the feature
# selection, which excludes that same column.
model_cols = {"featuresCol": "features_vector", "labelCol": target_col}

# The tree-based learners take an explicit seed (the same value used for the
# train/test split) so repeated runs of the notebook give the same models.
lr = LogisticRegression(**model_cols)
rf = RandomForestClassifier(seed=42, **model_cols)
gbt = GBTClassifier(seed=42, **model_cols)
dt = DecisionTreeClassifier(seed=42, **model_cols)

In [None]:
# Create one pipeline per candidate model: assemble the feature vector, then
# fit the classifier. The keys are the names reported in the results table.
# (`stages` must be a plain list; the previous `stages=+[...]` applied a unary
# plus to a list, which raises TypeError.)
models = {
    "LogisticRegression": Pipeline(stages=[assembler, lr]),
    "RandomForestClassifier": Pipeline(stages=[assembler, rf]),
    "GBTClassifier": Pipeline(stages=[assembler, gbt]),
    "DecisionTreeClassifier": Pipeline(stages=[assembler, dt]),
}

### Train & Evaluate Models

In [None]:
# Initialize evaluator for AUC (PySpark).
# The label column is `target_col`, the same column the models are trained on.
# NOTE(review): this cell previously read "readmitted_binary" while the models
# use "readmitted_label" -- confirm `target_col` names the 0/1 label.
auc_evaluator = BinaryClassificationEvaluator(labelCol=target_col, metricName="areaUnderROC")

# One dict of metrics per model, collected for the summary table
results = []

for name, model in models.items():
    # Entries of `models` that are already pipelines start with the assembler;
    # wrapping them in a second pipeline would run the assembler twice and fail
    # because its output column already exists. Bare estimators are wrapped.
    pipeline = model if isinstance(model, Pipeline) else Pipeline(stages=[assembler, model])
    pipeline_model = pipeline.fit(train_data)
    predictions = pipeline_model.transform(test_data)

    # AUC using Spark. Stored as `auc_score` so sklearn's imported `auc`
    # function, which the ROC cell calls, is not overwritten.
    auc_score = auc_evaluator.evaluate(predictions)

    # Convert predictions to pandas for sklearn metrics
    preds_pd = predictions.select(target_col, "prediction").toPandas()

    acc = accuracy_score(preds_pd[target_col], preds_pd["prediction"])
    f1 = f1_score(preds_pd[target_col], preds_pd["prediction"])

    # `builtins.round` is Python's numeric round; the bare name `round` is
    # pyspark's Column function after `from pyspark.sql.functions import *`.
    results.append({
        "Model": name,
        "AUC": builtins.round(auc_score, 4),
        "Accuracy": builtins.round(acc, 4),
        "F1 Score": builtins.round(f1, 4)
    })

### Compare Model Performance


Summarize AUC / Accuracy / F1 for each model in a single table.

In [None]:
# Create summary table, best AUC first.
# Naming the columns up front keeps the sort from raising KeyError when
# `results` is empty, and the chained form avoids in-place mutation so the
# cell gives the same result every time it is re-run.
results_df = (
    pd.DataFrame(results, columns=["Model", "AUC", "Accuracy", "F1 Score"])
    .sort_values(by="AUC", ascending=False)
    .reset_index(drop=True)
)

# Display the table (rich notebook rendering of the cell's last expression)
results_df

### Visualize Model Performance

#### Confusion Matrix

In [None]:

# Convert predictions to pandas.
# NOTE: `predictions` is whatever the training loop assigned last, so this
# plot describes the final model in `models`.
# The true label is read from `target_col`, the column the models were trained on.
preds_pd = predictions.select(target_col, "prediction").toPandas()

# Generate confusion matrix. `labels=[0, 1]` pins the matrix to 2x2 in a fixed
# order so it always lines up with the two tick labels below, even if one
# class never appears. (The tick labels already assume 0 = not readmitted,
# 1 = readmitted.)
class_names = ["Not Readmitted", "Readmitted"]
cm = confusion_matrix(preds_pd[target_col], preds_pd["prediction"], labels=[0, 1])

# Plot using seaborn on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(5, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")
ax.set_title("Confusion Matrix")
fig.tight_layout()
plt.show()


#### ROC Curve

In [None]:
# Re-import sklearn's `auc` under an alias: the training loop may have rebound
# the bare name `auc` to a float, which would make `auc(fpr, tpr)` fail.
from sklearn.metrics import auc as sklearn_auc

# Extract probability of class 1 (readmitted) together with the true label.
# NOTE: `predictions` is whatever the training loop assigned last, so this
# curve describes the final model in `models`.
probs_labels = predictions.select("probability", target_col).collect()
probs = [row["probability"][1] for row in probs_labels]
labels = [row[target_col] for row in probs_labels]

# Thresholds are not needed for the plot; a named throwaway avoids rebinding
# IPython's `_` (last output) variable.
fpr, tpr, _thresholds = roc_curve(labels, probs)
roc_auc = sklearn_auc(fpr, tpr)

fig, ax = plt.subplots()
ax.plot(fpr, tpr, label=f"AUC = {roc_auc:.4f}")
ax.plot([0, 1], [0, 1], "k--")  # chance-level diagonal for reference
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve")
ax.legend(loc="lower right")
ax.grid(True)
plt.show()

#### Analyze Feature Importance

Visualize which features had the strongest influence on the model’s decisions.


### Tune Hyperparameters

Use cross-validation and grid search to optimize model parameters.


### Interpret Results

### Save PySpark Model Pipeline