In [0]:
import pyspark.pipelines as dp
import pyspark.sql.functions as sf
from pyspark.sql.window import Window

# ============================================
# BRONZE LAYER
# ============================================

@dp.table()
def lapd_bronze():
    """Bronze layer: raw LAPD incidents loaded from the cleaned CSV in the course volume.

    The CSV is read with a header row and without schema inference, so every column
    arrives as a string. DR_NO is explicitly kept as a string and TIME_OCC is
    whitespace-trimmed before it is handed to the silver layer.

    NOTE(review): this is a batch ``spark.read``; lapd_silver consumes this table via
    ``readStream`` with ``skipChangeCommits`` — confirm that re-reads of the CSV reach
    silver the way you expect.
    """
    csv_reader = (
        spark.read
        .format("csv")
        .options(header="true", inferSchema="false")
    )
    raw_incidents = csv_reader.load("/Volumes/workspace/damg7370/lapd/Cleaned_LAPD.csv")

    # Pin the two columns later layers depend on: DR_NO (not-null expectation in
    # silver) and TIME_OCC (parsed into hour/minute downstream).
    return (
        raw_incidents
        .withColumn("DR_NO", sf.col("DR_NO").cast("string"))
        .withColumn("TIME_OCC", sf.trim(sf.col("TIME_OCC")).cast("string"))
    )

In [0]:
# ============================================
# SILVER LAYER
# ============================================

@dp.table()
@dp.expect_or_drop("dr_no_nn", "DR_NO is not null")
def lapd_silver():
    """Silver layer: typed, enriched incident stream read from lapd_bronze.

    Rows with a NULL DR_NO are dropped by the ``dr_no_nn`` expectation. Adds:
      * DATE_RPTD / DATE_OCC cast to ``date``;
      * TIME_OCC_HOUR — the first two characters of TIME_OCC as an int, NULL when they
        are not numeric (``try_cast``). NOTE(review): a non-zero-padded value such as
        "9:30" yields "9:" and therefore NULL — confirm TIME_OCC is always "HH..." padded;
      * INCIDENT_COUNT — constant 1, for additive counting;
      * last_updated — processing timestamp, used as the CDC sequence column by the
        gold dimensions.
    """
    bronze_stream = (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table('lapd_bronze')
    )

    return (
        bronze_stream
        .withColumn("DATE_RPTD", sf.col("DATE_RPTD").cast("date"))
        .withColumn("DATE_OCC", sf.col("DATE_OCC").cast("date"))
        .withColumn("TIME_OCC_HOUR", sf.expr("try_cast(substr(TIME_OCC,1,2) AS INT)"))
        .withColumn("INCIDENT_COUNT", sf.lit(1))
        .withColumn("last_updated", sf.current_timestamp())
    )

In [0]:
# ============================================
# DIMENSION TABLES
# ============================================

# DIM_DATE
dp.create_streaming_table(
    name="dim_date_lapd",
    schema="""
        DATE_KEY bigint generated always as identity (start with 1 increment by 1),
        FULL_DATE date,
        YEAR int,
        QUARTER_NUMBER int,
        MONTH_NUMBER int,
        MONTH_NAME string,
        DAY_OF_MONTH int,
        DAY_OF_WEEK int,
        DAY_OF_WEEK_NAME string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def gold_dim_date():
    """CDC source for dim_date_lapd: calendar attributes for every date seen in silver.

    Both DATE_OCC and DATE_RPTD are emitted as FULL_DATE. The fact table resolves
    DATE_RPT_KEY as well as DATE_OCC_KEY against this dimension. Sourcing only
    DATE_OCC left report dates that never occur as an occurrence date without a row,
    and those incidents fell back to key -1.
    """
    df = spark.readStream.table('lapd_silver')

    # One row per (incident, date role); explode keeps this a single streaming source.
    # NULL dates can never match a fact join, so they are not emitted as dimension rows.
    df = (
        df.select(
            sf.explode(sf.array("DATE_OCC", "DATE_RPTD")).alias("FULL_DATE"),
            "last_updated",
        )
        .filter(sf.col("FULL_DATE").isNotNull())
    )

    df = df.selectExpr(
        "FULL_DATE",
        "year(FULL_DATE) as YEAR",
        "quarter(FULL_DATE) as QUARTER_NUMBER",
        "month(FULL_DATE) as MONTH_NUMBER",
        "date_format(FULL_DATE, 'MMMM') as MONTH_NAME",
        "dayofmonth(FULL_DATE) as DAY_OF_MONTH",
        "dayofweek(FULL_DATE) as DAY_OF_WEEK",
        "date_format(FULL_DATE, 'EEEE') as DAY_OF_WEEK_NAME",
        "last_updated"
    )

    return df

# Upsert one row per FULL_DATE; the latest last_updated wins.
dp.create_auto_cdc_flow(
    target="dim_date_lapd",
    source="gold_dim_date",
    keys=["FULL_DATE"],
    sequence_by="last_updated",
    ignore_null_updates=True
)

# DIM_TIME
dp.create_streaming_table(
    name="dim_time_lapd",
    schema="""
        TIME_KEY bigint generated always as identity (start with 1 increment by 1),
        HOUR int,
        MINUTE int,
        TIME_BAND string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"},
)

