In [0]:
# Set the session's default catalog and schema for unqualified table names.
# NOTE(review): DLT pipelines usually take their target catalog/schema from the
# pipeline settings — confirm these USE statements are still needed.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")



Physician

In [0]:
import dlt
import re
from pyspark.sql.functions import col, round

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``dimPhysicianPK`` -> ``dim_physician_pk``; ``ProviderNPI`` -> ``provider_npi``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "dimPhysician" -> "dim_Physician".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "PhysicianPK" -> "Physician_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()


@dlt.table(
    name="Physician_LookUp",
    comment="Transformed dimphysician table for US Healthcare analytics, with clear column names, type handling, and audit columns included"
)
# NOTE(review): the filter below already removes null keys, so this expectation
# presumably never fires on the returned rows; @dlt.expect_or_drop would record
# the dropped count instead — confirm which is intended.
@dlt.expect("dim_physician_pk_not_null", "dim_physician_pk IS NOT NULL")
def dimphysician():
    """Build the silver physician lookup from ``bronze.dimphysician``.

    Renames every column to snake_case, casts the key/code/NPI columns to
    integer types, rounds ``provider_fte`` to 2 decimals, drops rows whose
    ``dim_physician_pk`` is null after casting, and removes exact duplicates.
    """
    # Read from bronze layer
    df = dlt.read("ushealthcaredynamics.bronze.dimphysician")

    # Rename columns to snake_case
    renamed_cols = {c: camel_to_snake(c) for c in df.columns}
    df = df.select([col(c).alias(renamed_cols[c]) for c in df.columns])

    # Apply transformations
    df_transformed = (
        df
        .withColumn("dim_physician_pk", col("dim_physician_pk").cast("int"))
        # Presumably a 10-digit National Provider Identifier (TODO confirm):
        # such values can exceed the 32-bit int maximum (2147483647), so use
        # long, which holds every value int could plus the larger ones.
        .withColumn("provider_npi", col("provider_npi").cast("long"))
        .withColumn("speciality_code", col("speciality_code").cast("int"))
        .withColumn("provider_fte", round(col("provider_fte").cast("float"), 2))
        .withColumn("provider_name", col("provider_name").cast("string"))
        .filter(col("dim_physician_pk").isNotNull())
        .dropDuplicates()
    )

    return df_transformed


Payer

In [0]:
import dlt
import re
from pyspark.sql.functions import col

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``dimPayerPK`` -> ``dim_payer_pk``; ``PayerName`` -> ``payer_name``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "dimPayer" -> "dim_Payer".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "PayerPK" -> "Payer_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()

@dlt.table(
    name="Payer_LookUp",
    comment="Transformed dimpayor table for US Healthcare analytics, with clear column names, type handling, and audit columns included"
)
def dimpayor():
    """Build the silver payer lookup from ``bronze.dimpayor``.

    Columns are renamed to snake_case, ``dim_payer_pk`` is cast to int and
    ``payer_name`` to string, rows whose key is null after the cast are
    dropped, and exact duplicate rows are removed.
    """
    bronze = dlt.read("ushealthcaredynamics.bronze.dimpayor")
    snake_named = bronze.select(
        [col(name).alias(camel_to_snake(name)) for name in bronze.columns]
    )

    typed = snake_named.withColumn("dim_payer_pk", col("dim_payer_pk").cast("int"))
    typed = typed.withColumn("payer_name", col("payer_name").cast("string"))

    keyed = typed.filter(col("dim_payer_pk").isNotNull())
    return keyed.dropDuplicates()

Transaction

In [0]:
import dlt
import re
from pyspark.sql.functions import col

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``dimTransactionPK`` -> ``dim_transaction_pk``;
    ``AdjustmentReason`` -> ``adjustment_reason``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "dimTransaction" -> "dim_Transaction".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "TransactionPK" -> "Transaction_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()

