In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Core column and expression functions
from pyspark.sql.functions import col, lit, when, lag, datediff

# Date-related functions
from pyspark.sql.functions import year, month

# Aggregation functions
from pyspark.sql.functions import sum as _sum, avg, count, rank

In [0]:
# Silver-layer Parquet inputs for the Gold beneficiary tables built below.
features_path = "/mnt/silver/beneficiary/beneficiary_features"
clean_path = "/mnt/silver/beneficiary/beneficiary_clean"

# beneficiary_features feeds the chronic-condition aggregate and the risk scores;
# beneficiary_clean feeds the age-at-death / current-age table.
beneficiary_features = spark.read.parquet(features_path)
beneficiary_clean = spark.read.parquet(clean_path)


Chronic conditions by demographic group: helps clinical stakeholders identify high-risk groups.

In [0]:
from pyspark.sql.functions import sum as _sum

# Demographic dimensions that define one output row each.
demographic_keys = ["age_group", "bene_sex_ident_cd", "bene_race_cd"]

# Source flag column -> name of its aggregated output column (insertion order
# is the output column order).
condition_totals = {
    "sp_diabetes": "diabetes_count",
    "sp_chf": "chf_count",
    "sp_copd": "copd_count",
    "sp_cncr": "cancer_count",
}

# Sum every chronic-condition flag within each demographic group.
# NOTE(review): the sums are beneficiary counts only if the flags are 0/1
# encoded in the silver layer — confirm upstream.
chronic_by_demo = (
    beneficiary_features
    .groupBy(*demographic_keys)
    .agg(*[_sum(flag).alias(total) for flag, total in condition_totals.items()])
)

# Save to Gold (replaces any previous output at this path)
gold_writer = chronic_by_demo.write.mode("overwrite")
gold_writer.parquet("/mnt/gold/beneficiary/chronic_condition_by_demo")


In [0]:
# Preview the aggregate: first 20 rows, long cell values truncated
# (these are the DataFrame.show() defaults, spelled out).
chronic_by_demo.show(n=20, truncate=True)


Assign a health risk score to each beneficiary based on chronic conditions.

In [0]:
from functools import reduce

from pyspark.sql.functions import coalesce, col, lit, when

# Weight applied to each chronic-condition flag when building the risk score.
# With all four conditions present the maximum score is 1 + 2 + 2 + 1.5 = 6.5.
# NOTE(review): assumes the flags are 0/1 encoded in the silver layer — confirm upstream.
condition_weights = {
    "sp_diabetes": 1,
    "sp_chf": 2,
    "sp_cncr": 2,
    "sp_copd": 1.5
}

# risk_score = sum(flag * weight) over all conditions.
# Each weighted term is coalesced to 0 so that one missing (NULL) flag does not
# turn the whole score into NULL; a NULL score would fail both threshold checks
# below and silently label the beneficiary "Low" regardless of other conditions.
risk_score_expr = reduce(
    lambda acc, item: acc + coalesce(col(item[0]) * lit(item[1]), lit(0)),
    condition_weights.items(),
    lit(0)
)

# Add risk_score, then bucket it into a risk_level:
#   score >= 4 -> "High", score >= 2 -> "Medium", anything else -> "Low".
beneficiary_scored = beneficiary_features.withColumn("risk_score", risk_score_expr).withColumn(
    "risk_level",
    when(col("risk_score") >= 4, "High")
    .when(col("risk_score") >= 2, "Medium")
    .otherwise("Low")
)

# Select only useful columns for the Gold layer
beneficiary_risk = beneficiary_scored.select(
    "desynpuf_id",
    "age",
    "age_group",
    "bene_sex_ident_cd",
    "bene_race_cd",
    "sp_diabetes",
    "sp_chf",
    "sp_cncr",
    "sp_copd",
    "risk_score",
    "risk_level"
)

# Save to Gold (replaces any previous output at this path)
beneficiary_risk.write.mode("overwrite").parquet("/mnt/gold/beneficiary/beneficiary_risk_scores")


In [0]:
# Preview the scored beneficiaries: first 20 rows, long cell values truncated
# (these are the DataFrame.show() defaults, spelled out).
beneficiary_risk.show(n=20, truncate=True)

Age at Death or Current Age

In [0]:
from pyspark.sql import functions as F

# Reference date for the age: the recorded date of death when present,
# otherwise today's date.
age_reference_date = F.coalesce(F.col("bene_death_dt"), F.current_date())

# Age in completed calendar years.
# months_between / 12 is used instead of datediff / 365 because dividing the
# day count by 365 ignores leap days and rolls the age over too early (about
# 20 days before the real birthday for an 80-year-old).
# A NULL birth date still yields a NULL age.
# NOTE(review): assumes bene_birth_dt / bene_death_dt are date (or
# date-parseable) columns with death on or after birth — confirm against the
# silver schema.
beneficiary_age = beneficiary_clean.withColumn(
    "age_at_death_or_current",
    F.floor(
        F.months_between(age_reference_date, F.col("bene_birth_dt")) / 12
    ).cast("int")
)

# Keep only the identifier, the two dates, the demographics and the derived age.
beneficiary_age = beneficiary_age.select(
    "desynpuf_id",
    "bene_birth_dt",
    "bene_death_dt",
    "bene_sex_ident_cd",
    "bene_race_cd",
    "age_at_death_or_current"
)

# Save to Gold (replaces any previous output at this path)
beneficiary_age.write.mode("overwrite").parquet("/mnt/gold/beneficiary/beneficiary_lifespan")


In [0]:
# Preview the lifespan table: first 20 rows, long cell values truncated
# (these are the DataFrame.show() defaults, spelled out).
beneficiary_age.show(n=20, truncate=True)