@dp.view()
def gold_dim_time():
    """CDC source for dim_time_lapd: one row per incident with a non-blank TIME_OCC.

    TIME_OCC is split on ':' — the first piece becomes HOUR and the second MINUTE,
    both cast to int. TIME_BAND buckets the hour as 05-11 Morning, 12-16 Afternoon,
    17-20 Evening, and anything else (including a NULL hour) Night.
    """
    time_occ = sf.col("TIME_OCC")
    hh_mm = sf.split(time_occ, ":")
    hour = sf.col("HOUR")

    # Half-open [lo, hi) hour ranges, checked in order; the first match wins.
    band = sf.when((hour >= 5) & (hour < 12), "Morning")
    for lo, hi, label in ((12, 17, "Afternoon"), (17, 21, "Evening")):
        band = band.when((hour >= lo) & (hour < hi), label)
    band = band.otherwise("Night")

    return (
        spark.readStream.table('lapd_silver')
        .where(time_occ.isNotNull() & (time_occ != ""))
        .withColumn("HOUR", hh_mm[0].cast("int"))
        .withColumn("MINUTE", hh_mm[1].cast("int"))
        .withColumn("TIME_BAND", band)
        .select("HOUR", "MINUTE", "TIME_BAND", "last_updated")
    )

# Upsert one row per (HOUR, MINUTE) pair.
dp.create_auto_cdc_flow(
    source="gold_dim_time",
    target="dim_time_lapd",
    keys=["HOUR", "MINUTE"],
    ignore_null_updates=True,
    sequence_by="last_updated",
)

# DIM_AREA
dp.create_streaming_table(
    name="dim_area_lapd",
    schema="""
        AREA_KEY bigint generated always as identity (start with 1 increment by 1),
        AREA_CODE string,
        AREA_NAME string,
        BUREAU_NAME string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def gold_dim_area():
    """CDC source for dim_area_lapd: area code/name plus the LAPD operations bureau.

    BUREAU_NAME follows LAPD's four geographic bureaus and covers all 21 Areas:
      Central: 1 Central, 2 Rampart, 4 Hollenbeck, 11 Northeast, 13 Newton
      South:   3 Southwest, 5 Harbor, 12 77th Street, 18 Southeast
      West:    6 Hollywood, 7 Wilshire, 8 West LA, 14 Pacific, 20 Olympic
      Valley:  9 Van Nuys, 10 West Valley, 15 N Hollywood, 16 Foothill,
               17 Devonshire, 19 Mission, 21 Topanga
    NOTE(review): verify these numbers against the AREA_NAME values in the data.
    """
    df = spark.readStream.table('lapd_silver')

    # AREA arrives as a string (bronze reads with inferSchema=false). Compare it
    # numerically so "01" and "1" both resolve to Area 1. try_cast yields NULL
    # (-> "Unknown Bureau") instead of failing the stream on a non-numeric value.
    area_num = sf.expr("try_cast(AREA AS INT)")

    df = df.withColumn("AREA_CODE", sf.col("AREA").cast("string"))
    df = df.withColumn("BUREAU_NAME",
        sf.when(area_num.isin(1, 2, 4, 11, 13), "Central Bureau")
        .when(area_num.isin(3, 5, 12, 18), "South Bureau")
        .when(area_num.isin(6, 7, 8, 14, 20), "West Bureau")
        .when(area_num.isin(9, 10, 15, 16, 17, 19, 21), "Valley Bureau")
        .otherwise("Unknown Bureau"))

    df = df.selectExpr("AREA_CODE", "AREA_NAME", "BUREAU_NAME", "last_updated")

    return df

# Upsert one row per AREA_CODE; the fact table joins silver AREA to AREA_CODE.
dp.create_auto_cdc_flow(
    target="dim_area_lapd",
    source="gold_dim_area",
    keys=["AREA_CODE"],
    sequence_by="last_updated",
    ignore_null_updates=True
)

# DIM_STATUS
dp.create_streaming_table(
    name="dim_status_lapd",
    schema="""
        STATUS_KEY bigint generated always as identity (start with 1 increment by 1),
        STATUS_CODE string,
        STATUS_DESC string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"},
)

