In [0]:
from pyspark.sql import SparkSession

# Spark session for the notebook; getOrCreate() hands back an already-running
# session if one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("hiv-project")
    .getOrCreate()
)

In [0]:
%pip uninstall -y databricks_helpers exercise_ev_databricks_unit_tests
%pip install git+https://github.com/data-derp/databricks_helpers#egg=databricks_helpers git+https://github.com/data-derp/exercise_ev_databricks_unit_tests#egg=exercise_ev_databricks_unit_tests

In [0]:
from databricks_helpers.databricks_helpers import DataDerpDatabricksHelpers
# Exercise-scoped Databricks helper object.
# NOTE(review): `helpers` is not referenced in the cells shown here — confirm it is still needed.
exercise_name = "hiv-project"
helpers = DataDerpDatabricksHelpers(
    dbutils,
    exercise_name,
)

# SILVER

## Read from Bronze

In [0]:

def read_from_unity_catalog(table_name: str) -> "DataFrame":
    """Read a Unity Catalog table into a Spark DataFrame via the notebook's ``spark`` session.

    The return annotation is a string on purpose: ``DataFrame`` is only imported in a
    later cell, and an unquoted annotation is evaluated when this ``def`` runs, which
    raises NameError on a fresh kernel (Restart & Run All).

    Args:
        table_name: Table to read, e.g. ``"catalog_de.bronze.raw_hiv_data"``.

    Returns:
        The DataFrame returned by ``spark.read.table(table_name)``.
    """
    return spark.read.table(table_name)

In [0]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType
from utils import write_to_uc_table
from pyspark.sql.functions import col, when, lit, regexp_replace, isnan


In [0]:
# Raw HIV surveillance table from the bronze layer.
bronze_df = read_from_unity_catalog(table_name="catalog_de.bronze.raw_hiv_data")


In [0]:
# One bronze table per survey year, keyed by year.
# NOTE(review): 2016 is not in this list — confirm whether raw_poverty_2016 exists.
available_years = [2011, 2012, 2013, 2014, 2015, 2017, 2018]
poverty_dfs = {
    yr: read_from_unity_catalog(f"catalog_de.bronze.raw_poverty_{yr}")
    for yr in available_years
}



## HIV

In [0]:
# Start the silver frame from bronze. Keeping hiv_diagnosis_rate only when it is not
# the 99999 placeholder turns that placeholder into NULL (NULLs stay NULL).
hiv_silver = bronze_df.withColumn(
    "hiv_diagnosis_rate",
    when(col("hiv_diagnosis_rate") != 99999, col("hiv_diagnosis_rate")),
)



In [0]:
# Replace the 99999 placeholder with NULL in the remaining measure columns
# (hiv_diagnosis_rate is handled where hiv_silver is first built from bronze).
# hiv_diagnoses and concurrent_diagnoses are included alongside the other count
# columns (aids_diagnoses, deaths) so a placeholder cannot survive the integer cast
# applied further down.
# NOTE(review): confirm 99999 is also the placeholder used in those two columns.
SENTINEL_VALUE = 99999
SENTINEL_COLUMNS = [
    "hiv_diagnoses",
    "concurrent_diagnoses",
    "perc_linked_to_care_within_3_months",
    "aids_diagnoses",
    "aids_diagnosis_rate",
    "plwdhi_prevalence",
    "perc_viral_suppression",
    "deaths",
    "death_rate",
    "hiv_related_death_rate",
    "non_hiv_related_death_rate",
]
for column in SENTINEL_COLUMNS:
    hiv_silver = hiv_silver.withColumn(
        column, when(col(column) == SENTINEL_VALUE, None).otherwise(col(column))
    )

In [0]:
# Give the bare "All" aggregate marker a dimension-specific label.
for dimension, all_label in {
    "age": "All Ages",
    "race": "All Races",
    "gender": "All Genders",
}.items():
    hiv_silver = hiv_silver.withColumn(
        dimension,
        when(col(dimension) == "All", all_label).otherwise(col(dimension)),
    )



