# **Deep Care - Mining Hospital Records for Predicting Patient Drop-off**

#README

 Setting the PySpark Environment following executions have been done -

▶ !apt-get install openjdk-8-jdk-headless -qq > /dev/null

▶ !pip install pyspark

▶ os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
import os                                                                                                                 # For working with file paths
from pyspark.sql import SparkSession                                                                                      # To create and manage a SparkSession
from pyspark.sql.functions import col, when, mean, stddev, rand, udf                                                      # Common DataFrame functions for data manipulation
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, StandardScaler, VectorAssembler                     # To convert categorical string columns into numerical indices
from pyspark.sql.functions import array                                                                                   # To work with arrays in DataFrames
from pyspark.ml.clustering import KMeans                                                                                  # KMeans clustering algorithm from MLlib
from pyspark.ml.evaluation import ClusteringEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator   # For evaluating clustering models
from pyspark.sql.functions import col, count, countDistinct, when, max as spark_max, to_timestamp                         # Compute maximum value; aliased as spark_max to avoid conflicts with built-in max()
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier                           # Import classification models
from pyspark.ml.linalg import VectorUDT                                                                                   # Import for vector handling in custom UDFs (User Defined Functions)
from pyspark.sql.types import DoubleType                                                                                  # Import DoubleType for defining column types in Spark DataFrames

In [None]:
patient_demographics_path = "/content/datasets/patient_demographics.csv"                  #file path for patient_demographics dataset
patient_visits_path = "/content/datasets/patient_visits.csv"                              #file path for patient_visits dataset
hospital_path = "/content/datasets/hospital_logs.xml"                                     #file path for hospital_path dataset

**Creating spark session**

In [None]:
def create_spark_session(app_name="BigDataPipeline", packages="com.databricks:spark-xml_2.12:0.15.0"):
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.jars.packages", packages) \
        .getOrCreate()
    return spark

**Data Preprocessing for Patient Demographics Dataset**

In [None]:
def preprocess_demographics(spark, patient_demographics_path):
    # Load demographics data using the specified file path variable
    df_demo = spark.read.csv(patient_demographics_path, header=True, inferSchema=True)

    # Fill numeric nulls: Compute the mean income and replace missing values for "avg_monthly_income"
    income_mean = df_demo.select(mean("avg_monthly_income")).first()[0]
    df_demo = df_demo.fillna({"avg_monthly_income": income_mean})

    # Fill categorical nulls: Replace missing values in specified categorical columns with "Unknown"
    categorical_demo = ["gender", "insurance", "marital_status", "education_level", "employment_status", "language_preference"]
    for c in categorical_demo:
        df_demo = df_demo.fillna({c: "Unknown"})

    # Standardize gender entries to a consistent format
    df_demo = df_demo.withColumn("gender",
        when(col("gender").isin("M", "Male"), "Male")
        .when(col("gender").isin("F", "Female"), "Female")
        .otherwise("Other")
    )

    # Cap the number of chronic conditions at a maximum value of 5
    df_demo = df_demo.withColumn("chronic_conditions", when(col("chronic_conditions") > 5, 5).otherwise(col("chronic_conditions")))

    return df_demo

In [None]:
spark = create_spark_session()
df_demo = preprocess_demographics(spark, patient_demographics_path)
df_demo.show(5)

+----------+---+------+-------+---------+--------------+---------------+-----------------+------------------+--------------+-------------------+------------------+
|patient_id|age|gender|zipcode|insurance|marital_status|education_level|employment_status|chronic_conditions|has_mobile_app|language_preference|avg_monthly_income|
+----------+---+------+-------+---------+--------------+---------------+-----------------+------------------+--------------+-------------------+------------------+
|         1| 51| Other|  49372| Medicare|        Single|       Bachelor|          Student|                 0|           Yes|              Hindi|           20268.7|
|         2| 14|Female|  84681|     None|        Single|    High School|       Unemployed|                 2|           Yes|            English|          15850.09|
|         3| 71|Female|  48588|  Private|       Married|         Master|       Unemployed|                 3|           Yes|            English|          30295.22|
|         4| 60|

**Data Preprocessing for Patient Visits Dataset**

