In [0]:
from pyspark.sql.functions import col, lower, when, coalesce

# 1. Standardize User Profile (Fixing your M/F/null issue)
df_usr_clean = df_usr.select(
    "user_id", "age_range", "state", "city", "phone_price_range",
    when(lower(col("gender")).isin("m", "male"), "Male")
    .when(lower(col("gender")).isin("f", "female"), "Female")
    .otherwise("Other/Unknown").alias("gender")
)

# 2. Perform the Master Join
unified_view = (df_funnel.alias("f")
    .join(df_usr_clean.alias("u"), "user_id", "left")
    .join(df_genre.alias("g"), "user_id", "left")
    .join(df_camp.alias("c"), "campaign_id", "left")
    .select(
        "f.id_md5", "f.timestamp", "f.hour_of_day", "f.is_impression", "f.is_click",
        "u.gender", "u.age_range", "u.state", "u.city",
        "g.primary_genre", "c.campaign_type", "c.billing_rate"
    ))

unified_view.write.mode("overwrite").saveAsTable(f"{schema}.gold_unified_dashboard")

In [0]:
from pyspark.sql.functions import col, upper, lower, when, coalesce, lit

# 1. Load Tables
schema = "workspace.ad_tables"
df_req = spark.read.table(f"{schema}.requests")
df_funnel = spark.read.table(f"{schema}.silver_ad_funnel")
df_profile = spark.read.table(f"{schema}.user_profile")
df_app = spark.read.table(f"{schema}.user_app_genre")
df_camp = spark.read.table(f"{schema}.campaigns")

# 2. Pre-Clean the Profile Table (The source of your Demo/Geo data)
df_profile_clean = df_profile.select(
    col("user_id").alias("u_user_id"), # Rename to avoid join collision
    coalesce(upper(col("state")), lit("UNKNOWN")).alias("clean_state"),
    coalesce(col("age_range"), lit("Unknown-Age")).alias("clean_age"),
    when(lower(col("gender")).isin("m", "male"), "Male")
    .when(lower(col("gender")).isin("f", "female"), "Female")
    .otherwise("Unknown-Gender").alias("clean_gender")
)

# 3. Create the Unified View with Left Joins
# We join the Funnel to the Profile, App Interests, and Campaigns
gold_unified = (df_funnel.alias("f")
    .join(df_profile_clean, df_funnel.user_id == df_profile_clean.u_user_id, "left")
    .join(df_app.alias("g"), "user_id", "left")
    .join(df_camp.alias("c"), "campaign_id", "left")
    .select(
        "f.id_md5", 
        "f.timestamp", 
        "f.hour_of_day",
        col("clean_state").alias("state"),
        col("clean_age").alias("age_range"),
        col("clean_gender").alias("gender"),
        coalesce(col("g.primary_genre"), lit("General")).alias("primary_genre"),
        "c.campaign_type", 
        "c.billing_rate", 
        "f.is_impression", 
        "f.is_click"
    ))

# 4. Save and Overwrite
gold_unified.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{schema}.gold_unified_dashboard")

print("Fixed! Your 'gold_unified_dashboard' table is now clean with NO nulls in Geo/Demo columns.")