@dlt.table(
    # Table names must not carry leading/trailing whitespace.
    name="Transaction_LookUp",
    comment="Transformed dimtransaction table for US Healthcare analytics, with clear column names, type handling, and audit columns included"
)
def dimtransaction():
    """Build the silver transaction lookup from ``bronze.dimtransaction``.

    Renames columns to snake_case, casts ``dim_transaction_pk`` to int and the
    descriptive columns to string, drops rows whose key is null after the
    cast, and removes exact duplicate rows.
    """
    df = dlt.read("ushealthcaredynamics.bronze.dimtransaction")
    renamed_cols = {c: camel_to_snake(c) for c in df.columns}
    df = df.select([col(c).alias(renamed_cols[c]) for c in df.columns])
    df_transformed = (
        df
        .withColumn("dim_transaction_pk", col("dim_transaction_pk").cast("int"))
        .withColumn("transaction_type", col("transaction_type").cast("string"))
        .withColumn("transaction", col("transaction").cast("string"))
        .withColumn("adjustment_reason", col("adjustment_reason").cast("string"))
        .filter(col("dim_transaction_pk").isNotNull())
        .dropDuplicates()
    )
    return df_transformed

Diagnosis

In [0]:
import dlt
from pyspark.sql.functions import col, trim, regexp_replace, to_date, current_timestamp

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

@dlt.table(
    name="DiagnosisCode_LookUp",
    comment="Cleaned diagnosis data for Capstone",
    table_properties={"quality": "DimDiagnosis silver"}
)
# An expectation only takes effect when applied as a decorator; calling
# dlt.expect(...) inside the function body just builds and discards one.
@dlt.expect("PK_not_null", "dimDiagnosisCodePK IS NOT NULL")
def DimDiagnosis_silver():
    """Build the silver diagnosis-code lookup from ``bronze.dimdiagnosis``.

    Spaces in column names become underscores (names are otherwise kept
    as-is). ``dimDiagnosisCodePK`` is cast to long. Rows whose key is null
    after the cast are dropped, one row is kept per key, and ``update_date``
    is stamped with the current timestamp.
    """
    df = dlt.read("ushealthcaredynamics.bronze.dimdiagnosis")
    df = df.toDF(*[c.replace(' ', '_') for c in df.columns])
    # Cast before filtering/deduplicating: keys that fail the cast become null
    # and are then dropped, and de-duplication compares the typed key (if the
    # bronze key is a string, e.g. "01" and "1" now collapse to one row).
    df = df.withColumn("dimDiagnosisCodePK", col("dimDiagnosisCodePK").cast("long"))
    df = df.filter(col("dimDiagnosisCodePK").isNotNull())
    df = df.dropDuplicates(["dimDiagnosisCodePK"])
    df = df.withColumn("update_date", current_timestamp())

    return df

Date

In [0]:
import dlt
from pyspark.sql.functions import col, current_timestamp, lit, lpad
# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

@dlt.table(
    name="dimDate",
    comment="Cleaned and standardized date dimension data.",
    table_properties={"quality": "silver"}
)
def DimDate_silver():
    """Build the silver date dimension from ``bronze.dimdate``.

    Drops rows whose ``Date`` is null, renames the calendar columns to
    ``date_*`` names, left-pads ``MonthPeriod`` with '0' to two characters,
    and carries the audit columns through. ``Date`` keeps its name and its
    bronze type.

    NOTE(review): ``Date`` is not cast to DateType here — confirm whether
    downstream consumers expect a date-typed column.
    """
    bronze = dlt.read("ushealthcaredynamics.bronze.dimdate")
    dated_rows = bronze.filter(col("Date").isNotNull())

    # Two-character month number, e.g. '1' -> '01'.
    month_number = lpad(col("MonthPeriod"), 2, '0').alias("date_month_number")

    return dated_rows.select(
        col("Year").alias("date_year"),
        col("Month").alias("date_month_name"),
        month_number,
        col("MonthYear").alias("date_month_year"),
        col("Day").alias("date_day_of_month"),
        col("Date").alias("Date"),
        col("DayName").alias("date_day_name"),
        # Audit columns pass through unchanged.
        col("ingestion_date"),
        col("source_file"),
        col("audit_source"),
    )
 

