In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime
import uuid

# Paths
lake_base = "dbfs:/FileStore/task2_datalake"
raw_path    = f"{lake_base}/raw"
bronze_path = f"{lake_base}/bronze"
silver_path = f"{lake_base}/silver"
gold_path   = f"{lake_base}/gold"

# Ensure directories exist
dbutils.fs.mkdirs(raw_path)
dbutils.fs.mkdirs(bronze_path)
dbutils.fs.mkdirs(silver_path)
dbutils.fs.mkdirs(gold_path)

print("Base paths created (or existed):", lake_base)

Base paths created (or existed): dbfs:/FileStore/task2_datalake


In [0]:
src_base = "dbfs:/FileStore/task1_input_files"

# Copy raw source files into raw layer (keeps an immutable raw copy)
dbutils.fs.cp(f"{src_base}/source1_clinical_notes.jsonl", f"{raw_path}/source1_clinical_notes.jsonl")
dbutils.fs.cp(f"{src_base}/source2_lab_reports.csv", f"{raw_path}/source2_lab_reports.csv")

display(dbutils.fs.ls(raw_path))

path,name,size,modificationTime
dbfs:/FileStore/task2_datalake/raw/source1_clinical_notes.jsonl,source1_clinical_notes.jsonl,81577,1763108073000
dbfs:/FileStore/task2_datalake/raw/source2_lab_reports.csv,source2_lab_reports.csv,14650,1763108073000


In [0]:
# Define Schemas
notes_schema = StructType([
    StructField("patient_id", StringType(), True),
    StructField("patient_name", StringType(), True),
    StructField("note_id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("clinical_text", StringType(), True),
    StructField("doctor_name", StringType(), True),
    StructField("hospital", StringType(), True),
    StructField("mrn", StringType(), True)
])

lab_schema = StructType([
    StructField("patient_id", StringType(), True),
    StructField("report_id", StringType(), True),
    StructField("test_name", StringType(), True),
    StructField("result_value", DoubleType(), True),
    StructField("unit", StringType(), True),
    StructField("report_date", StringType(), True),
    StructField("lab_name", StringType(), True),
    StructField("technician", StringType(), True)
])

In [0]:
# Ingest into BRONZE
# Read raw files with schema, do minimal fixes and write to Delta in bronze
notes_bronze = spark.read.schema(notes_schema).json(f"{raw_path}/source1_clinical_notes.jsonl") \
    .withColumn("timestamp", F.to_timestamp("timestamp")) \
    .withColumn("ingested_at", F.current_timestamp()) \
    .withColumn("ingest_batch_id", F.lit(str(uuid.uuid4())))

labs_bronze = spark.read.option("header", True).schema(lab_schema).csv(f"{raw_path}/source2_lab_reports.csv") \
    .withColumn("report_date", F.to_date("report_date", "yyyy-MM-dd")) \
    .withColumn("ingested_at", F.current_timestamp()) \
    .withColumn("ingest_batch_id", F.lit(str(uuid.uuid4())))

# Write as Delta (bronze)
notes_bronze.write.format("delta").mode("overwrite").save(f"{bronze_path}/notes")
labs_bronze.write.format("delta").mode("overwrite").save(f"{bronze_path}/labs")

print("Bronze notes:", spark.read.format("delta").load(f"{bronze_path}/notes").count())
print("Bronze labs :", spark.read.format("delta").load(f"{bronze_path}/labs").count())

Bronze notes: 200
Bronze labs : 200


In [0]:
# Delta versioning
# Show current version info (Delta transaction log)
display(spark.sql(f"DESCRIBE DETAIL delta.`{bronze_path}/notes`"))

# Make a small append/change to create version 1 -> 2 (simulating new load)
sample = notes_bronze.limit(5).withColumn("clinical_text", F.concat(F.col("clinical_text"), F.lit(" [append test]")))
sample.write.format("delta").mode("append").save(f"{bronze_path}/notes")

# Show versions after append
display(spark.sql(f"DESCRIBE HISTORY delta.`{bronze_path}/notes`").limit(10))

# Time travel: read previous version (version 0)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(f"{bronze_path}/notes")
print("Rows in version 0:", df_v0.count())

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,24aa6a8e-445a-4cd8-9eeb-6b91b766ec90,,,dbfs:/FileStore/task2_datalake/bronze/notes,2025-11-14T08:15:54.107Z,2025-11-14T08:16:01Z,List(),List(),1,18728,Map(),1,2,"List(appendOnly, invariants)",Map()


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-11-14T08:20:46Z,1273012412979455,harshinivijayarajan@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,List(3346931961398007),1114-061524-4w2wzas5,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 4603)",,Databricks-Runtime/15.4.x-scala2.12
0,2025-11-14T08:16:01Z,1273012412979455,harshinivijayarajan@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3346931961398007),1114-061524-4w2wzas5,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 200, numOutputBytes -> 18728)",,Databricks-Runtime/15.4.x-scala2.12