@dp.view()
def gold_dim_status():
    """CDC source for dim_status_lapd: silver STATUS renamed to STATUS_CODE, plus its description."""
    silver = spark.readStream.table('lapd_silver')
    return silver.select(
        sf.col("STATUS").alias("STATUS_CODE"),
        sf.col("STATUS_DESC"),
        sf.col("last_updated"),
    )

# Upsert one row per STATUS_CODE.
dp.create_auto_cdc_flow(
    source="gold_dim_status",
    target="dim_status_lapd",
    keys=["STATUS_CODE"],
    ignore_null_updates=True,
    sequence_by="last_updated",
)

# DIM_WEAPON
dp.create_streaming_table(
    name="dim_weapon_lapd",
    schema="""
        WEAPON_KEY bigint generated always as identity (start with 1 increment by 1),
        WEAPON_USED_CD string,
        WEAPON_DESC string,
        WEAPON_GROUP string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def gold_dim_weapon():
    """CDC source for dim_weapon_lapd: weapon code/description plus a coarse WEAPON_GROUP.

    WEAPON_GROUP is assigned by the first matching keyword rule. Matching is
    case-insensitive (``(?i)`` rlike, consistent with the premise/crime dims). A NULL
    or unmatched description maps to "Other / Unknown".
    """
    df = spark.readStream.table('lapd_silver')
    weapon_desc = sf.col("WEAPON_DESC")

    # Firearm descriptions are not all spelled with "GUN" (pistols, revolvers, rifles,
    # generic "FIREARM"); matching only "%GUN%" sent those to "Other / Unknown".
    df = df.withColumn("WEAPON_GROUP",
        sf.when(weapon_desc.rlike("(?i)GUN|FIREARM|PISTOL|REVOLVER|RIFLE"), "Firearm")
        .when(weapon_desc.rlike("(?i)KNIFE|BLADE|DAGGER|SWORD|MACHETE"), "Sharp Object")
        .when(weapon_desc.rlike("(?i)BLUNT"), "Blunt Object")
        .when(weapon_desc.rlike("(?i)STRANGLE"), "Strangulation")
        .otherwise("Other / Unknown"))

    df = df.selectExpr("WEAPON_USED_CD", "WEAPON_DESC", "WEAPON_GROUP", "last_updated")
    return df

# Upsert one row per WEAPON_USED_CD.
dp.create_auto_cdc_flow(
    target="dim_weapon_lapd",
    source="gold_dim_weapon",
    keys=["WEAPON_USED_CD"],
    sequence_by="last_updated",
    ignore_null_updates=True
)

# DIM_PREMISE
dp.create_streaming_table(
    name="dim_premise_lapd",
    schema="""
        PREMISE_KEY bigint generated always as identity (start with 1 increment by 1),
        PREMIS_CD string,
        PREMIS_DESC string,
        PREMISE_CATEGORY string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"},
)