Patient

In [0]:
import dlt
from pyspark.sql.functions import col, when, expr, upper, substring, concat_ws, udf
from pyspark.sql.types import StringType
import re
# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

# Define a masking UDF for PatientNumber
def mask_mrn(mrn):
    """Mask an MRN so only its last four characters stay visible.

    The value is stringified first. Values of four characters or fewer are
    fully replaced by '*'. ``None`` passes through as ``None``.
    """
    if mrn is None:
        return None
    text = str(mrn)
    tail = text[-4:] if len(text) > 4 else ""
    return "*" * (len(text) - len(tail)) + tail

# Spark UDF wrapping mask_mrn so it can be applied to a column; returns strings.
mask_mrn_udf = udf(mask_mrn, returnType=StringType())

def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``dimPatientPK`` -> ``dim_patient_pk``; ``FirstName`` -> ``first_name``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "dimPatient" -> "dim_Patient".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "PatientPK" -> "Patient_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()

@dlt.table(name="Patient_LookUp")
def silver_dimpatient():
    """Build the silver patient lookup from ``bronze.dimpatient``.

    Renames columns to snake_case and derives these columns:

    - ``patient_email_domain``: the text after the last '@', or null.
    - ``full_name``: first and last name joined with a space.
    - ``patient_state_code``: the first three characters of the state code,
      upper-cased.
    - ``patient_age_group``: Child (<18), Adult (18-64), Senior (>=65), or
      null when the age is null.
    - ``patmrn``: ``patient_number`` masked by ``mask_mrn_udf``.

    Keeps one row per ``dim_patient_pk``.

    NOTE(review): the unmasked ``patient_number`` column is still present in
    the output next to ``patmrn``; confirm whether it should be dropped or
    overwritten (the fact table overwrites it).
    """
    # Local import: substring_index is not on this cell's import line.
    from pyspark.sql.functions import substring_index

    bronze_df = dlt.read("ushealthcaredynamics.bronze.dimpatient")
    # Rename columns to snake_case
    renamed_cols = {c: camel_to_snake(c) for c in bronze_df.columns}
    df = bronze_df.select([col(c).alias(renamed_cols[c]) for c in bronze_df.columns])

    df = (
        # A fixed-length suffix is only the domain for addresses of one exact
        # length, so take what follows the last '@'. Addresses without '@'
        # (and null emails) get null.
        df.withColumn(
            "patient_email_domain",
            when(col("email").contains("@"), substring_index(col("email"), "@", -1)),
        )
          .withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))
          .withColumn("patient_state_code", upper(substring(col("state_code"), 1, 3)))
          # No catch-all otherwise(): a null age must not be labelled "Senior".
          .withColumn("patient_age_group",
                      when(col("patient_age") < 18, "Child")
                      .when((col("patient_age") >= 18) & (col("patient_age") < 65), "Adult")
                      .when(col("patient_age") >= 65, "Senior"))
          .withColumn("patmrn", mask_mrn_udf(col("patient_number")))
          .dropDuplicates(["dim_patient_pk"])
    )
    return df


CPTCode

In [0]:
import dlt
from pyspark.sql.functions import col, upper, trim, initcap, substring, when, current_timestamp
import re

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

# Utility to convert camelCase/PascalCase to snake_case
def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``CPTCode`` -> ``cpt_code``; ``CPTDesc`` -> ``cpt_desc``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "CPTCode" -> "CPT_Code".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "codePK" -> "code_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()