In [0]:
# Same treatment for the geographic dimensions: "All" borough rows are citywide totals.
for geo_column, all_label in {"borough": "Citywide", "uhf": "All UHF"}.items():
    hiv_silver = hiv_silver.withColumn(
        geo_column,
        when(col(geo_column) == "All", all_label).otherwise(col(geo_column)),
    )


In [0]:
# Percentages stored as fractions (value <= 1) are rescaled to the 0-100 range.
# NOTE(review): this is a per-row heuristic — a genuine value of 1% or less is also
# multiplied by 100 — and the cell is not idempotent: re-running it rescales values
# that were originally <= 0.01 a second time.
for pct_column in ("perc_linked_to_care_within_3_months", "perc_viral_suppression"):
    pct = col(pct_column)
    is_fraction = pct.isNotNull() & (pct <= 1)
    hiv_silver = hiv_silver.withColumn(pct_column, when(is_fraction, pct * 100).otherwise(pct))



In [0]:
# Count columns become integers. Casting NULL yields NULL, so no explicit null guard is needed.
int_columns = ["hiv_diagnoses", "concurrent_diagnoses", "aids_diagnoses", "deaths"]
for column in int_columns:
    hiv_silver = hiv_silver.withColumn(column, col(column).cast("integer"))


In [0]:
# Collapse race spellings to short labels; anything not listed (e.g. "Black", "White")
# is kept as-is.
hiv_silver = hiv_silver.withColumn(
    "race",
    when(col("race") == "All", "All Races")
    .when(col("race").isin("Latino/Hispanic", "Latinx/Hispanic"), "Hispanic")
    .when(col("race") == "Asian/Pacific Islander", "Asian")
    .when(col("race") == "Other/Unknown", "Other")
    .otherwise(col("race")),
)

In [0]:
# Normalise gender wording ("Men"/"Women" -> "Male"/"Female"); other values are kept as-is.
hiv_silver = hiv_silver.withColumn(
    "gender",
    when(col("gender") == "All", "All Genders")
    .when(col("gender") == "Men", "Male")
    .when(col("gender") == "Women", "Female")
    .otherwise(col("gender")),
)


In [0]:
# Rate and percentage columns become doubles; NULL inputs cast to NULL.
float_columns = [
    "hiv_diagnosis_rate",
    "perc_linked_to_care_within_3_months",
    "aids_diagnosis_rate",
    "plwdhi_prevalence",
    "perc_viral_suppression",
    "death_rate",
    "hiv_related_death_rate",
    "non_hiv_related_death_rate",
]
for column in float_columns:
    hiv_silver = hiv_silver.withColumn(column, col(column).cast("double"))


In [0]:
# Persist the cleaned HIV data to catalog_de.silver.hiv_data_clean.
try:
    write_to_uc_table(
        input_df=hiv_silver,
        table_name="hiv_data_clean",
        mode="overwrite",
        catalog="catalog_de",
        schema="silver",
    )
except Exception as e:
    print(f"Error writing to silver layer: {str(e)}")
    # Re-raise: swallowing the error let the notebook/job finish as if it had
    # succeeded even though the silver table was not written.
    raise

## Poverty

In [0]:
# Inclusive (min, max) bounds for each categorical code column. process_yearly_df
# reports and nulls out codes that fall outside these bounds.
category_cols_ranges = {
    name: (low, high)
    for name, low, high in [
        ("cit", 1, 5),
        ("rel", 1, 20),
        ("sch", 1, 3),
        ("schg", 1, 25),
        ("schl", 1, 24),
        ("sex", 1, 2),
        ("esr", 1, 6),
        ("lanx", 1, 2),
        ("eng", 1, 5),
        ("msp", 1, 2),
        ("mar", 1, 5),
        ("wkw", 1, 3),
        ("dis", 1, 2),
        ("jwtr", 1, 12),
        ("ten", 1, 4),
        ("hht", 1, 7),
        ("agecateg", 1, 7),
        ("boro", 1, 5),
        ("citizenstatus", 1, 3),
        ("educattain", 1, 5),
        ("ethnicity", 1, 5),
        ("famtype_pu", 1, 6),
        ("ftptwork", 1, 4),
        # binary poverty flags are 0/1 rather than 1-based
        ("nycgov_pov_stat", 0, 1),
        ("off_pov_stat", 0, 1),
    ]
}