Rows in version 0: 200


In [0]:
# Load bronze
notes = spark.read.format("delta").load(f"{bronze_path}/notes")
labs = spark.read.format("delta").load(f"{bronze_path}/labs")
import json
# Minimal redaction (use phi_rules.json from task1_input_files)
phi = json.loads(dbutils.fs.head("dbfs:/FileStore/task1_input_files/phi_rules.json", 10000))
patterns = phi["patterns"]
redaction_format = phi["redaction_format"]

def apply_redaction_expr(col_expr):
    expr = col_expr
    for p in patterns:
        expr = F.regexp_replace(expr, p["pattern"], redaction_format.replace("{type}", p["type"].upper()))
    return expr

notes_silver = (notes
                .withColumn("clinical_text", apply_redaction_expr(F.col("clinical_text")))
                .withColumn("patient_name", apply_redaction_expr(F.col("patient_name")))
                .withColumn("doctor_name", apply_redaction_expr(F.col("doctor_name")))
                .withColumn("mrn", apply_redaction_expr(F.col("mrn")))
                .dropDuplicates(["note_id"])
                .withColumn("cleaned_at", F.current_timestamp())
               )

labs_silver = (labs
               .withColumn("lab_name", apply_redaction_expr(F.col("lab_name")))
               .dropDuplicates(["report_id"])
               .withColumn("cleaned_at", F.current_timestamp())
              )

# Data quality example checks: non-null patient_id and timestamp
dq_notes_failed = notes_silver.filter(F.col("patient_id").isNull() | F.col("timestamp").isNull())
dq_labs_failed = labs_silver.filter(F.col("patient_id").isNull() | F.col("report_date").isNull())

# Save silver as Delta
notes_silver.write.format("delta").mode("overwrite").save(f"{silver_path}/notes")
labs_silver.write.format("delta").mode("overwrite").save(f"{silver_path}/labs")

print("DQ notes failed:", dq_notes_failed.count())
print("DQ labs failed:", dq_labs_failed.count())

DQ notes failed: 0
DQ labs failed: 0


In [0]:
# Canonicalize into one QLM-ready table (same approach as Task 1)
notes_q = spark.read.format("delta").load(f"{silver_path}/notes").select(
    "patient_id",
    F.col("note_id").alias("source_record_id"),
    F.col("timestamp").alias("event_timestamp"),
    "clinical_text",
    F.col("doctor_name").alias("doctor_name"),
).withColumn("source_system", F.lit("EHR_Notes")).withColumn("record_type", F.lit("clinical_note"))

labs_q = spark.read.format("delta").load(f"{silver_path}/labs").select(
    "patient_id",
    F.col("report_id").alias("source_record_id"),
    F.col("report_date").alias("event_timestamp"),
    F.concat_ws(" | ", F.col("test_name"), F.col("result_value").cast("string"), F.col("unit")).alias("clinical_text"),
).withColumn("source_system", F.lit("Labs")).withColumn("record_type", F.lit("lab_report"))

qlm_ready = notes_q.unionByName(labs_q, allowMissingColumns=True)

qlm_ready.write.format("delta").mode("overwrite").save(f"{gold_path}/qlm_ready")

# show a count and sample
print("QLM ready rows:", spark.read.format("delta").load(f"{gold_path}/qlm_ready").count())
spark.read.format("delta").load(f"{gold_path}/qlm_ready").limit(5).display()

QLM ready rows: 400