@dlt.table(name="CPTCode_LookUp")
def silver_dimcptcode():
    """Build the silver CPT-code lookup from ``bronze.dimcptcode``.

    Renames columns to snake_case and normalises the codes and descriptions:

    - ``cpt_code``: trimmed and upper-cased.
    - ``cpt_desc``: trimmed and title-cased.

    Then keeps one row per normalised ``cpt_code`` and adds these columns:

    - ``short_desc``: the first 30 characters of ``cpt_desc``.
    - ``is_cpt_code_missing``: true when ``cpt_code`` is null or empty.
    - ``ingestion_timestamp``: the current timestamp.
    """
    bronze_df = dlt.read("ushealthcaredynamics.bronze.dimcptcode")
    # Rename all columns to snake_case
    renamed_cols = {c: camel_to_snake(c) for c in bronze_df.columns}
    df = bronze_df.select([col(c).alias(renamed_cols[c]) for c in bronze_df.columns])

    df = (
        df.withColumn("cpt_code", upper(trim(col("cpt_code"))))
          .withColumn("cpt_desc", initcap(trim(col("cpt_desc"))))
          # Deduplicate on the normalised code so e.g. " abc" and "ABC"
          # collapse to one row instead of surviving as duplicates.
          .dropDuplicates(["cpt_code"])
          .withColumn("short_desc", substring(col("cpt_desc"), 1, 30))
          .withColumn("is_cpt_code_missing", col("cpt_code").isNull() | (col("cpt_code") == ""))
          .withColumn("ingestion_timestamp", current_timestamp())
    )
    return df


Hospital

In [0]:
import dlt
from pyspark.sql.functions import col, initcap, trim, current_timestamp
import re

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