In [0]:
# Code -> label lookups applied by process_yearly_df. Codes are 1-based, except the
# two poverty flags, which are 0/1.
# NOTE(review): labels are not validated here — check them against the source codebook.
category_cols_mappings = {
    "sex": dict(enumerate(["Male", "Female"], start=1)),
    "boro": dict(enumerate(["Bronx", "Brooklyn", "Manhattan", "Queens", "Staten Island"], start=1)),
    "citizenstatus": dict(enumerate(["Citizen, native", "Citizen, naturalized", "Non-citizen"], start=1)),
    "educattain": dict(enumerate([
        "Less than high school",
        "High school diploma",
        "Some college",
        "Bachelor's degree",
        "Graduate degree",
    ], start=1)),
    "ethnicity": dict(enumerate(["White", "Black", "Asian", "Hispanic", "Other"], start=1)),
    "famtype_pu": dict(enumerate([
        "Married couple",
        "Male householder, no spouse",
        "Female householder, no spouse",
        "Non-family household",
        "Individual",
        "Other",
    ], start=1)),
    "ftptwork": dict(enumerate([
        "Full-time, year-round",
        "Part-time, year-round",
        "Full-time, part-year",
        "Part-time, part-year",
    ], start=1)),
    "dis": dict(enumerate(["With disability", "Without disability"], start=1)),
    "nycgov_pov_stat": dict(enumerate(["Not in poverty", "In poverty"])),
    "off_pov_stat": dict(enumerate(["Not in poverty", "In poverty"])),
    "ten": dict(enumerate([
        "Owned with mortgage",
        "Owned free and clear",
        "Rented",
        "Occupied without payment",
    ], start=1)),
}

# Raw field name -> standardized output column name (the raw column is copied, not dropped).
field_standardization = dict(
    famtype_pu="family_type",
    ftptwork="work_experience",
    totalworkhrs_pu="total_work_hours",
)

In [0]:
# Columns process_yearly_df treats as numeric: cast to double, with negative values
# nulled except in the few columns that function allows to be negative.
numeric_cols = [
    # weights and age
    "pwgtp",
    "wgtp",
    "agep",
    # est_* estimates
    "est_childcare",
    "est_commuting",
    "est_eitc",
    "est_ficatax",
    "est_heap",
    "est_housing",
    "est_incometax",
    "est_moop",
    "est_nutrition",
    "est_povgap",
    "est_povgapindex",
    # income, threshold and hours fields
    "intp_adj",
    "mrgp_adj",
    "nycgov_income",
    "nycgov_threshold",
    "off_threshold",
    "oi_adj",
    "pa_adj",
    "pretaxincome_pu",
    "retp_adj",
    "rntp_adj",
    "semp_adj",
    "ssip_adj",
    "ssp_adj",
    "totalworkhrs_pu",
    "wagp_adj",
]

In [0]:
# Numeric columns whose negative values are kept; every other numeric column treats
# negatives as invalid and nulls them.
_NEGATIVE_ALLOWED_COLS = ("semp_adj", "intp_adj")


def _lowercase_columns(df):
    """Return ``df`` with every column name lower-cased."""
    return df.toDF(*[name.lower() for name in df.columns])


def _drop_duplicate_records(df):
    """Drop rows sharing (serialno, sporder) and log how many were removed."""
    initial_count = df.count()
    deduped = df.dropDuplicates(["serialno", "sporder"])
    removed = initial_count - deduped.count()
    if removed:
        print(f"Removed {removed} duplicate records")
    return deduped