@dp.view()
def gold_dim_premise():
    """CDC source for dim_premise_lapd: premise code/description plus PREMISE_CATEGORY.

    Rules are evaluated in order with case-insensitive regexes, and the first hit wins.
    Parking and garage premises are classed "Outside" before the residence rule is
    tried. Anything unmatched (or NULL) is "Other".
    """
    premise_desc = sf.col("PREMIS_DESC")
    category = (
        sf.when(premise_desc.rlike("(?i)PARKING|GARAGE"), "Outside")
          .when(premise_desc.rlike("(?i)SINGLE|MULTI|APARTMENT|RESIDENCE|HOME"), "Residence")
          .when(premise_desc.rlike("(?i)STORE|MARKET|SHOP|BUSINESS|OFFICE|BANK"), "Commercial")
          .otherwise("Other")
    )

    return (
        spark.readStream.table('lapd_silver')
        .withColumn("PREMISE_CATEGORY", category)
        .select("PREMIS_CD", "PREMIS_DESC", "PREMISE_CATEGORY", "last_updated")
    )

# Upsert one row per PREMIS_CD.
dp.create_auto_cdc_flow(
    source="gold_dim_premise",
    target="dim_premise_lapd",
    keys=["PREMIS_CD"],
    ignore_null_updates=True,
    sequence_by="last_updated",
)

# DIM_CRIME
dp.create_streaming_table(
    name="dim_crime_lapd",
    schema="""
        CRIME_KEY bigint generated always as identity (start with 1 increment by 1),
        CRM_CD_1 string,
        CRM_CD_2 string,
        CRM_CD_3 string,
        CRM_CD_4 string,
        CRM_CD_DESC string,
        PART_1_2 string,
        CRIME_CATEGORY string,
        CRIME_SUBCATEGORY string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def gold_dim_crime():
    """CDC source for dim_crime_lapd: crime codes/description plus category labels.

    CRIME_CATEGORY and CRIME_SUBCATEGORY are keyword rules over CRM_CD_DESC. Matching
    is case-insensitive, evaluated top-down, and the first hit wins, so rule order
    matters.

    NOTE(review): the dimension is keyed on CRM_CD_1 only, so CRM_CD_2..4 presumably
    reflect whichever incident was applied last for that code — confirm intended.
    """
    df = spark.readStream.table('lapd_silver')
    desc = sf.col("CRM_CD_DESC")

    # The Fraud & Identity rule must precede the generic THEFT rule. Any description
    # containing "ID THEFT" (or "THEFT OF IDENTITY") also contains "THEFT", so with the
    # old ordering those rows could never reach the identity branch.
    df = df.withColumn("CRIME_CATEGORY",
        sf.when(desc.rlike("(?i)ASSAULT|BATTERY"), "Assault & Battery")
        .when(desc.rlike("(?i)ROBBERY"), "Robbery")
        .when(desc.rlike("(?i)BURGLARY"), "Burglary")
        .when(desc.rlike("(?i)FRAUD|ID THEFT|IDENTITY"), "Fraud & Identity")
        .when(desc.rlike("(?i)THEFT|LARCENY|SHOPLIFTING"), "Theft & Larceny")
        .when(desc.rlike("(?i)HOMICIDE|MURDER"), "Homicide")
        .when(desc.rlike("(?i)SEXUAL"), "Sexual Crimes")
        .when(desc.rlike("(?i)ARSON"), "Arson")
        .when(desc.rlike("(?i)WEAPON"), "Weapons Offense")
        .otherwise("Other / Miscellaneous"))

    df = df.withColumn("CRIME_SUBCATEGORY",
        sf.when(desc.rlike("(?i)SIMPLE ASSAULT"), "Simple Assault")
        .when(desc.rlike("(?i)AGGRAVATED ASSAULT"), "Aggravated Assault")
        .when(desc.rlike("(?i)STRONG-ARM"), "Strong-Arm Robbery")
        .when(desc.rlike("(?i)BURGLARY"), "Burglary")
        .when(desc.rlike("(?i)AUTO THEFT|VEHICLE - STOLEN"), "Auto Theft")
        .when(desc.rlike("(?i)SHOPLIFTING"), "Shoplifting")
        .when(desc.rlike("(?i)FRAUD|ID THEFT|IDENTITY"), "Fraud / Identity Theft")
        .otherwise("General Offense"))

    df = df.selectExpr("CRM_CD_1", "CRM_CD_2", "CRM_CD_3", "CRM_CD_4", "CRM_CD_DESC", "PART_1_2", "CRIME_CATEGORY", "CRIME_SUBCATEGORY", "last_updated")
    return df

# Upsert one row per CRM_CD_1; the fact table joins on CRM_CD_1.
dp.create_auto_cdc_flow(
    target="dim_crime_lapd",
    source="gold_dim_crime",
    keys=["CRM_CD_1"],
    sequence_by="last_updated",
    ignore_null_updates=True
)

# DIM_MODUS_OPERANDI
dp.create_streaming_table(
    name="dim_modus_operandi_lapd",
    schema="""
        MODUS_OPERANDI_KEY bigint generated always as identity (start with 1 increment by 1),
        MOCODES string,
        MO_DESC string,
        MO_TYPE string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"},
)

