In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp

# --- Dallas Bronze ---
@dlt.table(
    name="midterm_project.bronze.dallas_bronze",
    comment="Raw Dallas inspection data"
)
def dallas_bronze():
    """Load the raw Dallas inspection TSV into the bronze layer.

    The file is read as tab-delimited CSV with a header row and multi-line
    field support. Two columns are appended to every row (``source_city``
    fixed to "Dallas" and ``load_dt`` set to the current timestamp), after
    which every space in a column name is replaced with an underscore.
    """
    reader = spark.read.format("csv").options(
        header=True,
        delimiter="\t",
        multiLine=True,
    )
    raw = reader.load("/Volumes/midterm_project/raw/d_store/Dallas_Raw.tsv")

    tagged = raw.withColumn("source_city", lit("Dallas"))
    tagged = tagged.withColumn("load_dt", current_timestamp())

    # Rename positionally so the column order is preserved.
    sanitized_names = [name.replace(" ", "_") for name in tagged.columns]
    return tagged.toDF(*sanitized_names)


In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from functools import reduce
import operator


def _first_present_column(available, candidates):
    """Return the first name in *candidates* found in *available*, else None."""
    return next((name for name in candidates if name in available), None)


@dlt.table(
    name="midterm_project.silver.silver_dallas",
    comment="Cleansed and standardized Dallas Food Inspection Data (Silver Layer)",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("not_null_business_name", "BUSINESS_NAME IS NOT NULL")
@dlt.expect_or_drop("not_null_inspection_date", "INSPECTION_DATE IS NOT NULL")
@dlt.expect_or_drop("not_null_inspection_type", "INSPECTION_TYPE IS NOT NULL")
@dlt.expect_or_drop("valid_zip_code", "ZIP_CODE IS NOT NULL AND LENGTH(ZIP_CODE) = 5")
@dlt.expect_or_drop("score_within_range", "INSPECTION_SCORE >= 0 AND INSPECTION_SCORE <= 100")
@dlt.expect_or_drop("high_score_low_violations", "INSPECTION_SCORE < 90 OR VIOLATION_COUNT <= 3")
@dlt.expect_or_drop("min_one_violation", "VIOLATION_COUNT >= 1")
@dlt.expect_or_drop("no_pass_with_critical_urgent", "NOT (INSPECTION_SCORE >= 70 AND HAS_CRITICAL_URGENT = TRUE)")
@dlt.expect("unique_inspection_id", "INSPECTION_ID IS NOT NULL")
def silver_dallas():
    """
    Transform Dallas inspections from Bronze to Silver.

    Produces one row per inspection (de-duplicated on business name,
    inspection date, street address and ZIP code) with standardized column
    names, typed ZIP/score/date columns, a textual LOCATION "(lat, lon)",
    VIOLATION_COUNT, HAS_CRITICAL_URGENT, RISK_LEVEL, LICENSE_NUMBER and
    INSPECTION_ID. The original per-violation columns are kept after the
    ordered key columns.

    Rows are dropped by the expectations declared on the table, e.g. when
    INSPECTION_SCORE >= 70 AND any violation text contains "CRITICAL" or
    "URGENT".

    Notes:
        * INSPECTION_ID and LICENSE_NUMBER are CRC32-based hashes of row
          content only, so the same input row yields the same id on every
          pipeline refresh.
        * Null/blank BUSINESS_NAME and INSPECTION_TYPE are replaced with
          "Unknown" in STEP 7, so the corresponding NOT NULL expectations
          never drop a row.
    """

    # Fully qualified, matching how the table is declared and how the
    # violations table reads this one.
    df = dlt.read("midterm_project.bronze.dallas_bronze")

    # STEP 1: RENAME COLUMNS - trim, replace spaces, upper-case
    df = df.toDF(*[name.strip().replace(' ', '_').upper() for name in df.columns])

    # STEP 2: PARSE LAT_LONG_LOCATION - Replace non-Dallas coords with ZIP center
    lat_long_col = _first_present_column(
        df.columns, ['LAT_LONG_LOCATION', 'LOCATION', 'LAT_LONG', 'GEOLOCATION'])

    if lat_long_col:
        coord_pattern = r'\(([0-9.-]+),\s*([0-9.-]+)\)'
        # regexp_extract yields "" when nothing matches; the cast turns that into NULL.
        df = df.withColumn("LATITUDE_TEMP", F.regexp_extract(F.col(lat_long_col), coord_pattern, 1).cast(DoubleType()))
        df = df.withColumn("LONGITUDE_TEMP", F.regexp_extract(F.col(lat_long_col), coord_pattern, 2).cast(DoubleType()))

        # NULL coordinates make these predicates NULL, which F.when treats as false.
        lat_in_range = F.col("LATITUDE_TEMP").between(32.5, 33.2)
        lon_in_range = F.col("LONGITUDE_TEMP").between(-97.5, -96.5)

        # Average only in-range values, so the fallback center is not skewed
        # by the very outliers it is meant to replace.
        zip_window = Window.partitionBy("ZIP_CODE")
        df = df.withColumn("ZIP_LAT_MEAN", F.avg(F.when(lat_in_range, F.col("LATITUDE_TEMP"))).over(zip_window))
        df = df.withColumn("ZIP_LON_MEAN", F.avg(F.when(lon_in_range, F.col("LONGITUDE_TEMP"))).over(zip_window))

        df = df.withColumn("LATITUDE",
                          F.when(lat_in_range, F.col("LATITUDE_TEMP"))
                          .otherwise(F.coalesce(F.col("ZIP_LAT_MEAN"), F.lit(32.7767))))
        df = df.withColumn("LONGITUDE",
                          F.when(lon_in_range, F.col("LONGITUDE_TEMP"))
                          .otherwise(F.coalesce(F.col("ZIP_LON_MEAN"), F.lit(-96.7970))))

        df = df.drop("LATITUDE_TEMP", "LONGITUDE_TEMP", "ZIP_LAT_MEAN", "ZIP_LON_MEAN")
    else:
        df = df.withColumn("LATITUDE", F.lit(32.7767))
        df = df.withColumn("LONGITUDE", F.lit(-96.7970))

    # STEP 3: DATA TYPE CONVERSIONS
    if 'ZIP_CODE' in df.columns:
        # Keep digits only; lpad also truncates longer values (e.g. ZIP+4) to
        # the first five digits. NULL input stays NULL through every step.
        zip_digits = F.regexp_replace(F.col("ZIP_CODE").cast("string"), r'[^\d]', '')
        df = df.withColumn("ZIP_CODE", F.lpad(zip_digits, 5, '0').cast(IntegerType()))

    if 'STREET_NUMBER' in df.columns:
        street_digits = F.regexp_replace(F.col("STREET_NUMBER").cast("string"), r'[^\d]', '')
        df = df.withColumn("STREET_NUMBER", street_digits.cast(IntegerType()))

    if 'INSPECTION_DATE' in df.columns:
        # NOTE(review): to_date is called without a format, so only values in
        # Spark's default date format parse - confirm against the raw file.
        df = df.withColumn("INSPECTION_DATE", F.to_date(F.col("INSPECTION_DATE")))

    # A missing or unparseable score becomes 0, so INSPECTION_SCORE is never NULL.
    score_col = _first_present_column(df.columns, ['SCORE', 'INSPECTION_SCORE', 'TOTAL_SCORE'])
    if score_col:
        df = df.withColumn("INSPECTION_SCORE", F.coalesce(F.col(score_col).cast(IntegerType()), F.lit(0)))
    else:
        df = df.withColumn("INSPECTION_SCORE", F.lit(0))

    # STEP 4: HANDLE STREET_ADDRESS
    # concat returns NULL when ZIP_CODE is NULL; STEP 7 then fills 'Not Available'.
    zip_placeholder = F.concat(F.lit("Address in ZIP "), F.col("ZIP_CODE"))
    address_col = _first_present_column(
        df.columns, ['STREET_ADDRESS', 'ADDRESS_LINE1', 'ADDRESS', 'ADDRESS_LINE_1', 'STREET_ADDR'])

    if address_col:
        if address_col != 'STREET_ADDRESS':
            df = df.withColumnRenamed(address_col, 'STREET_ADDRESS')
        df = df.withColumn("STREET_ADDRESS",
                          F.when((F.col("STREET_ADDRESS").isNull()) | (F.trim(F.col("STREET_ADDRESS")) == ""),
                                zip_placeholder).otherwise(F.col("STREET_ADDRESS")))
    else:
        df = df.withColumn("STREET_ADDRESS", zip_placeholder)

    # STEP 5: RENAME RESTAURANT_NAME TO BUSINESS_NAME AND CREATE AKA_NAME
    restaurant_col = _first_present_column(
        df.columns, ['RESTAURANT_NAME', 'BUSINESS_NAME', 'FACILITY_NAME', 'ESTABLISHMENT_NAME'])

    if restaurant_col and restaurant_col != 'BUSINESS_NAME':
        df = df.withColumnRenamed(restaurant_col, 'BUSINESS_NAME')
    elif not restaurant_col:
        df = df.withColumn('BUSINESS_NAME', F.lit('Unknown'))

    # AKA_NAME is a copy of BUSINESS_NAME; the remaining columns are constants.
    df = df.withColumn("AKA_NAME", F.col("BUSINESS_NAME"))
    df = df.withColumn("FACILITY_TYPE", F.lit("Restaurant"))
    df = df.withColumn("STATE", F.lit("TX"))
    df = df.withColumn("CITY", F.lit("Dallas"))

    # STEP 6: REMOVE UNNECESSARY COLUMNS (drop ignores names that are absent)
    df = df.drop('INSPECTION_MONTH', 'INSPECTION_YEAR', 'ADDRESS_LINE1')

    # STEP 7: HANDLE NULL VALUES
    string_null_replacement = {
        'BUSINESS_NAME': 'Unknown', 'AKA_NAME': 'Unknown', 'STREET_ADDRESS': 'Not Available',
        'ADDRESS_LINE2': '', 'CITY': 'Dallas', 'STATE': 'TX',
        'FACILITY_TYPE': 'Restaurant', 'INSPECTION_TYPE': 'Unknown'
    }
    for column_name, replacement in string_null_replacement.items():
        if column_name in df.columns:
            is_blank = (F.col(column_name).isNull()) | (F.trim(F.col(column_name)) == "")
            df = df.withColumn(column_name, F.when(is_blank, F.lit(replacement)).otherwise(F.col(column_name)))
    # INSPECTION_SCORE needs no null fill here: STEP 3 already coalesced it to 0.

    # STEP 8: COUNT VIOLATIONS - one point per non-blank, applicable description
    violation_desc_cols = [name for name in df.columns if 'VIOLATION_DESCRIPTION' in name]
    if violation_desc_cols:
        violation_flags = [
            F.when((F.col(name).isNotNull())
                   & (F.trim(F.col(name)) != "")
                   & (F.trim(F.col(name)) != "N/A - Not Applicable"), F.lit(1)).otherwise(F.lit(0))
            for name in violation_desc_cols
        ]
        df = df.withColumn("VIOLATION_COUNT", reduce(operator.add, violation_flags))
    else:
        df = df.withColumn("VIOLATION_COUNT", F.lit(0))

    # STEP 9: CHECK FOR "CRITICAL" OR "URGENT" IN ANY VIOLATION COLUMN
    violation_all_cols = [
        name for name in df.columns
        if 'VIOLATION_DESCRIPTION' in name or 'VIOLATION_DETAIL' in name or 'VIOLATION_MEMO' in name
    ]

    if violation_all_cols:
        keyword_hits = [
            (F.upper(F.col(name)).contains("CRITICAL")) | (F.upper(F.col(name)).contains("URGENT"))
            for name in violation_all_cols
        ]
        # A NULL combined result (only NULL/false inputs) falls through to False.
        combined_condition = reduce(operator.or_, keyword_hits)
        df = df.withColumn("HAS_CRITICAL_URGENT", F.when(combined_condition, F.lit(True)).otherwise(F.lit(False)))
    else:
        df = df.withColumn("HAS_CRITICAL_URGENT", F.lit(False))

    # STEP 10: CREATE LOCATION COLUMN
    df = df.withColumn("LOCATION", F.concat(F.lit("("), F.col("LATITUDE").cast("string"), F.lit(", "), F.col("LONGITUDE").cast("string"), F.lit(")")))
    df = df.drop("LATITUDE", "LONGITUDE")

    # STEP 11: CALCULATE RISK LEVEL (keeping this for now as it may be used elsewhere)
    score = F.col("INSPECTION_SCORE")
    violation_count = F.col("VIOLATION_COUNT")
    # Parentheses spell out the grouping Python's precedence already applies (& binds before |).
    is_high = (score < 70) | (violation_count >= 5)
    is_medium = ((score >= 70) & (score <= 85)) | ((violation_count >= 2) & (violation_count < 5))
    df = df.withColumn("RISK_LEVEL",
                      F.when(is_high, F.lit("HIGH"))
                      .when(is_medium, F.lit("MEDIUM"))
                      .otherwise(F.lit("LOW")))

    # STEP 12: REMOVE DUPLICATES
    dedup_columns = [name for name in ['BUSINESS_NAME', 'INSPECTION_DATE', 'STREET_ADDRESS', 'ZIP_CODE']
                     if name in df.columns]
    if dedup_columns:
        df = df.dropDuplicates(dedup_columns)

    # STEP 13: CREATE LICENSE_NUMBER AND UNIQUE INSPECTION_ID
    # LICENSE_NUMBER: CRC32 of the business identity reduced to at most nine
    # digits. It is stored as a number, so no zero padding is applied.
    business_key = F.concat_ws("_", F.col("BUSINESS_NAME"), F.col("ZIP_CODE").cast("string"), F.col("STREET_ADDRESS"))
    df = df.withColumn("LICENSE_NUMBER", (F.abs(F.crc32(business_key)) % 1000000000).cast(LongType()))

    # INSPECTION_ID hashes exactly the de-duplication key from STEP 12, so it
    # is distinct per remaining row (up to CRC32 collisions) and, unlike a
    # monotonically_increasing_id-based value, stable across refreshes.
    inspection_key = F.concat_ws("_", F.col("BUSINESS_NAME"), F.col("INSPECTION_DATE").cast("string"),
                                 F.col("ZIP_CODE").cast("string"), F.col("STREET_ADDRESS"))
    df = df.withColumn("INSPECTION_ID", F.abs(F.crc32(inspection_key)))

    # STEP 14: ADD METADATA
    df = df.withColumn("source_system", F.lit("Dallas_OpenData"))
    # Despite its name this column holds the processing timestamp.
    df = df.withColumn("pipeline_run_id", F.current_timestamp())

    # STEP 15: REORDER COLUMNS (temporary - will be cleaned up in violations table)
    ordered_cols = ["INSPECTION_ID", "LICENSE_NUMBER", "BUSINESS_NAME", "AKA_NAME", "FACILITY_TYPE", "INSPECTION_DATE",
                   "INSPECTION_TYPE", "STREET_ADDRESS", "CITY", "STATE", "ZIP_CODE", "LOCATION",
                   "INSPECTION_SCORE", "VIOLATION_COUNT", "RISK_LEVEL", "HAS_CRITICAL_URGENT"]
    leading_cols = [name for name in ordered_cols if name in df.columns]
    remaining_cols = [name for name in df.columns if name not in ordered_cols]
    return df.select(*(leading_cols + remaining_cols))


@dlt.table(
    name="midterm_project.silver.dallas_violations_silver",
    comment="Normalized and cleaned violation records - ONE ROW PER VIOLATION with SHARED INSPECTION_ID",
    table_properties={"quality": "silver", "pipelines.autoOptimize.zOrderCols": "inspection_id"}
)
@dlt.expect_or_drop("valid_violation", "VIOLATION_DESCRIPTION != 'N/A - Not Applicable' AND VIOLATION_DESCRIPTION != 'Unknown' AND VIOLATION_DESCRIPTION != ''")
@dlt.expect_or_drop("not_null_violation_desc", "VIOLATION_DESCRIPTION IS NOT NULL")
@dlt.expect("unique_inspection_id", "INSPECTION_ID IS NOT NULL")
def dallas_violations_silver():
    """
    EXPLODED VIEW: one row per violation of each inspection in silver_dallas.

    For violation slots 1..25 the description/detail columns
    (``VIOLATION_DESCRIPTION_-_<n>`` / ``VIOLATION_DETAIL_-_<n>``) are packed
    into an array and exploded. Slots whose description is missing, blank,
    or "N/A - Not Applicable" are discarded.

    Output columns (those present in silver_dallas), in order:
        INSPECTION_ID, LICENSE_NUMBER, BUSINESS_NAME, AKA_NAME, FACILITY_TYPE,
        INSPECTION_DATE, INSPECTION_TYPE, STREET_ADDRESS, CITY, STATE,
        ZIP_CODE, LATITUDE, LONGITUDE, INSPECTION_SCORE, INSPECTION_RESULT,
        RISK_LEVEL, VIOLATION_CODE, VIOLATION_DESCRIPTION

    - VIOLATION_CODE is extracted from the violation detail text ("9999"
      when no code can be found).
    - VIOLATION_DESCRIPTION is the upper-cased detail text following the
      code ("OTHER VIOLATIONS" when nothing usable remains).
    - LATITUDE / LONGITUDE are parsed back out of LOCATION.
    - INSPECTION_RESULT is derived from INSPECTION_SCORE ranges.

    Raises:
        Exception: if silver_dallas has no LOCATION column.
    """
    df = dlt.read("midterm_project.silver.silver_dallas")

    if "LOCATION" not in df.columns:
        raise Exception("LOCATION column not found in silver_dallas!")

    df_cols = set(df.columns)

    def _text_or_unknown(column_name, missing_default, blank_markers=()):
        """Column value with null/blank/marker values mapped to "Unknown";
        a literal *missing_default* when the column does not exist."""
        if column_name not in df_cols:
            return F.lit(missing_default)
        trimmed = F.trim(F.col(column_name))
        is_blank = (trimmed == "") | F.col(column_name).isNull()
        for marker in blank_markers:
            is_blank = is_blank | (trimmed == marker)
        return F.when(is_blank, F.lit("Unknown")).otherwise(F.col(column_name))

    # Only description and details are carried through the explode; nothing
    # else from the per-violation source columns reaches the output.
    violation_structs = []
    for i in range(1, 26):
        description = _text_or_unknown(
            f"VIOLATION_DESCRIPTION_-_{i}", "N/A - Not Applicable",
            blank_markers=("N/A - Not Applicable",))
        details = _text_or_unknown(f"VIOLATION_DETAIL_-_{i}", "Unknown")
        violation_structs.append(F.struct(
            description.alias("violation_description"),
            details.alias("violation_details"),
        ))

    df = df.withColumn("violations_array", F.array(*violation_structs))

    key_cols = ["INSPECTION_ID", "LICENSE_NUMBER", "BUSINESS_NAME", "AKA_NAME", "FACILITY_TYPE", "INSPECTION_DATE",
               "INSPECTION_TYPE", "STREET_ADDRESS", "CITY", "STATE", "ZIP_CODE", "LOCATION",
               "INSPECTION_SCORE", "RISK_LEVEL"]
    existing_key_cols = [name for name in key_cols if name in df_cols]

    df_exploded = (
        df.select(*existing_key_cols, "violations_array")
        .withColumn("violation", F.explode("violations_array"))
    )

    df_violations = df_exploded.select(
        *existing_key_cols,
        F.col("violation.violation_description").alias("VIOLATION_DESCRIPTION_TEMP"),
        F.col("violation.violation_details").alias("VIOLATION_DETAILS"),
    )

    # Discard empty slots (sentinel values written while building the structs).
    description_temp = F.col("VIOLATION_DESCRIPTION_TEMP")
    df_violations = df_violations.filter(
        (F.trim(description_temp) != "N/A - Not Applicable") &
        (F.trim(description_temp) != "") &
        (F.trim(description_temp) != "Unknown") &
        (description_temp.isNotNull())
    )

    # EXTRACT VIOLATION CODE AND DESCRIPTION
    df_violations = df_violations.withColumn("DETAILS_CLEAN",
                      F.when((F.col("VIOLATION_DETAILS").isNotNull()) & (F.trim(F.col("VIOLATION_DETAILS")) != "") & (F.trim(F.col("VIOLATION_DETAILS")) != "Unknown"),
                            F.col("VIOLATION_DETAILS")).otherwise(F.lit("9999 Other Violations")))

    # (pattern, capture group) pairs tried in priority order; regexp_extract
    # returns "" on no match, which the F.when below turns into NULL so that
    # coalesce moves on to the next pattern.
    code_patterns = [
        (r'Ch\.[\d.-]+(?:\([a-zA-Z0-9]+\))?', 0),
        (r'Sec\.\s*[\d.-]+(?:\([a-zA-Z0-9]+\))*', 0),
        (r'[^\w\s]*([\d]+\.[\d]+)\.?', 1),
        (r'^[^\w\s]*([\d]+)', 1),
    ]
    code_candidates = [
        F.trim(F.regexp_extract(F.col("DETAILS_CLEAN"), pattern, group))
        for pattern, group in code_patterns
    ]
    df_violations = df_violations.withColumn(
        "VIOLATION_CODE",
        F.coalesce(*[F.when(candidate != "", candidate) for candidate in code_candidates], F.lit("9999")))

    # Description = detail text following the extracted code.
    df_violations = df_violations.withColumn("VIOLATION_DESCRIPTION_NEW",
                      F.when(F.col("VIOLATION_CODE") != "9999",
                            F.expr("CASE WHEN INSTR(DETAILS_CLEAN, VIOLATION_CODE) > 0 THEN TRIM(SUBSTRING(DETAILS_CLEAN, INSTR(DETAILS_CLEAN, VIOLATION_CODE) + LENGTH(VIOLATION_CODE))) ELSE DETAILS_CLEAN END"))
                      .otherwise(F.lit("Other Violations")))

    new_description = F.col("VIOLATION_DESCRIPTION_NEW")

    # Strip leading whitespace and stray '*', ')', ']', '#' left after the code.
    df_violations = df_violations.withColumn("VIOLATION_DESCRIPTION_NEW",
                      F.when(new_description != "Other Violations",
                            F.regexp_replace(new_description, "^[\\s*\\)\\]#]+", ""))
                      .otherwise(new_description))

    # Collapse runs of whitespace to a single space.
    df_violations = df_violations.withColumn("VIOLATION_DESCRIPTION_NEW",
                      F.when(new_description != "Other Violations",
                            F.trim(F.regexp_replace(new_description, "\\s+", " ")))
                      .otherwise(new_description))

    # Descriptions that end up null, blank or shorter than 3 characters are
    # treated as uninformative.
    df_violations = df_violations.withColumn("VIOLATION_DESCRIPTION_NEW",
                      F.when((F.trim(new_description) == "") | (new_description.isNull()) | (F.length(F.trim(new_description)) < 3),
                            F.lit("Other Violations")).otherwise(new_description))

    df_violations = df_violations.withColumn("VIOLATION_CODE",
                      F.when(new_description == "Other Violations", F.lit("9999")).otherwise(F.col("VIOLATION_CODE")))

    # Upper-case the cleaned text into the final VIOLATION_DESCRIPTION column.
    df_violations = df_violations.withColumn("VIOLATION_DESCRIPTION", F.upper(new_description))

    df_violations = df_violations.drop(
        "DETAILS_CLEAN", "VIOLATION_DESCRIPTION_TEMP", "VIOLATION_DESCRIPTION_NEW", "VIOLATION_DETAILS")

    # SPLIT LOCATION INTO LATITUDE AND LONGITUDE
    coord_pattern = r'\(([0-9.-]+),\s*([0-9.-]+)\)'
    df_violations = df_violations.withColumn("LATITUDE", F.regexp_extract(F.col("LOCATION"), coord_pattern, 1).cast(DoubleType()))
    df_violations = df_violations.withColumn("LONGITUDE", F.regexp_extract(F.col("LOCATION"), coord_pattern, 2).cast(DoubleType()))

    # Fallback for unparseable LOCATION values: ZIP mean, then fixed defaults.
    zip_window = Window.partitionBy("ZIP_CODE")
    df_violations = df_violations.withColumn("ZIP_LAT_MEAN", F.avg("LATITUDE").over(zip_window))
    df_violations = df_violations.withColumn("ZIP_LON_MEAN", F.avg("LONGITUDE").over(zip_window))

    df_violations = df_violations.withColumn("LATITUDE",
                      F.when(F.col("LATITUDE").isNull(), F.coalesce(F.col("ZIP_LAT_MEAN"), F.lit(32.7767))).otherwise(F.col("LATITUDE")))
    df_violations = df_violations.withColumn("LONGITUDE",
                      F.when(F.col("LONGITUDE").isNull(), F.coalesce(F.col("ZIP_LON_MEAN"), F.lit(-96.7970))).otherwise(F.col("LONGITUDE")))

    df_violations = df_violations.drop("LOCATION", "ZIP_LAT_MEAN", "ZIP_LON_MEAN")

    # ADD INSPECTION_RESULT based on score ranges
    score = F.col("INSPECTION_SCORE")
    df_violations = df_violations.withColumn("INSPECTION_RESULT",
                      F.when(score.between(90, 100), F.lit("Very Good"))
                      .when(score.between(80, 89), F.lit("Good"))
                      .when(score.between(70, 79), F.lit("Passing"))
                      .when(score.between(60, 69), F.lit("Failing"))
                      .when(score < 60, F.lit("Unacceptable"))
                      .otherwise(F.lit("Unknown")))

    # FINAL COLUMN ORDER
    ordered_cols = [
        "INSPECTION_ID", "LICENSE_NUMBER", "BUSINESS_NAME", "AKA_NAME", "FACILITY_TYPE",
        "INSPECTION_DATE", "INSPECTION_TYPE", "STREET_ADDRESS", "CITY", "STATE", "ZIP_CODE",
        "LATITUDE", "LONGITUDE", "INSPECTION_SCORE", "INSPECTION_RESULT", "RISK_LEVEL",
        "VIOLATION_CODE", "VIOLATION_DESCRIPTION"
    ]
    leading_cols = [name for name in ordered_cols if name in df_violations.columns]
    remaining_cols = [name for name in df_violations.columns if name not in ordered_cols]
    return df_violations.select(*(leading_cols + remaining_cols))
