In [0]:
# Databricks Auto Loader Ingestion Notebook for Bronze Tables

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Shared S3 locations. Every path ends with "/" so that a file pattern or
# table name can be appended directly.
_bucket_root = "s3://kungfupandas/"
landing_zone_path = _bucket_root + "landing_zone/"            # raw incoming CSV files
bronze_base_path = _bucket_root + "bronze/"                   # bronze Delta table root
checkpoint_base_path = _bucket_root + "checkpoints/bronze/"   # streaming checkpoint root

In [0]:
# ---------------------- patient_details ----------------------
# Streams patient_details_* CSV files from the landing zone into the
# kgf_patient_details bronze Delta table using Auto Loader ("cloudFiles").

# (source column name, bronze column name) pairs, in schema order. Keeping both
# names side by side stops the read schema and the positional rename below from
# drifting apart.
patient_details_columns = [
    ("id", "id"),
    ("First Name", "first_name"),
    ("Surname", "surname"),
    ("Title", "title"),
    ("Date of Birth", "date_of_birth"),
    ("Address", "address"),
    ("Mobile", "mobile"),
    ("Home Phone", "home_phone"),
    ("Work Phone", "work_phone"),
]

# Every column is declared as a nullable string; no type casting happens here.
patient_details_schema = StructType([
    StructField(source_name, StringType(), True)
    for source_name, _ in patient_details_columns
])

raw_patient_details_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(patient_details_schema)
        .load(f"{landing_zone_path}patient_details_*")
)

# toDF assigns the new names by position, replacing the space-separated source
# names with snake_case ones.
patient_details_df = raw_patient_details_df.toDF(
    *[bronze_name for _, bronze_name in patient_details_columns]
)

# availableNow=True processes everything currently available and then stops.
# It replaces the deprecated once=True trigger; the backlog may be split into
# several micro-batches instead of one.
(
    patient_details_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}kgf_patient_details")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{bronze_base_path}kgf_patient_details")
)

In [0]:
# ---------------------- patient_demographics ----------------------
# Streams patient_demographics_* CSV files from the landing zone into the
# kgf_patient_demographics bronze Delta table using Auto Loader ("cloudFiles").

# (source column name, bronze column name) pairs, in schema order. Keeping both
# names side by side stops the read schema and the positional rename below from
# drifting apart.
patient_demographics_columns = [
    ("id", "id"),
    ("Gender", "gender"),
    ("Marital Status", "marital_status"),
    ("Employment Status", "employment_status"),
    ("Education Level", "education_level"),
    ("Ethnicity", "ethnicity"),
    ("Annual Income (AUD)", "annual_income_aud"),
    ("Insurance Type", "insurance_type"),
    ("Primary Language", "primary_language"),
]

# Every column is declared as a nullable string; no type casting happens here.
patient_demographics_schema = StructType([
    StructField(source_name, StringType(), True)
    for source_name, _ in patient_demographics_columns
])

raw_patient_demographics_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(patient_demographics_schema)
        .load(f"{landing_zone_path}patient_demographics_*")
)

# toDF assigns the new names by position, replacing the space-separated source
# names with snake_case ones.
patient_demographics_df = raw_patient_demographics_df.toDF(
    *[bronze_name for _, bronze_name in patient_demographics_columns]
)

# availableNow=True processes everything currently available and then stops.
# It replaces the deprecated once=True trigger; the backlog may be split into
# several micro-batches instead of one.
(
    patient_demographics_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}kgf_patient_demographics")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{bronze_base_path}kgf_patient_demographics")
)

In [0]:
# ---------------------- patient_cancer_risk ----------------------
# Streams patient_cancer_risk_* CSV files from the landing zone into the
# kgf_patient_cancer_risk bronze Delta table using Auto Loader ("cloudFiles").
# Column names are written to the bronze table unchanged.