patient_id,source_record_id,event_timestamp,clinical_text,doctor_name,source_system,record_type
P001,CN00001,2024-09-22T00:00:00Z,[REDACTED_PATIENT_NAME]. Presented with chest pain for 5 days. Assessment: chest pain. Plan: Start Aspirin 75mg. Contact: [REDACTED_PHONE_NUMBER]. MRN: [REDACTED_MRN]. [REDACTED_ADDRESS].,[REDACTED_DOCTOR_NAME],EHR_Notes,clinical_note
P002,CN00002,2024-11-03T00:00:00Z,[REDACTED_PATIENT_NAME]. Presented with chest pain for 4 days. Assessment: chest pain. Plan: Start Aspirin 75mg. Contact: [REDACTED_PHONE_NUMBER]. MRN: [REDACTED_MRN]. [REDACTED_ADDRESS].,[REDACTED_DOCTOR_NAME],EHR_Notes,clinical_note
P003,CN00003,2024-01-19T00:00:00Z,[REDACTED_PATIENT_NAME]. Presented with abdominal pain for 6 days. Assessment: abdominal pain. Plan: Start Pantoprazole 40mg. Contact: [REDACTED_PHONE_NUMBER]. MRN: [REDACTED_MRN]. [REDACTED_ADDRESS].,[REDACTED_DOCTOR_NAME],EHR_Notes,clinical_note
P004,CN00004,2024-01-06T00:00:00Z,[REDACTED_PATIENT_NAME]. Presented with abdominal pain for 7 days. Assessment: abdominal pain. Plan: Start Pantoprazole 40mg. Contact: [REDACTED_PHONE_NUMBER]. MRN: [REDACTED_MRN]. [REDACTED_ADDRESS].,[REDACTED_DOCTOR_NAME],EHR_Notes,clinical_note
P005,CN00005,2024-11-28T00:00:00Z,[REDACTED_PATIENT_NAME]. Presented with abdominal pain for 2 days. Assessment: abdominal pain. Plan: Start Pantoprazole 40mg. Contact: [REDACTED_PHONE_NUMBER]. MRN: [REDACTED_MRN]. [REDACTED_ADDRESS].,[REDACTED_DOCTOR_NAME],EHR_Notes,clinical_note


In [0]:
# Create a provenance record for the gold write
provenance_path = f"{lake_base}/provenance"
dbutils.fs.mkdirs(provenance_path)
batch_id = str(uuid.uuid4())
prov = {
    "batch_id": batch_id,
    "created_at": datetime.utcnow().isoformat(),
    "sources": ["source1_clinical_notes.jsonl","source2_lab_reports.csv"],
    "raw_path": raw_path,
    "bronze_path": bronze_path,
    "silver_path": silver_path,
    "gold_path": f"{gold_path}/qlm_ready",
    "dq_issues": {
        "notes_failed": int(dq_notes_failed.count()),
        "labs_failed": int(dq_labs_failed.count())
    }
}
dbutils.fs.put(f"{provenance_path}/provenance_{batch_id}.json", json.dumps(prov, indent=2), True)
print("Provenance saved:", f"{provenance_path}/provenance_{batch_id}.json")

Wrote 486 bytes.
Provenance saved: dbfs:/FileStore/task2_datalake/provenance/provenance_c9e209ee-fcf8-424b-85dc-6b4299effbca.json


In [0]:
# Databricks standard audit logs are enabled by workspace admin and land in cloud storage.
# simulate an audit event by writing a small audit log for operations.

audit_event = {
  "event_id": str(uuid.uuid4()),
  "event_time": datetime.utcnow().isoformat(),
  "user": "notebook_user",
  "action": "write_gold_qlm_ready",
  "target": f"{gold_path}/qlm_ready",
  "details": prov
}

dbutils.fs.put(f"{lake_base}/audit/audit_{audit_event['event_id']}.json", json.dumps(audit_event, indent=2), True)
print("Audit event created:", f"{lake_base}/audit/audit_{audit_event['event_id']}.json")

# Show audit folder
display(dbutils.fs.ls(f"{lake_base}/audit"))

Wrote 757 bytes.
Audit event created: dbfs:/FileStore/task2_datalake/audit/audit_ed8827c8-a2cc-4a06-a767-d3ae1b2fcba0.json


path,name,size,modificationTime
dbfs:/FileStore/task2_datalake/audit/audit_ed8827c8-a2cc-4a06-a767-d3ae1b2fcba0.json,audit_ed8827c8-a2cc-4a06-a767-d3ae1b2fcba0.json,757,1763108821000