@dp.view()
def gold_dim_modus_operandi():
    """CDC source for dim_modus_operandi_lapd: one row per incident with non-NULL MOCODES.

    MO_DESC is chosen by the first SQL LIKE pattern that matches anywhere in the MOCODES
    string. MO_TYPE is then derived from keywords in MO_DESC.

    NOTE(review): the LIKE patterns are substring tests, so "%00%" also matches codes
    such as "1300" or a "...0 0..." boundary between codes — this is a heuristic and
    should be confirmed against the LAPD MO code list.
    """
    mo_codes = sf.col("MOCODES")
    mo_desc_col = sf.col("MO_DESC")

    desc_rules = (
        ("%00%", "General Behavior"),
        ("%10%", "Threat or Intimidation"),
        ("%20%", "Physical Force"),
        ("%30%", "Weapon Use"),
        ("%40%", "Entry or Break-in"),
    )
    mo_desc = None
    for pattern, label in desc_rules:
        hit = mo_codes.like(pattern)
        mo_desc = sf.when(hit, label) if mo_desc is None else mo_desc.when(hit, label)
    mo_desc = mo_desc.otherwise("Other / Unknown")

    type_rules = (
        ("Threat", "Threatening"),
        ("Force", "Force"),
        ("Weapon", "Weapon-related"),
        ("Entry", "Entry-related"),
    )
    mo_type = None
    for keyword, label in type_rules:
        hit = mo_desc_col.contains(keyword)
        mo_type = sf.when(hit, label) if mo_type is None else mo_type.when(hit, label)
    mo_type = mo_type.otherwise("General")

    return (
        spark.readStream.table('lapd_silver')
        .where(mo_codes.isNotNull())
        .withColumn("MO_DESC", mo_desc)
        .withColumn("MO_TYPE", mo_type)
        .select("MOCODES", "MO_DESC", "MO_TYPE", "last_updated")
    )

# Upsert one row per distinct MOCODES string.
dp.create_auto_cdc_flow(
    source="gold_dim_modus_operandi",
    target="dim_modus_operandi_lapd",
    keys=["MOCODES"],
    ignore_null_updates=True,
    sequence_by="last_updated",
)

# DIM_LOCATION
dp.create_streaming_table(
    name="dim_location_lapd",
    schema="""
        LOCATION_KEY bigint generated always as identity (start with 1 increment by 1),
        RPT_DIST_NO string,
        AREA string,
        LOCATION string,
        CROSS_STREET string,
        LAT double,
        LON double,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"},
)

@dp.view()
def gold_dim_location():
    """CDC source for dim_location_lapd: reporting district, area, street info and coordinates.

    LAT/LON are cast to double so they match the target table's schema (the CSV
    delivers them as strings).

    NOTE(review): the CDC key is (RPT_DIST_NO, AREA) while LOCATION, CROSS_STREET,
    LAT and LON vary per incident, so the stored values presumably reflect whichever
    incident was applied last for that district — confirm that is the intended grain.
    """
    return (
        spark.readStream.table('lapd_silver')
        .select(
            "RPT_DIST_NO",
            "AREA",
            "LOCATION",
            "CROSS_STREET",
            sf.col("LAT").cast("double").alias("LAT"),
            sf.col("LON").cast("double").alias("LON"),
            "last_updated",
        )
    )

# Upsert one row per (RPT_DIST_NO, AREA); the fact table joins on both columns.
dp.create_auto_cdc_flow(
    source="gold_dim_location",
    target="dim_location_lapd",
    keys=["RPT_DIST_NO", "AREA"],
    ignore_null_updates=True,
    sequence_by="last_updated",
)

# DIM_VICTIM
dp.create_streaming_table(
    name="dim_victim_lapd",
    schema="""
        VICTIM_KEY bigint generated always as identity (start with 1 increment by 1),
        VICTIM_AGE_RAW int,
        VICTIM_SEX string,
        VICTIM_DESCENT string,
        VICTIM_AGE_GRP string,
        VICTIM_DESCENT_DESC string,
        last_updated timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.view()