In [None]:
def preprocess_visits(spark, patient_visits_path):
    # Load visits data from the specified file path variable
    df_visits = spark.read.csv(patient_visits_path, header=True, inferSchema=True)

    # Fill nulls for specific columns with default values
    df_visits = df_visits.fillna({
        "total_spent": 0,           # Set missing total spent to 0
        "visit_duration": 0,        # Set missing visit duration to 0
        "visit_type": "Unknown",    # Set missing visit type to "Unknown"
        "department": "Unknown",    # Set missing department to "Unknown"
        "appointment_day": "Unknown", # Set missing appointment day to "Unknown"
        "drop_off": 0               # Set missing drop off values to 0
    })

    # Ensure that the drop_off column is cast to integer type
    df_visits = df_visits.withColumn("drop_off", col("drop_off").cast("int"))

    return df_visits


In [None]:
df_visits = preprocess_visits(spark, patient_visits_path)
df_visits.show(5)

+----------+----------+-----------+---------------+----------+---------------+-----------+------------------+---------------+--------------+------------------+--------------------+--------+
|patient_id|num_visits|total_spent|time_in_waiting|visit_type|appointment_day| department|satisfaction_score|doctor_assigned|visit_duration|prescription_given|followup_recommended|drop_off|
+----------+----------+-----------+---------------+----------+---------------+-----------+------------------+---------------+--------------+------------------+--------------------+--------+
|    266401|         3|     459.34|           23.5| Follow-up|        Tuesday|Dermatology|                 1|           D147|          16.1|               Yes|                  No|       0|
|    306112|         5|     896.87|           37.2|       New|         Friday|      Ortho|                10|           D019|          23.3|               Yes|                  No|       0|
|    277015|         4|    1073.94|           32.1

**Data Processing for Hospital Logs (XML Format)**

In [None]:
def preprocess_logs(spark, hospital_path):
    # Load logs data from an XML file using the specified row tag "log"
    df_logs = spark.read.format("xml") \
        .option("rowTag", "log") \
        .load(hospital_path)

    # Print the schema of the DataFrame to review the structure of the XML data
    df_logs.printSchema()

    # Display the first 5 rows with no truncation of the column values
    df_logs.show(5, truncate=False)

    # Cast "patient_id" column to integer and convert "timestamp" column to a proper timestamp type
    df_logs = df_logs.withColumn("patient_id", col("patient_id").cast("int")) \
                     .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm"))

    return df_logs

In [None]:
df_logs = preprocess_logs(spark, hospital_path)

root
 |-- department: string (nullable = true)
 |-- event: string (nullable = true)
 |-- log_type: string (nullable = true)
 |-- patient_id: long (nullable = true)
 |-- staff_on_duty: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+------------------+--------+----------+-------------+-------------------+
|department |event             |log_type|patient_id|staff_on_duty|timestamp          |
+-----------+------------------+--------+----------+-------------+-------------------+
|Dermatology|Procedure         |info    |423602    |S030         |2023-04-13 20:27:00|
|Ortho      |Checked-In        |critical|139717    |S088         |2023-05-09 13:04:00|
|Dermatology|Checked-In        |critical|564794    |S054         |2023-11-16 16:42:00|
|Pediatrics |Missed Appointment|info    |643004    |S088         |2023-04-21 12:45:00|
|Ortho      |Procedure         |info    |159674    |S044         |2023-12-05 06:06:00|
+-----------+------------------+--------+---------

**Data Integration: Merging Demographics, Visits, and Logs**

In [None]:
# Load datasets
df_demo = spark.read.csv(patient_demographics_path, header=True, inferSchema=True)
df_visits = spark.read.csv(patient_visits_path, header=True, inferSchema=True)
df_logs = spark.read.format("xml").option("rowTag", "log").load(hospital_path)

# Preprocess XML for merging
df_logs = df_logs.withColumn("patient_id", col("patient_id").cast("int")) \
                 .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm"))

# Create aggregated log features per patient
df_log_features = df_logs.groupBy("patient_id").agg(
    count("*").alias("log_count"),
    count(when(col("log_type") == "critical", True)).alias("critical_logs"),
    countDistinct("event").alias("unique_events")
)

latest_logs = df_logs.groupBy("patient_id").agg(spark_max("timestamp").alias("latest_log_time"))
# Change 'timestamp' to 'latest_log_time' in the join condition
latest_department = df_logs.join(latest_logs, on=["patient_id"], how='inner') \
                           .filter(col("timestamp") == col("latest_log_time")) \
                           .select("patient_id", "department") \
                           .withColumnRenamed("department", "most_recent_department") \
                           .dropDuplicates(["patient_id"])

