In [0]:
# 1. Load raw data from the mounted path
from pyspark.sql.functions import col, current_date, months_between, to_date

df = spark.read.csv("/mnt/delta/patient_data.csv", header=True, inferSchema=True)

# 2. Data cleaning
# The order of the steps matters:
#   a) Parse the date columns first, so that a value to_date cannot parse
#      (it becomes NULL when ANSI mode is off) is caught by the null filter.
#   b) Drop rows missing a required field BEFORE de-duplicating. dropDuplicates
#      keeps a single row per patient_id with no guarantee which one; if the
#      incomplete copy were kept, the null filter would then remove the patient
#      entirely even though a complete record existed.
df_clean = (
    df.withColumn("dob", to_date(col("dob")))
      .withColumn("last_visit", to_date(col("last_visit")))
      .na.drop(subset=["patient_id", "dob", "diagnosis"])
      .dropDuplicates(["patient_id"])
)

# 3. Add derived features
# Age in (fractional) years. months_between is calendar-aware, so the result does
# not drift with leap days the way `datediff(...) / 365` does. The column stays a
# double, as before.
df_final = df_clean.withColumn("age", months_between(current_date(), col("dob")) / 12)



In [0]:
# 4. Validate the data
from pyspark.sql.functions import count, when

# One aggregate per column: the number of rows in which that column is NULL.
null_counts = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df_final.columns
]
df_final.select(null_counts).show()

# Any patient_id listed here still occurs more than once after cleaning.
rows_per_patient = df_final.groupBy("patient_id").count()
rows_per_patient.where("count > 1").show()

+----------+----+---+------+---------+----------+----------+---+
|patient_id|name|dob|gender|diagnosis|last_visit|readmitted|age|
+----------+----+---+------+---------+----------+----------+---+
|         0|   0|  0|     0|        0|         0|         0|  0|
+----------+----+---+------+---------+----------+----------+---+

+----------+-----+
|patient_id|count|
+----------+-----+
+----------+-----+



In [0]:
# Train a baseline readmission classifier on two features: age and gender.
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Fixed seed so that both the train/test split and the forest are reproducible
# across kernel restarts.
SEED = 42

# Pull only the modelling columns to the driver, dropping rows with any NULL in them.
pdf = df_final.select("age", "gender", "readmitted").dropna().toPandas()

# Encode the gender strings as integer codes for the classifier.
pdf["gender"] = LabelEncoder().fit_transform(pdf["gender"])
X = pdf[["age", "gender"]]
y = pdf["readmitted"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=SEED
)
model = RandomForestClassifier(random_state=SEED).fit(X_train, y_train)

# Use the held-out 20% that was split off above; without this the model is never evaluated.
print(f"Hold-out accuracy: {model.score(X_test, y_test):.3f}")

Uploading artifacts:   0%|          | 0/3 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

In [0]:
import mlflow.sklearn

# Log the fitted random forest to MLflow under the artifact path "rf_model".
mlflow.sklearn.log_model(sk_model=model, artifact_path="rf_model")



Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]



<mlflow.models.model.ModelInfo at 0x750138064470>

In [0]:
# Prerequisite for the Snowflake cells below: on the cluster's Libraries page,
# install the following connector libraries:
# - snowflake-jdbc
# - spark-snowflake_2.12


In [0]:
import os

# Snowflake connection options, passed to the Spark writer via .options(**sfOptions).
#
# The password is read from the SNOWFLAKE_PASSWORD environment variable rather than
# being stored in the notebook (e.g. set it as a cluster environment variable that is
# backed by a secret scope).
# NOTE(review): the password that used to be hardcoded in this cell is exposed in the
# notebook's history and should be rotated.
_sf_password = os.environ.get("SNOWFLAKE_PASSWORD")
if not _sf_password:
    # Fail here with a clear message instead of at connection time.
    raise RuntimeError(
        "SNOWFLAKE_PASSWORD is not set; provide it via the environment before "
        "running the Snowflake cells."
    )

sfOptions = {
  "sfURL"       : "FFDUASY-II65737.snowflakecomputing.com",  # Replace with your account URL
  "sfUser"      : "kieshore",
  "sfPassword"  : _sf_password,
  "sfDatabase"  : "HEALTHCARE_DB",
  "sfSchema"    : "PUBLIC",
  "sfWarehouse" : "COMPUTE_WH",
  # NOTE(review): ACCOUNTADMIN is much broader than a table write needs; prefer a
  # dedicated least-privilege role for this job.
  "sfRole"      : "ACCOUNTADMIN"  # or another role you use
}


In [0]:
# Write the cleaned patient DataFrame to Snowflake. Mode "overwrite" replaces
# whatever the target table currently holds.
(
    df_final.write
    .format("snowflake")
    .mode("overwrite")
    .options(**sfOptions)
    .option("dbtable", "HEALTHCARE_DB.PUBLIC.CLEANED_PATIENTS")
    .save()
)




In [0]:
from datetime import datetime

# Append one audit row describing this run to the ETL_LOGS table.

# Read the clock once so that run_id and timestamp describe the same instant.
# Two separate datetime.now() calls can straddle a second boundary and disagree.
run_time = datetime.now()

# NOTE(review): df_final.count() triggers a fresh Spark job over the pipeline; if
# the source file can change between the write and this cell, the logged count may
# not match what was written. Consider caching df_final or counting before the write.
# NOTE(review): status is always "SUCCESS"; a failure in an earlier cell simply means
# this cell is never reached, so no failure row is ever logged.
log_df = spark.createDataFrame([(
  "run_" + run_time.strftime("%Y%m%d_%H%M%S"),
  run_time,
  df_final.count(),
  "SUCCESS"
)], ["run_id", "timestamp", "row_count", "status"])

log_df.write \
  .format("snowflake") \
  .options(**sfOptions) \
  .option("dbtable", "ETL_LOGS") \
  .mode("append") \
  .save()