# Column names in schema order.
patient_cancer_risk_columns = [
    "id",
    "smoking",
    "alcohol_consumption",
    "family_history",
    "poor_diet",
    "obesity",
    "sedentary_lifestyle",
    "exposure_to_carcinogens",
    "chronic_inflammation",
    "hpv_infection",
    "sun_exposure",
    "air_pollution",
    "radiation_exposure",
    "chemical_exposure",
    "immunosuppression",
    "night_shift_work",
    "age",
    "gender",
]

# Every column is declared as a nullable string; no type casting happens here.
patient_cancer_risk_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in patient_cancer_risk_columns
])

patient_cancer_risk_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(patient_cancer_risk_schema)
        .load(f"{landing_zone_path}patient_cancer_risk_*")
)

# availableNow=True processes everything currently available and then stops.
# It replaces the deprecated once=True trigger; the backlog may be split into
# several micro-batches instead of one.
(
    patient_cancer_risk_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}kgf_patient_cancer_risk")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{bronze_base_path}kgf_patient_cancer_risk")
)


In [0]:
# ---------------------- patient_cancer_detection ----------------------
# Streams patient_cancer_detection_* CSV files from the landing zone into the
# kgf_patient_cancer_detection bronze Delta table using Auto Loader
# ("cloudFiles").

# (source column name, bronze column name) pairs, in schema order. Keeping both
# names side by side stops the read schema and the positional rename below from
# drifting apart.
patient_cancer_detection_columns = [
    ("Patient ID", "patient_id"),
    ("Detection Date", "detection_date"),
    ("Detection Method", "detection_method"),
    ("Stage at Detection", "stage_at_detection"),
    ("Cancer Type", "cancer_type"),
    ("Cancer Probability", "cancer_probability"),
]

# Every column is declared as a nullable string; no type casting happens here.
patient_cancer_detection_schema = StructType([
    StructField(source_name, StringType(), True)
    for source_name, _ in patient_cancer_detection_columns
])

raw_patient_cancer_detection_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(patient_cancer_detection_schema)
        .load(f"{landing_zone_path}patient_cancer_detection_*")
)

# toDF assigns the new names by position, replacing the space-separated source
# names with snake_case ones.
patient_cancer_detection_df = raw_patient_cancer_detection_df.toDF(
    *[bronze_name for _, bronze_name in patient_cancer_detection_columns]
)

# availableNow=True processes everything currently available and then stops.
# It replaces the deprecated once=True trigger; the backlog may be split into
# several micro-batches instead of one.
(
    patient_cancer_detection_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}kgf_patient_cancer_detection")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{bronze_base_path}kgf_patient_cancer_detection")
)

In [0]:
# ---------------------- patient_risk_scores ----------------------
# Streams patient_risk_scores_* CSV files from the landing zone into the
# kgf_patient_risk_scores bronze Delta table using Auto Loader ("cloudFiles").
# Column names are written to the bronze table unchanged.

# Column names in schema order.
patient_risk_scores_columns = [
    "id",
    "age",
    "gender",
    "smoking_score",
    "alcohol_consumption_score",
    "family_history_score",
    "poor_diet_score",
    "obesity_score",
    "sedentary_lifestyle_score",
    "exposure_to_carcinogens_score",
    "chronic_inflammation_score",
    "hpv_infection_score",
    "sun_exposure_score",
    "air_pollution_score",
    "radiation_exposure_score",
    "chemical_exposure_score",
    "immunosuppression_score",
    "night_shift_work_score",
    "total_risk_score",
    "lung_cancer",
    "breast_cancer",
    "colon_cancer",
    "skin_cancer",
    "cervical_cancer",
    "prostate_cancer",
    "leukemia",
    "pancreatic_cancer",
]

# Every column is declared as a nullable string; no type casting happens here.
patient_risk_scores_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in patient_risk_scores_columns
])

patient_risk_scores_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(patient_risk_scores_schema)
        .load(f"{landing_zone_path}patient_risk_scores_*")
)

# availableNow=True processes everything currently available and then stops.
# It replaces the deprecated once=True trigger; the backlog may be split into
# several micro-batches instead of one.
(
    patient_risk_scores_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}kgf_patient_risk_scores")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{bronze_base_path}kgf_patient_risk_scores")
)