#🔹 Step 1 — Configure Secure Access to ADLS via Service Principal (OAuth)

This step configures secure access to Azure Data Lake Storage (ADLS Gen2) using a Service Principal whose credentials (Client ID, Tenant ID, and Secret) are stored in Azure Key Vault and read through a Databricks secret scope.

In [0]:
# Attach this notebook to a cluster before running this cell.
storage_account = "adlsclinicalpoc"  # exact storage account name

# Service-principal credentials, all read from the same secret scope.
_secret_scope = "clinical-keyvault-scope"
client_id = dbutils.secrets.get(_secret_scope, "adls-client-id")
tenant_id = dbutils.secrets.get(_secret_scope, "adls-tenant-id")
client_secret = dbutils.secrets.get(_secret_scope, "adls-client-secret")

# ABFS OAuth configs (session-scoped). Each key is suffixed with the storage
# account's DFS endpoint so the settings apply to this account only.
_adls_host = f"{storage_account}.dfs.core.windows.net"
_oauth_settings = [
    ("fs.azure.account.auth.type", "OAuth"),
    ("fs.azure.account.oauth.provider.type",
     "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
    ("fs.azure.account.oauth2.client.id", client_id),
    ("fs.azure.account.oauth2.client.secret", client_secret),
    ("fs.azure.account.oauth2.client.endpoint",
     f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"),
]
for _conf_prefix, _conf_value in _oauth_settings:
    spark.conf.set(f"{_conf_prefix}.{_adls_host}", _conf_value)



##🔹 Step 2 — Define and Validate RAW → SILVER Data Paths in ADLS (ABFS)

This step sets up the Azure Data Lake Storage (ADLS) paths for both RAW inputs and SILVER outputs using ABFS URIs (no mounts).
It also performs a sanity check to confirm that your paths and permissions are valid, and previews a few rows from each dataset to verify schema and data integrity before transformation.

In [0]:
 # --- inputs: set your container names (adjust if needed)
storage_account = "adlsclinicalpoc"
container_bronze = "bronze"
container_silver = "silver"


def _abfss_dir(container, account, folder):
    """Return the abfss:// URI (with trailing slash) of *folder* in *container*."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{folder}/"


# --- ABFS paths to RAW inputs (no mounts!)
BRONZE_SUBJECTS_TRIALS_PATH = _abfss_dir(container_bronze, storage_account, "db_trial_subjects")
BRONZE_LABS_RESULTS_PATH = _abfss_dir(container_bronze, storage_account, "adls_lab_results")
BRONZE_API_ADVERSE_EVENTS_PATH = _abfss_dir(container_bronze, storage_account, "api_adverse_events")

# --- ABFS paths to SILVER outputs
SILVER_SUBJECTS_PATH = _abfss_dir(container_silver, storage_account, "trial_subjects")
SILVER_LABS_PATH = _abfss_dir(container_silver, storage_account, "lab_results")
SILVER_AE_PATH = _abfss_dir(container_silver, storage_account, "adverse_events")

# --- Sanity check: listing each RAW folder confirms both the path and the permissions.
for _bronze_dir in (
    BRONZE_SUBJECTS_TRIALS_PATH,
    BRONZE_LABS_RESULTS_PATH,
    BRONZE_API_ADVERSE_EVENTS_PATH,
):
    display(dbutils.fs.ls(_bronze_dir))

# --- Peek at a few rows from each source to confirm format/columns.
# Subjects and labs are read as CSV with a header row; adverse events as JSON.
subjects_sample = spark.read.csv(BRONZE_SUBJECTS_TRIALS_PATH, header=True)
labs_sample = spark.read.csv(BRONZE_LABS_RESULTS_PATH, header=True)
ae_sample = spark.read.json(BRONZE_API_ADVERSE_EVENTS_PATH)

for _sample_df in (subjects_sample, labs_sample, ae_sample):
    display(_sample_df.limit(5))


### 🔹 Step 3 — Set Spark Session Defaults for Consistency and Performance

In [0]:

# Session-level defaults, applied in the order listed.
_session_defaults = {
    # Evaluate all timestamps in UTC (prevents timezone drift).
    "spark.sql.session.timeZone": "UTC",
    # Number of partitions used for shuffles; 200 is a reasonable starting
    # point for small-to-medium data and can be tuned later for big data.
    "spark.sql.shuffle.partitions": "200",
    # Case-insensitive column names (so 'Subject_ID' == 'subject_id').
    "spark.sql.caseSensitive": "false",
}
for _conf_key, _conf_value in _session_defaults.items():
    spark.conf.set(_conf_key, _conf_value)

print("✅ Spark session defaults set successfully!")

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType
from pyspark.sql.functions import to_date,col, trim, upper, when, length, to_timestamp, coalesce, lit,substring

# ---------- Explicit schema for RAW trial subjects ----------
# Every column is read as a string; the date columns are parsed explicitly below.
subjects_schema = StructType([
    StructField("subject_id",     StringType(), True),
    StructField("trial_id",       StringType(), True),
    StructField("site_id",        StringType(), True),
    StructField("enrollment_date",StringType(), True),  # parsed to a timestamp, then a date, below
    StructField("arm",            StringType(), True),
    StructField("sex",            StringType(), True),
    StructField("dob",            StringType(), True),  # parsed to a date in the final select
    StructField("country",        StringType(), True)
])

# ---------- Read RAW from the bronze ABFS path defined earlier (no mounts) ----------
raw_subjects_df = (
    spark.read
         .option("header", True)
         .schema(subjects_schema)
         .csv(BRONZE_SUBJECTS_TRIALS_PATH)
)

print(f"Raw rows: {raw_subjects_df.count()}")

# ---------- Normalize text columns: trim everything, uppercase COUNTRY only ----------
# enrollment_date is trimmed as well so stray whitespace cannot defeat the
# strict pattern-based parsing below.
norm_df = (
    raw_subjects_df
      .withColumn("subject_id",      trim(col("subject_id")))
      .withColumn("trial_id",        trim(col("trial_id")))
      .withColumn("site_id",         trim(col("site_id")))
      .withColumn("enrollment_date", trim(col("enrollment_date")))
      .withColumn("arm",             trim(col("arm")))
      .withColumn("sex",             trim(col("sex")))
      .withColumn("dob",             trim(col("dob")))
      .withColumn("country",         upper(trim(col("country"))))
)

# ---------- Parse enrollment_date robustly (try multiple formats) ----------
# Day-first patterns are tried first (dd-MM-yyyy, then dd/MM/yyyy); Spark's
# default timestamp parsing is the last resort. coalesce keeps the first
# attempt that yields a non-null value.
enroll_ts = coalesce(
    to_timestamp(col("enrollment_date"), "dd-MM-yyyy"),
    to_timestamp(col("enrollment_date"), "dd/MM/yyyy"),
    to_timestamp(col("enrollment_date"))  # very last resort
)

subjects_parsed = norm_df.withColumn("enrollment_ts_utc", enroll_ts)

# ---------- Filter: drop null/blank subject_id ----------
subjects_nonnull = subjects_parsed.filter(
    (col("subject_id").isNotNull()) & (length(col("subject_id")) > 0)
)

# ---------- Deduplicate on business key subject_id ----------
# NOTE: dropDuplicates keeps an arbitrary row per subject_id; it does NOT
# prefer the latest enrollment. Replace with a window/row_number rule if a
# deterministic survivor is required.
silver_subjects = subjects_nonnull.dropDuplicates(["subject_id"])

print(f"After cleaning (non-null + dedupe) rows: {silver_subjects.count()}")

# ---------- Reorder/select final columns for Silver ----------
silver_subjects_final = (
    silver_subjects
      # Only the first 10 characters of dob are parsed, as yyyy-MM-dd.
      .withColumn("dob", to_date(substring(col("dob"), 1, 10), "yyyy-MM-dd"))
      .withColumn("enrollment_date", to_date(col("enrollment_ts_utc")))  # timestamp -> date
      .drop("enrollment_ts_utc")                                        # helper column no longer needed
      .select(
          "subject_id", "trial_id", "site_id",
          "enrollment_date",      # clean date column
          "arm", "sex",
          "dob", "country"
      )
)

display(silver_subjects_final.limit(10))

# ---------- WRITE TO SILVER ----------
# Overwrite on every run: Silver is fully derived from bronze, and appending
# would duplicate every subject each time the notebook is re-run.
(
    silver_subjects_final
        .write
        .mode("overwrite")
        .parquet(SILVER_SUBJECTS_PATH)  # write to ADLS Silver path
)


In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import (
    col, trim, upper, to_timestamp, coalesce, length,
    regexp_replace, when, date_format
)

# --------- INPUT / OUTPUT PATHS ----------
# Relies on BRONZE_LABS_RESULTS_PATH, container_silver and storage_account
# having been defined by an earlier cell.
SILVER_LAB_RESULTS = f"abfss://{container_silver}@{storage_account}.dfs.core.windows.net/lab_results/"

# --------- SCHEMA ----------
# Everything is read as a string; numeric and timestamp columns are cast below.
labs_results_schema = StructType([
    StructField("lab_result_id",         StringType(), True),
    StructField("subject_id",            StringType(), True),
    StructField("encounter_id",          StringType(), True),
    StructField("order_datetime_utc",    StringType(), True),
    StructField("collected_datetime_utc",StringType(), True),
    StructField("result_datetime_utc",   StringType(), True),
    StructField("loinc_code",            StringType(), True),
    StructField("test_name",             StringType(), True),
    StructField("result_value",          StringType(), True),
    StructField("unit",                  StringType(), True),
    StructField("abnormal_flag",         StringType(), True)
])

# --------- READ RAW ----------
raw_labs_results_df = (
    spark.read
         .option("header", True)
         .schema(labs_results_schema)
         .csv(BRONZE_LABS_RESULTS_PATH)
)

# --------- BASIC NORMALIZATION ----------
normalized_labs_results = (
    raw_labs_results_df
        .withColumn("lab_result_id", trim(col("lab_result_id")))
        .withColumn("subject_id",     trim(col("subject_id")))
        .withColumn("encounter_id",   trim(col("encounter_id")))
        .withColumn("test_name",      upper(trim(col("test_name"))))
        .withColumn("unit",           upper(trim(col("unit"))))
        .withColumn("abnormal_flag",  upper(trim(col("abnormal_flag"))))
        .withColumn("loinc_code",     trim(col("loinc_code")))
        .withColumn("result_value",   regexp_replace(col("result_value"), ",", ""))  # "1,200" -> "1200"
)

# --------- PARSE TIMESTAMPS ----------
# The later steps read from `labs_ts`, which was never defined, so this cell
# used to fail with a NameError. The three *_utc string columns are parsed
# here: Spark's default timestamp parsing first, then day-first fallbacks.
# NOTE(review): the source timestamp formats are not visible in this notebook;
# confirm them against the bronze files and adjust the fallback patterns.
labs_ts = normalized_labs_results
for _ts_col in ("order_datetime_utc", "collected_datetime_utc", "result_datetime_utc"):
    _raw_ts = trim(col(_ts_col))
    labs_ts = labs_ts.withColumn(
        _ts_col,
        coalesce(
            to_timestamp(_raw_ts),
            to_timestamp(_raw_ts, "dd-MM-yyyy HH:mm:ss"),
            to_timestamp(_raw_ts, "dd/MM/yyyy HH:mm:ss"),
        ),
    )

# --------- ENSURE NUMERIC + FLAG NORMALIZATION ----------
labs_clean = (
    labs_ts
        # Values that cannot be cast to double become null.
        .withColumn("result_value", col("result_value").cast("double"))
        # Collapse short/long spellings; anything else (including null) -> UNKNOWN.
        .withColumn(
            "abnormal_flag",
            when(col("abnormal_flag").isin("H", "HIGH"), "HIGH")
            .when(col("abnormal_flag").isin("L", "LOW"),  "LOW")
            .when(col("abnormal_flag").isin("N", "NORMAL"), "NORMAL")
            .otherwise("UNKNOWN")
        )
)

# --------- FILTER INTEGRITY + DEDUPE ----------
# Rows need a non-blank lab_result_id and a non-null loinc_code; one arbitrary
# row is kept per lab_result_id.
labs_filtered = (
    labs_clean
        .filter((col("lab_result_id").isNotNull()) & (length(col("lab_result_id")) > 0))
        .filter(col("loinc_code").isNotNull())
        .dropDuplicates(["lab_result_id"])
)

# --------- FINAL SELECT ----------
# The parsed timestamps are rendered as "yyyy-MM-dd HH:mm:ss" strings in the
# Silver output; select the *_utc columns instead to keep real timestamps.
silver_labs_results_final = (
    labs_filtered
      .withColumn("order_datetime",     date_format(col("order_datetime_utc"),     "yyyy-MM-dd HH:mm:ss"))
      .withColumn("collected_datetime", date_format(col("collected_datetime_utc"), "yyyy-MM-dd HH:mm:ss"))
      .withColumn("result_datetime",    date_format(col("result_datetime_utc"),    "yyyy-MM-dd HH:mm:ss"))
      .select(
          "lab_result_id","subject_id","encounter_id","loinc_code","test_name",
          "result_value","unit","abnormal_flag",
          "order_datetime","collected_datetime","result_datetime"
      )
)

display(silver_labs_results_final.limit(10))

# --------- WRITE TO SILVER ----------
(silver_labs_results_final
    .write
    .mode("overwrite")
    .parquet(SILVER_LAB_RESULTS)
)

print("✅ silver_lab_results written to:", SILVER_LAB_RESULTS)




In [0]:
# Read the Silver lab results back from ADLS to confirm the write.
_silver_labs_readback = spark.read.parquet(SILVER_LAB_RESULTS)
display(_silver_labs_readback)

In [0]:
# Schema-inferred read of the bronze adverse-events JSON; preview up to 100 rows.
ae = spark.read.json(BRONZE_API_ADVERSE_EVENTS_PATH)
_ae_preview = ae.limit(100)
display(_ae_preview)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import (
    col, trim, upper, initcap, coalesce, to_timestamp, regexp_replace, length,
    when, datediff, current_timestamp
)

# --------- INPUT / OUTPUT PATHS ----------
# Relies on storage_account, container_bronze and container_silver having been
# defined by an earlier cell.
BRONZE_API_ADVERSE_EVENTS_PATH = f"abfss://{container_bronze}@{storage_account}.dfs.core.windows.net/api_adverse_events/"
SILVER_ADVERSE_EVENTS = f"abfss://{container_silver}@{storage_account}.dfs.core.windows.net/adverse_events/"

# --------- SCHEMA (strings in, parsed later) ----------
# All fields are nullable strings, in this order. Example values noted by the
# original author: preferred_term "headache", onset_date "2025-07-10T09:15:00Z",
# severity mild/moderate/severe, serious serious/non-serious,
# outcome RECOVERED/NOT RECOVERED, relatedness RELATED/NOT RELATED/UNKNOWN.
_ae_field_names = [
    "event_id",
    "subject_id",
    "trial_id",
    "preferred_term",
    "onset_date",
    "resolved_date",   # may be null
    "severity",
    "serious",
    "outcome",
    "relatedness",
]
ae_schema = StructType(
    [StructField(_field_name, StringType(), True) for _field_name in _ae_field_names]
)

# --------- READ BRONZE (JSON) ----------
_ae_reader = spark.read.schema(ae_schema).option("multiline", "false")
raw_ae = _ae_reader.json(BRONZE_API_ADVERSE_EVENTS_PATH)

# --------- BASIC NORMALIZATION ----------
ae_norm = raw_ae

# Identifiers: whitespace-trimmed only.
for _id_col in ("event_id", "subject_id", "trial_id"):
    ae_norm = ae_norm.withColumn(_id_col, trim(col(_id_col)))

# Preferred term: trimmed and title-cased ("Headache", "Injection Site Pain").
ae_norm = ae_norm.withColumn("preferred_term", initcap(trim(col("preferred_term"))))

# Categoricals: trimmed and upper-cased so the later mapping can match on them.
for _cat_col in ("severity", "serious", "outcome", "relatedness"):
    ae_norm = ae_norm.withColumn(_cat_col, upper(trim(col(_cat_col))))

# --------- CLEAN + PARSE DATETIMES (strip trailing 'Z' / 'UTC', drop ISO 'T', try common patterns) ----------
def parse_clean_ts(scol):
    """Parse a string column holding datetimes in assorted formats into a timestamp.

    Args:
        scol: String Column with the raw datetime text, e.g.
            "2025-07-10T09:15:00Z", "10/07/2025 09:15" or "2025-07-10".

    Returns:
        A Column holding the first successful parse among the supported
        patterns, or null when none of them matches.

    The zone marker is removed rather than interpreted, so values are read in
    the Spark session time zone.
    """
    # Strip the trailing zone marker BEFORE touching any 'T'. The previous
    # order replaced every 'T' first, which turned "UTC" into "U C" so the
    # suffix was never removed and the value parsed to null. The group also
    # anchors both alternatives to the end of the string; the old pattern
    # "Z|UTC$" removed a 'z' anywhere in the value.
    without_zone = regexp_replace(trim(scol), r"(?i)\s*(?:Z|UTC)$", "")
    # Only the ISO-8601 date/time separator (a 'T' between two digits) becomes a space.
    c = trim(regexp_replace(without_zone, r"(?<=\d)T(?=\d)", " "))
    return coalesce(
        to_timestamp(c, "yyyy-MM-dd HH:mm:ss"),
        to_timestamp(c, "yyyy/MM/dd HH:mm:ss"),
        to_timestamp(c, "dd-MM-yyyy HH:mm:ss"),
        to_timestamp(c, "dd/MM/yyyy HH:mm:ss"),
        to_timestamp(c, "yyyy-MM-dd HH:mm"),     # no seconds
        to_timestamp(c, "dd-MM-yyyy HH:mm"),
        to_timestamp(c, "dd/MM/yyyy HH:mm"),
        to_timestamp(c, "yyyy-MM-dd"),           # date-only -> midnight
        to_timestamp(c, "dd-MM-yyyy"),
        to_timestamp(c, "dd/MM/yyyy")
    )

# --------- PARSE DATETIMES ----------
# Both date columns are replaced in place by their parsed timestamps.
ae_ts = ae_norm
for _date_col in ("onset_date", "resolved_date"):
    ae_ts = ae_ts.withColumn(_date_col, parse_clean_ts(col(_date_col)))


# --------- NORMALIZE CATEGORICALS ----------
def _canonical_label(column_name, alias_groups):
    """Map each alias group of *column_name* to its canonical label, else "UNKNOWN".

    alias_groups is an ordered list of (canonical_label, aliases) pairs; the
    first group whose aliases contain the value wins.
    """
    source = col(column_name)
    (first_label, first_aliases), *remaining_groups = alias_groups
    mapped = when(source.isin(*first_aliases), first_label)
    for label, aliases in remaining_groups:
        mapped = mapped.when(source.isin(*aliases), label)
    return mapped.otherwise("UNKNOWN")


_category_rules = {
    # Severity -> MILD / MODERATE / SEVERE / UNKNOWN
    "severity": [
        ("MILD", ("MILD", "MI", "LOW")),
        ("MODERATE", ("MODERATE", "MOD", "MEDIUM")),
        ("SEVERE", ("SEVERE", "SEV", "HIGH")),
    ],
    # Seriousness -> SERIOUS / NON-SERIOUS / UNKNOWN
    "serious": [
        ("SERIOUS", ("SERIOUS", "YES", "Y")),
        ("NON-SERIOUS", ("NON-SERIOUS", "NO", "N", "NOT SERIOUS")),
    ],
    # Relationship -> RELATED / NOT RELATED / POSSIBLE / UNKNOWN
    "relatedness": [
        ("RELATED", ("RELATED", "CERTAIN", "PROBABLE", "LIKELY")),
        ("NOT RELATED", ("NOT RELATED", "UNRELATED")),
        ("POSSIBLE", ("POSSIBLE", "POSSIBLY RELATED")),
    ],
    # Outcome -> RECOVERED / RECOVERING / NOT RECOVERED / FATAL / UNKNOWN
    "outcome": [
        ("RECOVERED", ("RECOVERED", "RESOLVED")),
        ("RECOVERING", ("RECOVERING", "IMPROVING")),
        ("NOT RECOVERED", ("NOT RECOVERED", "NOT RESOLVED", "PERSISTING")),
        ("FATAL", ("FATAL", "DEATH")),
    ],
}

ae_cats = ae_ts
for _cat_col, _alias_groups in _category_rules.items():
    ae_cats = ae_cats.withColumn(_cat_col, _canonical_label(_cat_col, _alias_groups))

# --------- DERIVED FIELDS ----------
# Duration in days between onset and resolution; null while unresolved.
_raw_duration = when(
    col("resolved_date").isNotNull(),
    datediff(col("resolved_date"), col("onset_date")),
).otherwise(None)

# Negative durations (resolved before onset) are treated as bad data and nulled.
_non_negative_duration = when(col("duration_days") < 0, None).otherwise(col("duration_days"))

ae_derived = (
    ae_cats
      .withColumn("duration_days", _raw_duration)
      .withColumn("is_ongoing", col("resolved_date").isNull())  # no resolution date yet
      .withColumn("duration_days", _non_negative_duration)
)

# Diagnostic, run before deduplication: rows per event_id, most frequent first.
print("Count of records per event_id:")
_event_counts = ae_derived.groupBy("event_id").count()
_event_counts.orderBy(col("count").desc()).show()

# --------- ENFORCE KEYS + DEDUPE ----------
# Both keys must be present and non-blank; one arbitrary row is kept per event_id.
_has_event_id = col("event_id").isNotNull() & (length(col("event_id")) > 0)
_has_subject_id = col("subject_id").isNotNull() & (length(col("subject_id")) > 0)
ae_filtered = (
    ae_derived
      .filter(_has_event_id)
      .filter(_has_subject_id)
      .dropDuplicates(["event_id"])
)

# --------- FINAL SILVER SELECT ----------
_silver_ae_columns = [
    "event_id", "subject_id", "trial_id",
    "preferred_term", "severity", "serious", "relatedness", "outcome",
    "onset_date", "resolved_date",
    "duration_days", "is_ongoing",
]
silver_adverse_events = ae_filtered.select(*_silver_ae_columns)

display(silver_adverse_events.limit(20))

# Quick quality counters. Severity is mapped to "UNKNOWN" above rather than
# left null, so the second counter is always 0.
_null_onset_count = silver_adverse_events.where(col("onset_date").isNull()).count()
print("Null onset:", _null_onset_count)
_null_severity_count = silver_adverse_events.where(col("severity").isNull()).count()
print("Null severity:", _null_severity_count)

# --------- WRITE TO SILVER (Parquet) ----------
silver_adverse_events.write.parquet(SILVER_ADVERSE_EVENTS, mode="overwrite")
print("✅ silver_adverse_events written to:", SILVER_ADVERSE_EVENTS)