# Merge log features with department info
df_log_features = df_log_features.join(latest_department, on="patient_id", how="left")

# Final fill for missing department
df_log_features = df_log_features.fillna({
    "log_count": 0,
    "critical_logs": 0,
    "unique_events": 0,
    "most_recent_department": "Unknown"
})

# MERGE ALL

# 1. Merge demographics and visits
df_patient = df_demo.join(df_visits, on="patient_id", how="inner")

# 2. Merge with XML-derived log features
df_final = df_patient.join(df_log_features, on="patient_id", how="left") \
                     .fillna({
                         "log_count": 0,
                         "critical_logs": 0,
                         "unique_events": 0,
                         "most_recent_department": "Unknown"
                     })

# Final dataset ready
df_final.show(5)

+----------+---+------+-------+---------+--------------+---------------+-----------------+------------------+--------------+-------------------+------------------+----------+-----------+---------------+----------+---------------+-----------+------------------+---------------+--------------+------------------+--------------------+--------+---------+-------------+-------------+----------------------+
|patient_id|age|gender|zipcode|insurance|marital_status|education_level|employment_status|chronic_conditions|has_mobile_app|language_preference|avg_monthly_income|num_visits|total_spent|time_in_waiting|visit_type|appointment_day| department|satisfaction_score|doctor_assigned|visit_duration|prescription_given|followup_recommended|drop_off|log_count|critical_logs|unique_events|most_recent_department|
+----------+---+------+-------+---------+--------------+---------------+-----------------+------------------+--------------+-------------------+------------------+----------+-----------+----------

**Saving the final csv file**

In [None]:
df_final.toPandas().to_csv("output.csv", index=False)

**Encoding & Scaling Categorical & Numerical Features respectively**

In [None]:
# Categorical columns
categorical_cols = [
    "gender", "insurance", "marital_status", "education_level",
    "employment_status", "language_preference", "visit_type",
    "appointment_day", "department", "most_recent_department",
    "has_mobile_app"
]

# Encode Categorical Columns
for c in categorical_cols:
    # String Indexing
    indexer = StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
    df_final = indexer.fit(df_final).transform(df_final)

    # One-Hot Encoding
    encoder = OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_encoded")
    df_final = encoder.fit(df_final).transform(df_final)

# Scale Numerical Columns Individually
numerical_cols = [
    "age", "chronic_conditions", "avg_monthly_income", "num_visits",
    "total_spent", "time_in_waiting", "visit_duration",
    "satisfaction_score", "log_count", "critical_logs", "unique_events"
]

for c in numerical_cols:
    assembler = VectorAssembler(inputCols=[c], outputCol=f"{c}_vec")
    df_final = assembler.transform(df_final)

    scaler = StandardScaler(inputCol=f"{c}_vec", outputCol=f"{c}_scaled", withMean=True, withStd=True)
    df_final = scaler.fit(df_final).transform(df_final)

# Select Only Required Processed Columns for Modeling/Mining
preprocessed_cols = ["drop_off"] + \
                    [f"{c}_scaled" for c in numerical_cols] + \
                    [f"{c}_encoded" for c in categorical_cols]

df_preprocessed = df_final.select(preprocessed_cols)
df_preprocessed.show(3, truncate=False)


+--------+---------------------+-------------------------+-------------------------+---------------------+---------------------+----------------------+---------------------+-------------------------+--------------------+----------------------+--------------------+--------------+-----------------+----------------------+-----------------------+-------------------------+---------------------------+------------------+-----------------------+------------------+------------------------------+----------------------+
|drop_off|age_scaled           |chronic_conditions_scaled|avg_monthly_income_scaled|num_visits_scaled    |total_spent_scaled   |time_in_waiting_scaled|visit_duration_scaled|satisfaction_score_scaled|log_count_scaled    |critical_logs_scaled  |unique_events_scaled|gender_encoded|insurance_encoded|marital_status_encoded|education_level_encoded|employment_status_encoded|language_preference_encoded|visit_type_encoded|appointment_day_encoded|department_encoded|most_recent_department_en

**Data Mining: Extracting patterns from the scaled & encoded features**

In [None]:
# Columns to include: all scaled numerics + encoded categories
clustering_features = [col for col in df_preprocessed.columns if col.endswith("_scaled") or col.endswith("_encoded")]