def _report_key_nulls(df, key_columns=("serialno", "sporder", "pwgtp", "agep", "sex")):
    """Print the NULL count of each key column present in ``df`` (one Spark aggregation)."""
    present = [c for c in key_columns if c in df.columns]
    if not present:
        return
    # F.count ignores NULLs, so counting a when() without otherwise counts matching rows
    # (and yields 0, not NULL, on an empty frame).
    counts = df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in present]
    ).first()
    for c in present:
        if counts[c] > 0:
            print(f"Found {counts[c]} NULL values in key column '{c}'")


def _enforce_category_ranges(df):
    """Report, then null out, categorical codes outside their inclusive ``category_cols_ranges`` bounds."""
    present = {c: bounds for c, bounds in category_cols_ranges.items() if c in df.columns}
    if not present:
        return df
    counts = df.select([
        F.count(F.when(F.col(c).isNotNull() & ~F.col(c).between(low, high), 1)).alias(c)
        for c, (low, high) in present.items()
    ]).first()
    for c, (low, high) in present.items():
        if counts[c] > 0:
            print(f"Column {c} has {counts[c]} values outside expected range ({low}-{high})")
        # when() without otherwise() yields NULL for out-of-range values; NULL inputs stay NULL.
        df = df.withColumn(c, F.when(F.col(c).between(low, high), F.col(c)))
    return df


def _apply_category_mappings(df):
    """Replace codes with labels from ``category_cols_mappings``; values without a label pass through."""
    mapped = []
    for c, value_map in category_cols_mappings.items():
        if c not in df.columns:
            continue
        expr = F.col(c)
        for code, label in value_map.items():
            expr = F.when(F.col(c) == code, label).otherwise(expr)
        df = df.withColumn(c, expr)
        mapped.append(c)
    print(f"Applied categorical mappings to {len(mapped)} columns")
    return df


def _clean_numeric_columns(df):
    """Null out disallowed negatives and cast each ``numeric_cols`` column that is present to double."""
    present = [c for c in numeric_cols if c in df.columns]
    for c in present:
        cleaned = F.col(c)
        if c not in _NEGATIVE_ALLOWED_COLS:
            cleaned = F.when(F.col(c) < 0, None).otherwise(F.col(c))
        # Only existing columns are referenced: touching a column that is missing from
        # this year's table raises an AnalysisException and would discard the whole year.
        df = df.withColumn(c, cleaned.cast(DoubleType()))
    print(f"Standardized {len(present)} numeric columns")
    return df


def _add_standardized_fields(df):
    """Copy source fields to their standardized names; the source columns are kept."""
    for source, target in field_standardization.items():
        if source in df.columns:
            df = df.withColumn(target, F.col(source))
    if "dis" in df.columns:
        df = df.withColumn("disability_status", F.col("dis"))
    return df


def _add_age_group(df):
    """Add an ``age_group`` label derived from ``agep``; NULL ages become "Unknown"."""
    age = F.col("agep")
    # Upper-bound chain so non-integer ages (agep is a double by now) land in a bucket
    # instead of falling through the gaps between ranges.
    return df.withColumn(
        "age_group",
        F.when(age < 13, "Under 13")
        .when(age < 20, "13-19")
        .when(age < 30, "20-29")
        .when(age < 40, "30-39")
        .when(age < 50, "40-49")
        .when(age < 60, "50-59")
        .when(age >= 60, "60+")
        .otherwise("Unknown"),
    )