def gold_dim_victim():
    """CDC source for dim_victim_lapd: victim age/sex/descent plus derived labels.

    Outputs:
      * VICTIM_AGE_RAW is VICT_AGE cast to int.
      * VICTIM_AGE_GRP bands that age: 0-12 Child, 13-19 Teen, 20-35 Young Adult,
        36-55 Adult, >55 Senior, anything else (negative/NULL) Unknown.
      * VICTIM_DESCENT_DESC decodes the single-letter descent code. Codes not in the
        table produce NULL.
    """
    df = spark.readStream.table('lapd_silver')

    # LAPD victim descent codes, per the dataset's published data dictionary.
    # The previous table labelled U as "Unknown" and X as "Other" and omitted O.
    # In the LAPD list U = Hawaiian, X = Unknown and O = Other.
    # NOTE(review): re-check against the current LAPD data dictionary if codes change.
    descent_labels = {
        "A": "Other Asian",
        "B": "Black/African American",
        "C": "Chinese",
        "D": "Cambodian",
        "F": "Filipino",
        "G": "Guamanian",
        "H": "Hispanic/Latin/Mexican",
        "I": "American Indian/Alaskan Native",
        "J": "Japanese",
        "K": "Korean",
        "L": "Laotian",
        "O": "Other",
        "P": "Pacific Islander",
        "S": "Samoan",
        "U": "Hawaiian",
        "V": "Vietnamese",
        "W": "White",
        "X": "Unknown",
        "Z": "Asian Indian",
    }
    descent_mapping = sf.create_map(
        *[sf.lit(value) for pair in descent_labels.items() for value in pair]
    )

    df = df.withColumn("VICTIM_AGE_RAW", sf.col("VICT_AGE").cast("int"))
    df = df.withColumn("VICTIM_SEX", sf.col("VICT_SEX"))
    df = df.withColumn("VICTIM_DESCENT", sf.col("VICT_DESCENT"))

    # Band the already-cast integer age rather than comparing the raw string column
    # against integer literals.
    age = sf.col("VICTIM_AGE_RAW")
    df = df.withColumn("VICTIM_AGE_GRP",
        sf.when(age.between(0, 12), "Child")
        .when(age.between(13, 19), "Teen")
        .when(age.between(20, 35), "Young Adult")
        .when(age.between(36, 55), "Adult")
        .when(age > 55, "Senior")
        .otherwise("Unknown"))

    df = df.withColumn("VICTIM_DESCENT_DESC", descent_mapping[sf.col("VICT_DESCENT")])

    df = df.selectExpr("VICTIM_AGE_RAW", "VICTIM_SEX", "VICTIM_DESCENT", "VICTIM_AGE_GRP", "VICTIM_DESCENT_DESC", "last_updated")
    return df

# Upsert one row per (age, sex, descent) combination; the fact joins on all three.
dp.create_auto_cdc_flow(
    target="dim_victim_lapd",
    source="gold_dim_victim",
    keys=["VICTIM_AGE_RAW", "VICTIM_SEX", "VICTIM_DESCENT"],
    sequence_by="last_updated",
    ignore_null_updates=True
)

