In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Users/evansavo@gmail.com/Population_Health_&_Readmission_Risk/1.setup/utilities

In [0]:
# Declare the notebook's text widgets as (name, default value, label) triples.
for widget_name, widget_default, widget_label in (
    ('catalog', 'phr', 'Catalog'),
    ('data_source', '', 'Data Source'),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

In [0]:
# Resolve the run parameters from the notebook widgets.
catalog = dbutils.widgets.get('catalog')
data_source = dbutils.widgets.get('data_source')

# Without a data_source value the source table name built below would end in a
# dangling '.', so stop here with an explicit message instead of failing later
# on an unparseable table name.
if not data_source.strip():
    raise ValueError("The 'data_source' widget is empty; set it to the name of the source table to load.")

# Fully qualified name of the source table to ingest.
base_path = f'databricks_cms_synthetic_public_use_files_synpuf.cms_synpuf_ext.{data_source}'

print(base_path)

# Bronze Level

In [0]:
# Read the source table and add a read_timestamp column holding the current timestamp.
df = (
    spark.read
    .table(base_path)
    .withColumn('read_timestamp', F.current_timestamp())
)

In [0]:
# Persist the loaded data as the bronze Delta table, overwriting any existing contents.
bronze_table = f'{catalog}.{bronze_schema}.{data_source}'

bronze_writer = (
    df.write
    .format('delta')
    .option('delta.enableChangeDataFeed', 'true')
    .mode('overwrite')
)
bronze_writer.saveAsTable(bronze_table)

# Silver Level

In [0]:
# Start the silver transformations from the bronze table.
# Read it through the DataFrame reader instead of interpolating widget values
# into a SQL string; the result is the same full-table read.
silver_df = spark.read.table(f'{catalog}.{bronze_schema}.{data_source}')

In [0]:
# Collapse the 13 numbered HCPCS_CD_<n> columns into a single HCPCS_CD array column
# that keeps only the non-null entries, then drop the numbered originals.
hcpcs_cols = [f'HCPCS_CD_{x}' for x in range(1, 14)]

silver_df = silver_df.withColumn("HCPCS_CD", F.array(*hcpcs_cols))
silver_df = silver_df.withColumn("HCPCS_CD", F.expr("filter(HCPCS_CD, x -> x is not null)"))
silver_df = silver_df.drop(*hcpcs_cols)

In [0]:
# Collapse the 13 numbered PRF_PHYSN_NPI_<n> columns into a single PRF_PHYSN_NPI array
# column that keeps only the non-null entries, then drop the numbered originals.
physn_cols = [f'PRF_PHYSN_NPI_{x}' for x in range(1, 14)]

silver_df = silver_df.withColumn("PRF_PHYSN_NPI", F.array(*physn_cols))
silver_df = silver_df.withColumn("PRF_PHYSN_NPI", F.expr("filter(PRF_PHYSN_NPI, x -> x is not null)"))
silver_df = silver_df.drop(*physn_cols)

In [0]:
# Collapse the 13 numbered TAX_NUM_<n> columns into a single TAX_NUM array column
# that keeps only the non-null entries, then drop the numbered originals.
tax_cols = [f'TAX_NUM_{x}' for x in range(1, 14)]

silver_df = silver_df.withColumn("TAX_NUM", F.array(*tax_cols))
silver_df = silver_df.withColumn("TAX_NUM", F.expr("filter(TAX_NUM, x -> x is not null)"))
silver_df = silver_df.drop(*tax_cols)

In [0]:
# Create column LINE_NCH_PMT_AMT: gather the 13 numbered LINE_NCH_PMT_AMT_<n> columns,
# cast to double, into one array column that keeps only the non-null entries,
# then drop the numbered originals.
nch_pmt_cols = [f"LINE_NCH_PMT_AMT_{i}" for i in range(1, 14)]

silver_df = (
    silver_df
    # Cast while building the array instead of issuing one withColumn per source
    # column: every withColumn call adds another projection to the query plan,
    # and the source columns are dropped below anyway.
    .withColumn(
        "LINE_NCH_PMT_AMT",
        F.array(*[F.col(c).cast("double") for c in nch_pmt_cols]),
    )
    .withColumn(
        "LINE_NCH_PMT_AMT",
        F.expr("filter(LINE_NCH_PMT_AMT, x -> x is not null)"),
    )
    .drop(*nch_pmt_cols)
)

In [0]:
# Create column LINE_BENE_PTB_DDCTBL_AMT: gather the 13 numbered
# LINE_BENE_PTB_DDCTBL_AMT_<n> columns, cast to double, into one array column that
# keeps only the non-null entries, then drop the numbered originals.
ptb_ddctbl_cols = [f"LINE_BENE_PTB_DDCTBL_AMT_{i}" for i in range(1, 14)]

silver_df = (
    silver_df
    # Cast while building the array instead of issuing one withColumn per source
    # column: every withColumn call adds another projection to the query plan,
    # and the source columns are dropped below anyway.
    .withColumn(
        "LINE_BENE_PTB_DDCTBL_AMT",
        F.array(*[F.col(c).cast("double") for c in ptb_ddctbl_cols]),
    )
    .withColumn(
        "LINE_BENE_PTB_DDCTBL_AMT",
        F.expr("filter(LINE_BENE_PTB_DDCTBL_AMT, x -> x is not null)"),
    )
    .drop(*ptb_ddctbl_cols)
)

In [0]:
# Create column LINE_BENE_PRMRY_PYR_PD_AMT: gather the 13 numbered
# LINE_BENE_PRMRY_PYR_PD_AMT_<n> columns, cast to double, into one array column that
# keeps only the non-null entries, then drop the numbered originals.
prmry_pyr_cols = [f"LINE_BENE_PRMRY_PYR_PD_AMT_{i}" for i in range(1, 14)]

silver_df = (
    silver_df
    # Cast while building the array instead of issuing one withColumn per source
    # column: every withColumn call adds another projection to the query plan,
    # and the source columns are dropped below anyway.
    .withColumn(
        "LINE_BENE_PRMRY_PYR_PD_AMT",
        F.array(*[F.col(c).cast("double") for c in prmry_pyr_cols]),
    )
    .withColumn(
        "LINE_BENE_PRMRY_PYR_PD_AMT",
        F.expr("filter(LINE_BENE_PRMRY_PYR_PD_AMT, x -> x is not null)"),
    )
    .drop(*prmry_pyr_cols)
)

In [0]:
# Create column LINE_COINSRNC_AMT: gather the 13 numbered LINE_COINSRNC_AMT_<n> columns,
# cast to double, into one array column that keeps only the non-null entries,
# then drop the numbered originals.
coinsrnc_cols = [f"LINE_COINSRNC_AMT_{i}" for i in range(1, 14)]

silver_df = (
    silver_df
    # Cast while building the array instead of issuing one withColumn per source
    # column: every withColumn call adds another projection to the query plan,
    # and the source columns are dropped below anyway.
    .withColumn(
        "LINE_COINSRNC_AMT",
        F.array(*[F.col(c).cast("double") for c in coinsrnc_cols]),
    )
    .withColumn(
        "LINE_COINSRNC_AMT",
        F.expr("filter(LINE_COINSRNC_AMT, x -> x is not null)"),
    )
    .drop(*coinsrnc_cols)
)

In [0]:
# Create column LINE_ALOWD_CHRG_AMT: gather the 13 numbered LINE_ALOWD_CHRG_AMT_<n>
# columns, cast to double, into one array column that keeps only the non-null
# entries, then drop the numbered originals.
alowd_chrg_cols = [f"LINE_ALOWD_CHRG_AMT_{i}" for i in range(1, 14)]

silver_df = (
    silver_df
    # Cast while building the array instead of issuing one withColumn per source
    # column: every withColumn call adds another projection to the query plan,
    # and the source columns are dropped below anyway.
    .withColumn(
        "LINE_ALOWD_CHRG_AMT",
        F.array(*[F.col(c).cast("double") for c in alowd_chrg_cols]),
    )
    .withColumn(
        "LINE_ALOWD_CHRG_AMT",
        F.expr("filter(LINE_ALOWD_CHRG_AMT, x -> x is not null)"),
    )
    .drop(*alowd_chrg_cols)
)

In [0]:
# Create column LINE_PRCSG_IND_CD: pack the 13 numbered LINE_PRCSG_IND_CD_<n> columns
# into one array column, keep only the non-null entries, and drop the originals.
prcsg_cols = [f"LINE_PRCSG_IND_CD_{i}" for i in range(1, 14)]

silver_df = silver_df.withColumn("LINE_PRCSG_IND_CD", F.array(*prcsg_cols))
silver_df = silver_df.withColumn(
    "LINE_PRCSG_IND_CD",
    F.expr("filter(LINE_PRCSG_IND_CD, x -> x is not null)"),
)
silver_df = silver_df.drop(*prcsg_cols)

In [0]:
# Create column LINE_ICD9_DGNS_CD: pack the 13 numbered LINE_ICD9_DGNS_CD_<n> columns
# into one array column, keep only the non-null entries, and drop the originals.
diag_cols = [f"LINE_ICD9_DGNS_CD_{i}" for i in range(1, 14)]

silver_df = silver_df.withColumn("LINE_ICD9_DGNS_CD", F.array(*diag_cols))
silver_df = silver_df.withColumn(
    "LINE_ICD9_DGNS_CD",
    F.expr("filter(LINE_ICD9_DGNS_CD, x -> x is not null)"),
)
silver_df = silver_df.drop(*diag_cols)

In [0]:
# Create column ICD9_DGNS_CD: pack the 8 numbered ICD9_DGNS_CD_<n> columns into one
# array column, keep only the non-null entries, and drop the originals.
diag_cols = [f"ICD9_DGNS_CD_{i}" for i in range(1, 9)]

silver_df = silver_df.withColumn("ICD9_DGNS_CD", F.array(*diag_cols))
silver_df = silver_df.withColumn(
    "ICD9_DGNS_CD",
    F.expr("filter(ICD9_DGNS_CD, x -> x is not null)"),
)
silver_df = silver_df.drop(*diag_cols)

In [0]:
# Convert the claim start and end columns to dates, parsing them with the yyyyMMdd pattern.
clm_from_date = F.to_date(F.col('CLM_FROM_DT'), 'yyyyMMdd')
clm_thru_date = F.to_date(F.col('CLM_THRU_DT'), 'yyyyMMdd')

silver_df = silver_df.withColumn('CLM_FROM_DT', clm_from_date).withColumn('CLM_THRU_DT', clm_thru_date)

In [0]:
# Create column CLAIM_DURATION: the number of days from CLM_FROM_DT to CLM_THRU_DT.
claim_days = F.datediff(F.col('CLM_THRU_DT'), F.col('CLM_FROM_DT'))
silver_df = silver_df.withColumn('CLAIM_DURATION', claim_days)

In [0]:
# Count nulls in each column and report them as a percentage of all rows.
# Everything is computed in a single aggregation instead of one filter/count job
# per column. count() skips NULLs, so count(when(col IS NULL, 1)) tallies exactly
# the rows where that column is null.
column_names = silver_df.columns
null_count_exprs = [
    # Positional aliases keep the result columns unambiguous whatever the source names are.
    F.count(F.when(F.col(col_name).isNull(), 1)).alias(f'nulls_{idx}')
    for idx, col_name in enumerate(column_names)
]
profile_row = silver_df.agg(F.count(F.lit(1)).alias('total_rows'), *null_count_exprs).first()
total_rows = profile_row[0]

for col_name, null_count in zip(column_names, profile_row[1:]):
    # Guard against an empty DataFrame, where the ratio would divide by zero.
    null_ratio = null_count * 100 / total_rows if total_rows else 0.0
    print(f"Nulls in {col_name}: {null_count}, {null_ratio}")

In [0]:
# Keep only the rows that have no null in any column, then remove the
# read_timestamp column.
silver_df = silver_df.dropna().drop('read_timestamp')

In [0]:
# Load ben_sum (presumably the beneficiary summary table — confirm) without its
# read_timestamp column, and inner-join it to the claims on DESYNPUF_ID.
# NOTE(review): the catalog and schema are hard-coded here, while the other table
# names in this notebook are built from the `catalog` widget — confirm whether this
# read should follow the widget too.
# NOTE(review): an inner join drops claims whose DESYNPUF_ID is absent from ben_sum
# and multiplies rows if ben_sum has several rows per DESYNPUF_ID — confirm its grain.
ben_sum = spark.read.table('phr.`02_silver`.ben_sum').drop('read_timestamp')

silver_df = silver_df.join(ben_sum, 'DESYNPUF_ID', 'inner')

In [0]:
# Write to silver layer: persist the enriched data as a Delta table, overwriting
# any existing contents.
silver_table = f'{catalog}.{silver_schema}.{data_source}_enriched'

(silver_df.write
 .format('delta')
 .mode('overwrite')
 # The documented table property is 'delta.enableChangeDataFeed' (the key the bronze
 # write uses); the un-prefixed 'enableChangeDataFeed' used here before does not match it.
 .option('delta.enableChangeDataFeed', 'true')
 .saveAsTable(silver_table)
)