# Patient Churn Prediction ML Training

This notebook demonstrates the end-to-end process for training, evaluating, and logging machine learning models to predict patient churn. It uses PySpark, MLflow, and scikit-learn for feature engineering, model training, and experiment tracking.

## Imports and Setup

Imports all required libraries for Spark ML, MLflow, scikit-learn and pandas. Sets up the environment for data processing and model training.

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from mlflow.models.signature import infer_signature
from sklearn.metrics import classification_report
import pandas as pd
from pyspark.sql.functions import to_date, datediff, lit

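## MLflow Configuration

Sets the MLflow experiment path and defines a Unity Catalog (UC) volume path, 'tmp_dir', which the training cells later pass to 'mlflow.spark.log_model' as 'dfs_tmpdir'. Per the cell comment, this setting is required on Databricks Free Tier serverless compute.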

In [0]:
# 1. Configuration (Crucial for Free Tier Serverless)
tmp_dir = "/Volumes/patient_churn_prediction/log_model/log_model" 
experiment_path = '/Workspace/Users/csridhar.mbbs.ms.mch@gmail.com/patient_churn_analysis'
mlflow.set_experiment(experiment_path)

<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/3027887952544605', creation_time=1769795308492, experiment_id='3027887952544605', last_update_time=1769840975414, lifecycle_stage='active', name='/Users/csridhar.mbbs.ms.mch@gmail.com/patient_churn_analysis', tags={'mlflow.experiment.sourceName': '/Users/csridhar.mbbs.ms.mch@gmail.com/patient_churn_analysis',
 'mlflow.experimentKind': 'custom_model_development',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'csridhar.mbbs.ms.mch@gmail.com',
 'mlflow.ownerId': '74335737305723'}>

## Load Bronze Data

Loads the raw patient churn dataset from the UC volume into a Spark DataFrame. This is the starting point for feature engineering.

In [0]:
# 2. Load the Bronze Data we created in Phase 1
bronze_df = spark.read.csv('/Volumes/patient_churn_prediction/source_data/source_data/patient_churn_dataset.csv', header=True, inferSchema=True)
display(bronze_df.limit(10))

PatientID,Age,Gender,State,Tenure_Months,Specialty,Insurance_Type,Visits_Last_Year,Missed_Appointments,Days_Since_Last_Visit,Last_Interaction_Date,Overall_Satisfaction,Wait_Time_Satisfaction,Staff_Satisfaction,Provider_Rating,Avg_Out_Of_Pocket_Cost,Billing_Issues,Portal_Usage,Referrals_Made,Distance_To_Facility_Miles,Churned
C20000,41,Female,PA,62,Pediatrics,Medicaid,1,0,564,2024-07-05,3.5,4.9,3.8,4.2,306,0,0,3,21.4,1
C20001,43,Female,GA,44,Internal Medicine,Self-Pay,7,4,254,2025-05-11,2.6,3.1,4.7,4.3,1851,0,0,0,47.6,1
C20002,21,Male,MI,120,Internal Medicine,Medicaid,15,5,89,2025-10-23,1.6,4.4,2.1,4.7,391,0,0,2,7.1,0
C20003,65,Male,FL,118,General Practice,Private,10,3,135,2025-09-07,2.6,4.3,4.3,4.9,808,0,0,0,11.6,1
C20004,18,Female,CA,70,Cardiology,Medicaid,5,4,696,2024-02-24,2.2,4.0,4.1,4.4,866,0,0,0,10.3,1
C20005,65,Male,MI,82,Orthopedics,Self-Pay,15,1,629,2024-05-01,1.7,4.9,2.3,4.6,1548,0,1,1,5.6,0
C20006,82,Female,FL,56,Pediatrics,Medicare,10,1,520,2024-08-18,2.9,4.7,3.2,4.5,423,0,1,2,15.8,1
C20007,66,Male,MI,22,General Practice,Private,7,0,178,2025-07-26,2.7,4.7,2.3,4.0,1771,0,0,1,32.1,1
C20008,18,Female,FL,39,Orthopedics,Medicare,5,4,492,2024-09-15,2.9,2.2,2.9,3.3,122,0,0,2,37.7,1
C20009,47,Male,NY,15,Internal Medicine,Self-Pay,2,2,298,2025-03-28,4.4,1.5,2.2,4.2,1685,0,1,2,12.9,0


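## Feature Engineering - Days Since Interaction

Adds a new column 'Days_since_interacted' to the DataFrame, holding the number of days between each patient's 'Last_Interaction_Date' and a reference date. The reference date is hard-coded as 2026-01-25 in the cell, not read from the current date, so the values stay reproducible across runs.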

In [0]:
# Use the correct column name: 'Last_Interaction_Date'
bronze_df = bronze_df.withColumn(
    "Days_since_interacted",
    datediff(lit("2026-01-25"), to_date("Last_Interaction_Date", "yyyy-MM-dd"))
)

display(bronze_df.limit(10))

PatientID,Age,Gender,State,Tenure_Months,Specialty,Insurance_Type,Visits_Last_Year,Missed_Appointments,Days_Since_Last_Visit,Last_Interaction_Date,Overall_Satisfaction,Wait_Time_Satisfaction,Staff_Satisfaction,Provider_Rating,Avg_Out_Of_Pocket_Cost,Billing_Issues,Portal_Usage,Referrals_Made,Distance_To_Facility_Miles,Churned,Days_since_interacted
C20000,41,Female,PA,62,Pediatrics,Medicaid,1,0,564,2024-07-05,3.5,4.9,3.8,4.2,306,0,0,3,21.4,1,569
C20001,43,Female,GA,44,Internal Medicine,Self-Pay,7,4,254,2025-05-11,2.6,3.1,4.7,4.3,1851,0,0,0,47.6,1,259
C20002,21,Male,MI,120,Internal Medicine,Medicaid,15,5,89,2025-10-23,1.6,4.4,2.1,4.7,391,0,0,2,7.1,0,94
C20003,65,Male,FL,118,General Practice,Private,10,3,135,2025-09-07,2.6,4.3,4.3,4.9,808,0,0,0,11.6,1,140
C20004,18,Female,CA,70,Cardiology,Medicaid,5,4,696,2024-02-24,2.2,4.0,4.1,4.4,866,0,0,0,10.3,1,701
C20005,65,Male,MI,82,Orthopedics,Self-Pay,15,1,629,2024-05-01,1.7,4.9,2.3,4.6,1548,0,1,1,5.6,0,634
C20006,82,Female,FL,56,Pediatrics,Medicare,10,1,520,2024-08-18,2.9,4.7,3.2,4.5,423,0,1,2,15.8,1,525
C20007,66,Male,MI,22,General Practice,Private,7,0,178,2025-07-26,2.7,4.7,2.3,4.0,1771,0,0,1,32.1,1,183
C20008,18,Female,FL,39,Orthopedics,Medicare,5,4,492,2024-09-15,2.9,2.2,2.9,3.3,122,0,0,2,37.7,1,497
C20009,47,Male,NY,15,Internal Medicine,Self-Pay,2,2,298,2025-03-28,4.4,1.5,2.2,4.2,1685,0,1,2,12.9,0,303
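
As a quick cross-check outside Spark, the same day difference can be reproduced with Python's standard library. This is only a sketch: the helper name 'days_since' and the 'sample_rows' list are illustrative, and the values are copied from the first three rows displayed above.

```python
from datetime import date

# Fixed reference date used in the feature engineering cell
REFERENCE_DATE = date(2026, 1, 25)

def days_since(last_interaction: str, reference: date = REFERENCE_DATE) -> int:
    """Whole days between an ISO 'yyyy-MM-dd' date string and the reference date."""
    return (reference - date.fromisoformat(last_interaction)).days

# (Last_Interaction_Date, Days_Since_Last_Visit, Days_since_interacted) from the displayed rows
sample_rows = [
    ("2024-07-05", 564, 569),
    ("2025-05-11", 254, 259),
    ("2025-10-23", 89, 94),
]

for last_date, days_since_visit, expected in sample_rows:
    computed = days_since(last_date)
    # Print the computed value, the value from the notebook output, and the gap to Days_Since_Last_Visit
    print(last_date, computed, expected, computed - days_since_visit)
```

In all ten rows shown above, 'Days_since_interacted' equals 'Days_Since_Last_Visit' plus 5, so the two columns may carry almost the same information in the full dataset. That is worth checking before treating them as independent features.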


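## Feature Engineering Pipeline

Builds a Spark ML pipeline that indexes and one-hot encodes the categorical columns, min-max scales the numeric columns, and assembles everything into a single 'features' vector. The data is split 70/30 into training and test sets (seed 42). The pipeline is fit on the training set only and then applied to both sets, so no statistics from the test set leak into the transformations.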

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler

categorical_cols = ["Gender", "State", "Specialty", "Insurance_Type"]

# 1. CATEGORICAL STAGE
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_indexed", handleInvalid="keep") for c in categorical_cols]

# Added handleInvalid="keep" here to match indexers
ohe = OneHotEncoder(inputCols=[f"{c}_indexed" for c in categorical_cols], 
                    outputCols=[f"{c}_ohe" for c in categorical_cols],
                    handleInvalid="keep") 

# 2. NUMERICAL STAGE
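# Ensure this list only captures pure numerical inputs.
# isinstance checks are more robust than comparing type names as strings.
from pyspark.sql.types import IntegerType, DoubleType, LongType

int_cols = [field.name for field in bronze_df.schema.fields 
            if isinstance(field.dataType, (IntegerType, DoubleType, LongType))
            and field.name != "Churned" 
            and not field.name.endswith("_indexed")]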

num_assembler = VectorAssembler(inputCols=int_cols, outputCol="int_features")
scaler = MinMaxScaler(inputCol="int_features", outputCol="int_features_scaled")

# 3. THE "MASTER" ASSEMBLER
final_inputs = [f"{c}_ohe" for c in categorical_cols] + ["int_features_scaled"]
master_assembler = VectorAssembler(inputCols=final_inputs, outputCol="features")

# 4. DEFINE THE PIPELINE
# This combines the list 'indexers' with the other stages into one flat list
pipeline = Pipeline(stages=indexers + [ohe, num_assembler, scaler, master_assembler])

# 1. Split data first
train_df, test_df = bronze_df.randomSplit([0.7, 0.3], seed=42)

# 2. Fit the pipeline ONLY on training data
pipeline_model = pipeline.fit(train_df)

# 3. Transform both
train_transformed = pipeline_model.transform(train_df)
test_transformed = pipeline_model.transform(test_df)
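
To make the "fit on training data only" step concrete, here is a minimal pure-Python sketch of what the indexing and scaling stages learn and then apply. It approximates the behaviour of 'StringIndexer' with handleInvalid="keep" and of 'MinMaxScaler'; it is not Spark's implementation. All function names and the toy values are hypothetical.

```python
from collections import Counter

def fit_string_indexer(train_values):
    """Assign indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(train_values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def index_with_keep(mapping, value):
    """Labels not seen during fitting go to one extra bucket at len(mapping)."""
    return mapping.get(value, len(mapping))

def fit_min_max(train_values):
    """Learn the scaling range from training values only."""
    return min(train_values), max(train_values)

def min_max_scale(value, low, high):
    """Rescale relative to the training range; no clipping is applied."""
    if high == low:
        return 0.5  # constant column: fall back to the midpoint of [0, 1]
    return (value - low) / (high - low)

# Hypothetical toy split, not taken from the real dataset
train_insurance = ["Medicaid", "Self-Pay", "Medicaid", "Private"]
train_age = [18, 41, 65]

insurance_index = fit_string_indexer(train_insurance)
age_low, age_high = fit_min_max(train_age)

print(insurance_index)
print(index_with_keep(insurance_index, "Medicare"))  # label unseen at fit time
print(min_max_scale(82, age_low, age_high))          # value above the training max
```

Two things follow from fitting on the training split alone: a category that appears only in the test set lands in the extra "keep" bucket instead of raising an error, and a test value outside the training range scales to a number outside [0, 1].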

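## Logistic Regression Model Training & Logging

Trains a logistic regression model on the transformed training set, logs its hyperparameters and train/test accuracy to MLflow, and registers the full pipeline (feature stages plus classifier) as 'patient_churn_prediction.default.patient_churn_logreg'. Classification reports for the train and test sets are logged as text artifacts and printed.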

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report

with mlflow.start_run(run_name="log_regression"):
    lr = LogisticRegression(
        labelCol="Churned", 
        featuresCol="features", 
        maxIter=100, 
        regParam=0.01, 
        elasticNetParam=0.5, 
        tol=1e-6,
    )
    model_lr = lr.fit(train_transformed)

    # Log hyperparameters
    mlflow.log_param("maxIter", 100)
    mlflow.log_param("regParam", 0.01)
    mlflow.log_param("elasticNetParam", 0.5)
    mlflow.log_param("tol", 1e-6)
    mlflow.log_param("labelCol", "Churned")
    mlflow.log_param("featuresCol", "features")

    # Predict and evaluate on test data
    test_pred = model_lr.transform(test_transformed)
    acc_test = MulticlassClassificationEvaluator(labelCol="Churned", metricName="accuracy").evaluate(test_pred)
    mlflow.log_metric("test_accuracy", acc_test)

    # Predict and evaluate on train data
    train_pred = model_lr.transform(train_transformed)
    acc_train = MulticlassClassificationEvaluator(labelCol="Churned", metricName="accuracy").evaluate(train_pred)
    mlflow.log_metric("train_accuracy", acc_train)

    # Classification report for train data
    y_true_train = train_pred.select("Churned").toPandas().values.flatten()
    y_pred_train = train_pred.select("prediction").toPandas().values.flatten()
    report_train = classification_report(y_true_train, y_pred_train)
    mlflow.log_text(report_train, "classification_report_train.txt")

    # Classification report for test data
    y_true_test = test_pred.select("Churned").toPandas().values.flatten()
    y_pred_test = test_pred.select("prediction").toPandas().values.flatten()
    report_test = classification_report(y_true_test, y_pred_test)
    mlflow.log_text(report_test, "classification_report_test.txt")

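    # Infer model signature
    # Note: the signature is inferred from the already-transformed DataFrame, while the
    # pipeline logged below takes the raw columns as input, so the two may not line up exactly.
    signature = infer_signature(train_transformed.toPandas(), train_pred.select("prediction").toPandas())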

    final_pipeline_model_lr = PipelineModel(stages=pipeline_model.stages + [model_lr])

    # Log and register the WHOLE pipeline
    mlflow.spark.log_model(
        spark_model=final_pipeline_model_lr, 
        artifact_path="model", 
        signature=signature, 
        registered_model_name="patient_churn_prediction.default.patient_churn_logreg",
        dfs_tmpdir=tmp_dir
    )

    display(f"Train Accuracy: {acc_train}")
    display(f"Test Accuracy: {acc_test}")
    print("Train Classification Report:")
    print(report_train)
    print("\nTest Classification Report:")
    print(report_test)



Uploading artifacts:   0%|          | 0/76 [00:00<?, ?it/s]

Registered model 'patient_churn_prediction.default.patient_churn_logreg' already exists. Creating a new version of this model...


Created version '5' of model 'patient_churn_prediction.default.patient_churn_logreg'.


'Train Accuracy: 0.6805359661495064'

'Test Accuracy: 0.7027491408934707'

Train Classification Report:
              precision    recall  f1-score   support

           0       0.53      0.13      0.21       459
           1       0.69      0.94      0.80       959

    accuracy                           0.68      1418
   macro avg       0.61      0.54      0.51      1418
weighted avg       0.64      0.68      0.61      1418


Test Classification Report:
              precision    recall  f1-score   support

           0       0.51      0.12      0.20       174
           1       0.72      0.95      0.82       408

    accuracy                           0.70       582
   macro avg       0.61      0.54      0.51       582
weighted avg       0.66      0.70      0.63       582
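
One way to put the accuracy figure in context is to compare it with a majority-class baseline, computed directly from the support counts in the test report above. The sketch below uses only the standard library; the function name 'majority_baseline_accuracy' is illustrative, and the numbers are copied from the output above.

```python
def majority_baseline_accuracy(support_by_class):
    """Accuracy obtained by always predicting the most frequent class."""
    total = sum(support_by_class.values())
    return max(support_by_class.values()) / total

# Support counts from the test classification report
test_support = {0: 174, 1: 408}
# Test accuracy printed by the cell
test_accuracy = 0.7027491408934707

baseline = majority_baseline_accuracy(test_support)
n_test = sum(test_support.values())
# Convert the accuracy back into a count of correctly classified rows
correct = round(test_accuracy * n_test)

print(f"baseline={baseline:.4f} model={test_accuracy:.4f} correct={correct}/{n_test}")
```

On these numbers the model classifies 409 of 582 test rows correctly, one more than always predicting churn (408 of 582). The recall of 0.12 for class 0 in the report points the same way: the accuracy is driven mostly by the majority class.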



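## Random Forest Model Training & Logging

Trains a random forest classifier (100 trees, maximum depth 5) on the transformed training set, logs its hyperparameters and train/test accuracy to MLflow, and registers the full pipeline (feature stages plus classifier) as 'patient_churn_prediction.default.patient_churn_rf'. Classification reports for the train and test sets are logged as text artifacts and printed.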

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import PipelineModel

with mlflow.start_run(run_name="random_forest_classifier"):
    rf = RandomForestClassifier(
        labelCol="Churned",
        featuresCol="features",
        numTrees=100,
        maxDepth=5,
        seed=42
    )
    model_rf = rf.fit(train_transformed)

    # Log hyperparameters
    mlflow.log_param("numTrees", 100)
    mlflow.log_param("maxDepth", 5)
    mlflow.log_param("seed", 42)
    mlflow.log_param("labelCol", "Churned")
    mlflow.log_param("featuresCol", "features")

    # Predict and evaluate on test data
    test_pred_rf = model_rf.transform(test_transformed)
    acc_test_rf = MulticlassClassificationEvaluator(labelCol="Churned", metricName="accuracy").evaluate(test_pred_rf)
    mlflow.log_metric("test_accuracy_rf", acc_test_rf)

    # Predict and evaluate on train data
    train_pred_rf = model_rf.transform(train_transformed)
    acc_train_rf = MulticlassClassificationEvaluator(labelCol="Churned", metricName="accuracy").evaluate(train_pred_rf)
    mlflow.log_metric("train_accuracy_rf", acc_train_rf)

    # Classification report for train data
    y_true_train_rf = train_pred_rf.select("Churned").toPandas().values.flatten()
    y_pred_train_rf = train_pred_rf.select("prediction").toPandas().values.flatten()
    report_train_rf = classification_report(y_true_train_rf, y_pred_train_rf)
    mlflow.log_text(report_train_rf, "classification_report_train_rf.txt")

    # Classification report for test data
    y_true_test_rf = test_pred_rf.select("Churned").toPandas().values.flatten()
    y_pred_test_rf = test_pred_rf.select("prediction").toPandas().values.flatten()
    report_test_rf = classification_report(y_true_test_rf, y_pred_test_rf)
    mlflow.log_text(report_test_rf, "classification_report_test_rf.txt")

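    # Infer model signature
    # Note: the signature is inferred from the already-transformed DataFrame, while the
    # pipeline logged below takes the raw columns as input, so the two may not line up exactly.
    signature_rf = infer_signature(train_transformed.toPandas(), train_pred_rf.select("prediction").toPandas())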

    final_pipeline_model_rf = PipelineModel(stages=pipeline_model.stages + [model_rf])

    # Log and register the WHOLE pipeline
    mlflow.spark.log_model(
        spark_model=final_pipeline_model_rf,
        artifact_path="model_rf",
        signature=signature_rf,
        registered_model_name="patient_churn_prediction.default.patient_churn_rf",
        dfs_tmpdir=tmp_dir
    )

    display(f"Train Accuracy (RF): {acc_train_rf}")
    display(f"Test Accuracy (RF): {acc_test_rf}")
    print("Train Classification Report (RF):")
    print(report_train_rf)
    print("\nTest Classification Report (RF):")
    print(report_test_rf)



Uploading artifacts:   0%|          | 0/80 [00:00<?, ?it/s]

Registered model 'patient_churn_prediction.default.patient_churn_rf' already exists. Creating a new version of this model...


Created version '5' of model 'patient_churn_prediction.default.patient_churn_rf'.


'Train Accuracy (RF): 0.685472496473907'

'Test Accuracy (RF): 0.7010309278350515'

Train Classification Report (RF):
              precision    recall  f1-score   support

           0       1.00      0.03      0.06       459
           1       0.68      1.00      0.81       959

    accuracy                           0.69      1418
   macro avg       0.84      0.51      0.43      1418
weighted avg       0.79      0.69      0.57      1418


Test Classification Report (RF):
              precision    recall  f1-score   support

           0       0.50      0.01      0.01       174
           1       0.70      1.00      0.82       408

    accuracy                           0.70       582
   macro avg       0.60      0.50      0.42       582
weighted avg       0.64      0.70      0.58       582
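
The 'macro avg' and 'weighted avg' rows can be recomputed from the per-class rows, which helps in reading the report. The sketch below does this for recall on the test set, using the rounded values printed above, so the results only approximately match the report. The helper names are illustrative.

```python
def macro_average(values):
    """Unweighted mean across classes."""
    return sum(values) / len(values)

def weighted_average(values, supports):
    """Mean across classes, weighted by each class's support."""
    return sum(v * s for v, s in zip(values, supports)) / sum(supports)

# Per-class recall and support from the random forest test report (rounded values)
recall = [0.01, 1.00]   # class 0, class 1
support = [174, 408]

macro_recall = macro_average(recall)
weighted_recall = weighted_average(recall, support)

print(f"macro recall={macro_recall:.3f} weighted recall={weighted_recall:.3f}")
```

Support-weighted recall is mathematically the same quantity as accuracy, so it is dominated by the 408 churned patients. The macro recall of about 0.50 shows that the random forest almost never identifies class 0, and its test accuracy of 0.7010309278350515 is exactly 408/582, the share of class 1 in the test set.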