# Assemble all into a single features vector for KMeans
vec_assembler = VectorAssembler(inputCols=clustering_features, outputCol="features")
df_cluster_ready = vec_assembler.transform(df_preprocessed)


In [None]:
# Initialize KMeans with k clusters (try 3 first, tune later)
kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=3, seed=42)

# Fit the model
kmeans_model = kmeans.fit(df_cluster_ready)

# Predict cluster assignments
df_clustered = kmeans_model.transform(df_cluster_ready)

# Show sample with clusters
df_clustered.select("drop_off", "cluster").show(10)


+--------+-------+
|drop_off|cluster|
+--------+-------+
|       0|      0|
|       1|      2|
|       0|      0|
|       1|      1|
|       0|      1|
|       0|      0|
|       0|      1|
|       0|      0|
|       0|      1|
|       1|      0|
+--------+-------+
only showing top 10 rows



In [None]:
# Count how many patients are in each cluster
df_clustered.groupBy("cluster").count().show()


+-------+------+
|cluster| count|
+-------+------+
|      1|485061|
|      2|122960|
|      0|191979|
+-------+------+



In [None]:
# Convert 'cluster' into usable numeric column
df_clustered = df_clustered.withColumnRenamed("cluster", "patient_segment")


**Balancing the dataset  for modelling**

In [None]:
# Split by class
df_major = df_clustered.filter(col("drop_off") == 0)
df_minor = df_clustered.filter(col("drop_off") == 1)

# Undersample class 0
df_major_sampled = df_major.sample(withReplacement=False, fraction=24000/559743, seed=42)

# Combine and shuffle
df_balanced = df_major_sampled.union(df_minor).orderBy(rand())


In [None]:
ml_features = [c for c in df_balanced.columns if c.endswith("_scaled") or c.endswith("_encoded")] + ["patient_segment"]

# Drop existing features column if present
df_balanced = df_balanced.drop("features")

# Assemble
assembler = VectorAssembler(inputCols=ml_features, outputCol="features")
df_ml_ready = assembler.transform(df_balanced)


**Splitting & Applying ML Models**

In [None]:
train, test = df_ml_ready.randomSplit([0.7, 0.3], seed=42)


In [None]:
# Disable codegen if using Colab
spark.conf.set("spark.sql.codegen.wholeStage", "false")

auc_eval = BinaryClassificationEvaluator(labelCol="drop_off", metricName="areaUnderROC")
acc_eval = MulticlassClassificationEvaluator(labelCol="drop_off", metricName="accuracy")

results = []

# Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="drop_off")
lr_model = lr.fit(train)
lr_preds = lr_model.transform(test)
results.append(("Logistic Regression", auc_eval.evaluate(lr_preds), acc_eval.evaluate(lr_preds)))

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="drop_off", numTrees=50)
rf_model = rf.fit(train)
rf_preds = rf_model.transform(test)
results.append(("Random Forest", auc_eval.evaluate(rf_preds), acc_eval.evaluate(rf_preds)))

# GBT
gbt = GBTClassifier(featuresCol="features", labelCol="drop_off", maxIter=50)
gbt_model = gbt.fit(train)
gbt_preds = gbt_model.transform(test)
results.append(("Gradient Boosted Trees", auc_eval.evaluate(gbt_preds), acc_eval.evaluate(gbt_preds)))

# Output
print(" Model Evaluation (Balanced Dataset):")
for name, auc, acc in results:
    print(f"{name:<25} AUC: {auc:.4f}  Accuracy: {acc:.4f}")


 Model Evaluation (Balanced Dataset):
Logistic Regression       AUC: 0.5022  Accuracy: 0.9104
Random Forest             AUC: 0.5100  Accuracy: 0.9104
Gradient Boosted Trees    AUC: 0.5675  Accuracy: 0.9105


**Predictions**

In [None]:
# Extract only the probability of class 1
@udf(returnType=DoubleType())
def extract_prob(prob_vector):
    return float(prob_vector[1])  # probability of drop_off = 1

# Add column with extracted probability
gbt_preds_export = gbt_preds.withColumn("prob_dropoff", extract_prob("probability"))

# Select relevant columns
final_preds = gbt_preds_export.select("drop_off", "prediction", "prob_dropoff", "patient_segment")

# Export to CSV
final_preds.write.csv("drop_off_predictions.csv", header=True, mode="overwrite")