# Utility for snake_case conversion
def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``dimLocationPK`` -> ``dim_location_pk``; ``LocationName`` -> ``location_name``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` (not
    ``dim__patient_pk``) and ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "dimLocation" -> "dim_Location".
    s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "LocationPK" -> "Location_PK".
    s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
    # Normalise separators so the result is clean snake_case.
    s3 = re.sub(r'_+', '_', re.sub(r'\s+', '_', s2))
    return s3.lower()

@dlt.table(name="Hospital_LookUp")
def silver_dimhospital():
    """Build the silver hospital lookup from ``bronze.dimhospital``.

    Renames every column to snake_case and trims and title-cases
    ``location_name``. Keeps one row per ``dim_location_pk`` and stamps
    ``ingestion_timestamp`` with the current timestamp.
    """
    source = dlt.read("ushealthcaredynamics.bronze.dimhospital")
    snake_named = source.select(
        [col(name).alias(camel_to_snake(name)) for name in source.columns]
    )

    tidied = snake_named.withColumn("location_name", initcap(trim(col("location_name"))))
    one_per_location = tidied.dropDuplicates(["dim_location_pk"])
    return one_per_location.withColumn("ingestion_timestamp", current_timestamp())


FactTable

In [0]:
import dlt
import re
from pyspark.sql.functions import col, to_date, round, udf
from pyspark.sql.types import StringType

# Set the session's default catalog and schema for unqualified table names.
spark.sql("USE CATALOG ushealthcaredynamics")
spark.sql("USE SCHEMA silver")

def camel_to_snake(name):
    """Convert a camelCase/PascalCase column name to snake_case.

    ``FactTablePK`` -> ``fact_table_pk``; ``dimCPTCodePK`` -> ``dim_cpt_code_pk``.
    Whitespace is turned into underscores and runs of underscores are
    collapsed, so ``dim_PatientPK`` -> ``dim_patient_pk`` and
    ``Patient Number`` -> ``patient_number``.
    """
    # Split before a capitalised word: "FactTable" -> "Fact_Table".
    s = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Split a lowercase/digit -> uppercase boundary: "TablePK" -> "Table_PK".
    s = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s)

    # Whitespace -> underscore, then remove multiple underscores
    s = re.sub(r'\s+', '_', s)
    s = re.sub(r'_+', '_', s)

    return s.lower()


def to_snake_case_df(df):
    """Return ``df`` with every column renamed through ``camel_to_snake``.

    Column order is preserved; only the names change.
    """
    return df.select(
        [col(name).alias(camel_to_snake(name)) for name in df.columns]
    )



def mask_mrn(mrn):
    """Mask an MRN so only its last four characters stay visible.

    The value is stringified first. Values of four characters or fewer are
    fully replaced by '*'. ``None`` passes through as ``None``.
    """
    if mrn is None:
        return None
    text = str(mrn)
    tail = text[-4:] if len(text) > 4 else ""
    return "*" * (len(text) - len(tail)) + tail

# Spark UDF wrapping mask_mrn so it can be applied to a column; returns strings.
mask_mrn_udf = udf(mask_mrn, returnType=StringType())


# ------------------------------------------------------------
# Single Silver Table
# ------------------------------------------------------------
@dlt.table(
    comment="Single-step silver table with snake_case, filtering, casting, and MRN masking."
)
def facttable():
    """Build the silver fact table from ``bronze.facttable`` in one step.

    1. Keep only rows where every required key column is non-null and not ''.
    2. Rename all columns to snake_case with ``to_snake_case_df``.
    3. Cast keys to long, parse the service/post date keys with
       ``dd-MM-yyyy``, mask ``patient_number`` with ``mask_mrn_udf``, cast the
       measures, and round ``ar`` to 2 decimals.
    """
    import operator
    from functools import reduce

    # Read bronze
    df = dlt.read("ushealthcaredynamics.bronze.facttable")

    # Bronze (pre-rename) names of the keys every fact row must carry.
    required_keys = (
        "FactTablePK",
        "dimPatientPK",
        "dimPhysicianPK",
        "dimDateServicePK",
        "dimDatePostPK",
        "dimCPTCodePK",
        "dimPayerPK",
        "dimTransactionPK",
        "dimLocationPK",
    )
    # Compare as string so the '' check does not rely on Spark coercing ''
    # into a numeric type when a key column is numeric. Depending on the
    # ANSI/coercion settings, that coercion can evaluate to null (silently
    # dropping the row) or raise. For string keys this is unchanged.
    has_all_keys = reduce(
        operator.and_,
        (col(k).isNotNull() & (col(k).cast("string") != "") for k in required_keys),
    )
    df_filtered = df.filter(has_all_keys)

    df_snake = to_snake_case_df(df_filtered)

    df_final = (
        df_snake
        .withColumn("fact_table_pk", col("fact_table_pk").cast("long"))
        .withColumn("check_dimension", col("check_dimension").cast("long"))
        .withColumn("dim_patient_pk", col("dim_patient_pk").cast("long"))
        .withColumn("dim_physician_pk", col("dim_physician_pk").cast("long"))
        .withColumn("dim_date_service_pk", to_date(col("dim_date_service_pk"), "dd-MM-yyyy"))
        .withColumn("dim_date_post_pk", to_date(col("dim_date_post_pk"), "dd-MM-yyyy"))
        .withColumn("dim_cpt_code_pk", col("dim_cpt_code_pk").cast("long"))
        .withColumn("dim_payer_pk", col("dim_payer_pk").cast("long"))
        .withColumn("dim_transaction_pk", col("dim_transaction_pk").cast("long"))
        .withColumn("dim_location_pk", col("dim_location_pk").cast("long"))

        # Overwrite the raw MRN so only the masked value reaches silver.
        .withColumn("patient_number", mask_mrn_udf(col("patient_number")))

        .withColumn("dim_diagnosis_code_pk", col("dim_diagnosis_code_pk").cast("long"))
        .withColumn("cpt_units", col("cpt_units").cast("int"))
        .withColumn("gross_expenses", col("gross_expenses").cast("double"))
        .withColumn("adjustment", col("adjustment").cast("double"))
        # double like the other amount columns; an int cast would truncate
        # any fractional payment amounts.
        .withColumn("insurance_payment", col("insurance_payment").cast("double"))
        .withColumn("patient_payment", col("patient_payment").cast("double"))
        .withColumn("ar", round(col("ar").cast("double"), 2))
        .withColumn("ingestion_date", col("ingestion_date").cast("timestamp"))
        .withColumn("source_file", col("source_file").cast("string"))
        .withColumn("audit_source", col("audit_source").cast("string"))
    )

    return df_final