In [0]:
# ============================================
# FACT TABLE
# ============================================
dp.create_streaming_table(
    name="fact_crime_incident_lapd",
    schema="""
        CRIME_INCIDENT_KEY bigint generated always as identity (start with 1 increment by 1),
        DR_NO string,
        AREA_KEY bigint,
        LOCATION_KEY bigint,
        CRIME_KEY bigint,
        VICTIM_KEY bigint,
        PREMISE_KEY bigint,
        STATUS_KEY bigint,
        WEAPON_KEY bigint,
        TIME_OCC_KEY bigint,
        DATE_RPT_KEY bigint,
        DATE_OCC_KEY bigint,
        MODUS_OPERANDI_KEY bigint,
        INCIDENT_COUNT int,
        created_at timestamp
    """,
    table_properties={"delta.enableChangeDataFeed": "true"}
)

@dp.append_flow(
    target="fact_crime_incident_lapd",
    name="fact_crime_incident_flow"
)
def gold_fact_crime_incident():
    """Append one fact row per silver incident, resolving every dimension's surrogate key.

    Silver is read as a stream and LEFT JOINed to each dimension table on that
    dimension's CDC key column(s). An unmatched lookup is stored as -1, so the
    incident is never dropped.

    NOTE(review): the dimensions are read as plain tables (presumably a stream-static
    join), so an incident whose dimension row lands later keeps its -1 key — confirm
    that is acceptable.
    """
    # dim_time_lapd is keyed on (HOUR, MINUTE) and holds one row per distinct minute.
    # Joining on the hour alone matched every minute row of that hour and duplicated
    # the incident. TIME_OCC is parsed exactly like gold_dim_time (split on ':' ->
    # hour, minute) and both key columns are joined. split_part + try_cast give NULL
    # (-> TIME_OCC_KEY -1) instead of failing on a malformed value.
    df = spark.sql("""
        SELECT
            s.DR_NO,
            COALESCE(a.AREA_KEY, -1) AS AREA_KEY,
            COALESCE(l.LOCATION_KEY, -1) AS LOCATION_KEY,
            COALESCE(c.CRIME_KEY, -1) AS CRIME_KEY,
            COALESCE(v.VICTIM_KEY, -1) AS VICTIM_KEY,
            COALESCE(p.PREMISE_KEY, -1) AS PREMISE_KEY,
            COALESCE(st.STATUS_KEY, -1) AS STATUS_KEY,
            COALESCE(w.WEAPON_KEY, -1) AS WEAPON_KEY,
            COALESCE(t.TIME_KEY, -1) AS TIME_OCC_KEY,
            COALESCE(d_rpt.DATE_KEY, -1) AS DATE_RPT_KEY,
            COALESCE(d_occ.DATE_KEY, -1) AS DATE_OCC_KEY,
            COALESCE(mo.MODUS_OPERANDI_KEY, -1) AS MODUS_OPERANDI_KEY,
            1 AS INCIDENT_COUNT,
            s.last_updated AS created_at
        FROM STREAM(lapd_silver) s
        LEFT JOIN dim_area_lapd a ON s.AREA = a.AREA_CODE
        LEFT JOIN dim_crime_lapd c ON s.CRM_CD_1 = c.CRM_CD_1
        LEFT JOIN dim_premise_lapd p ON s.PREMIS_CD = p.PREMIS_CD
        LEFT JOIN dim_status_lapd st ON s.STATUS = st.STATUS_CODE
        LEFT JOIN dim_weapon_lapd w ON s.WEAPON_USED_CD = w.WEAPON_USED_CD
        LEFT JOIN dim_time_lapd t
            ON try_cast(split_part(s.TIME_OCC, ':', 1) AS INT) = t.HOUR
           AND try_cast(split_part(s.TIME_OCC, ':', 2) AS INT) = t.MINUTE
        LEFT JOIN dim_date_lapd d_rpt ON s.DATE_RPTD = d_rpt.FULL_DATE
        LEFT JOIN dim_date_lapd d_occ ON s.DATE_OCC = d_occ.FULL_DATE
        LEFT JOIN dim_modus_operandi_lapd mo ON s.MOCODES = mo.MOCODES
        LEFT JOIN dim_location_lapd l ON s.RPT_DIST_NO = l.RPT_DIST_NO AND s.AREA = l.AREA
        LEFT JOIN dim_victim_lapd v ON cast(s.VICT_AGE as int) = v.VICTIM_AGE_RAW
            AND s.VICT_SEX = v.VICTIM_SEX
            AND s.VICT_DESCENT = v.VICTIM_DESCENT
    """)
    return df