def process_yearly_df(df, year):
    """Clean one year of raw poverty data into the silver shape.

    Steps, in order:
      1. lower-case all column names;
      2. drop duplicate rows keyed on (serialno, sporder);
      3. report NULLs in key columns (logging only);
      4. null out categorical codes outside ``category_cols_ranges``;
      5. replace codes with labels from ``category_cols_mappings``;
      6. null out negative values in ``numeric_cols`` (except the allowed columns)
         and cast the present ones to double;
      7. copy fields to their standardized names (``field_standardization`` and
         ``dis`` -> ``disability_status``). This runs after step 6 so that
         ``total_work_hours`` receives the cleaned ``totalworkhrs_pu``;
      8. derive an ``age_group`` label from ``agep``.

    Args:
        df: Spark DataFrame read from a bronze ``raw_poverty_<year>`` table, or None.
        year: Year the frame belongs to; used only in log messages.

    Returns:
        The transformed DataFrame, or None when ``df`` is None.
    """
    if df is None:
        print(f"Cannot process data for year {year}: DataFrame is None")
        return None

    print(f"Starting processing for year {year} with columns: {df.columns}")

    df = _lowercase_columns(df)
    df = _drop_duplicate_records(df)
    _report_key_nulls(df)
    df = _enforce_category_ranges(df)
    df = _apply_category_mappings(df)
    df = _clean_numeric_columns(df)
    df = _add_standardized_fields(df)
    return _add_age_group(df)

In [0]:
# Clean each year, keep the silver column set, then stack all years into one frame.
processed_dfs = []
for year, raw_df in poverty_dfs.items():
    try:
        processed_df = process_yearly_df(raw_df, year)

        if processed_df is None:
            print(f"Skipping further processing for year {year} due to None DataFrame")
            continue

        # The bronze tables are split per year. Tag rows with their year when the source
        # does not already carry a `year` column; otherwise it is silently dropped by the
        # select below and the union cannot tell the years apart.
        if "year" not in processed_df.columns:
            processed_df = processed_df.withColumn("year", F.lit(year))

        core_columns = [
            "year", "serialno", "sporder", "pwgtp", "wgtp", "agep", "age_group",
            "boro", "sex"
        ]

        optional_columns = [
            "agecateg", "citizenstatus", "educattain", "ethnicity", "family_type",
            "work_experience", "total_work_hours", "disability_status",
            "est_housingstatus", "nycgov_income", "nycgov_pov_stat", "nycgov_threshold",
            "off_pov_stat", "off_threshold", "pretaxincome_pu"
        ]

        financial_columns = [
            "est_childcare", "est_commuting", "est_ficatax", "est_heap",
            "est_housing", "est_incometax", "est_moop", "est_nutrition",
            "est_povgap", "est_povgapindex"
        ]

        # Keep the wanted columns that exist, in core -> optional -> financial order.
        wanted_columns = core_columns + optional_columns + financial_columns
        processed_df = processed_df.select(
            [c for c in wanted_columns if c in processed_df.columns]
        )

        processed_dfs.append(processed_df)

        print(f"Processed data for year {year}: {processed_df.count()} rows")
    except Exception as e:
        # Best-effort per year: a failing year is logged and skipped.
        print(f"Error processing data for year {year}: {str(e)}")

if processed_dfs:
    combined_poverty_df = processed_dfs[0]
    for df in processed_dfs[1:]:
        # Years may expose different optional columns; missing ones are filled with NULL.
        combined_poverty_df = combined_poverty_df.unionByName(df, allowMissingColumns=True)

    print(f"Combined data: {combined_poverty_df.count()} rows")
else:
    # Keep the name defined so later cells fail with a clear error, not a NameError.
    combined_poverty_df = None
    print("No valid data to combine")


## Write poverty data to Delta

In [0]:
# Fail loudly when no year survived processing: `combined_poverty_df` is only built
# from a non-empty `processed_dfs`, and there is nothing valid to overwrite the table with.
if not processed_dfs:
    raise RuntimeError(
        "No poverty data was processed; not writing catalog_de.silver.poverty_measure"
    )

silver_poverty_df = combined_poverty_df

try:
    write_to_uc_table(
        input_df=silver_poverty_df,
        table_name="poverty_measure",
        mode="overwrite",
        catalog="catalog_de",
        schema="silver",
    )
except Exception as e:
    print(f"Error writing to silver layer: {str(e)}")
    # Re-raise so a failed write is not reported as a successful run.
    raise

# Outside the try so a failure here is not mislabelled as a write error.
print(f"Successfully wrote {silver_poverty_df.count()} records to catalog_de.silver.poverty_measure")


&copy; 2025 Thoughtworks. All rights reserved.<br/>