# In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json

# Storage locations: the job reads from BRONZE_PATH and writes to SILVER_PATH.
_LAKE_ROOT = "s3a://data-files-rjx"
_DATASET = "fire_calls"
BRONZE_PATH = f"{_LAKE_ROOT}/bronze/{_DATASET}"
SILVER_PATH = f"{_LAKE_ROOT}/silver/{_DATASET}"

# Standard cleansing rules (could be loaded from an external JSON file instead).
STANDARD_RULES = {
    # Null handling. Supported actions: fill_na | drop | keep.
    "null_handling": {
        # Action for every column that is not listed under "columns".
        "default": "fill_na",
        "columns": {
            # Critical columns: rows with a null in any of these are dropped.
            "Call Number": "drop",
            # NOTE(review): "city" is lower-case unlike every other key here;
            # confirm the exact column name against the Bronze schema.
            "city": "drop",
            # Spelled exactly like the "Box" key in "type_conversion" so both
            # rule sets refer to the column by the same name.
            "Box": "drop",
            "Station Area": "drop",
            "Zipcode of Incident": "drop",
            "Original Priority": "drop",
        },
    },
    # Target type per column: "date" and "timestamp" are parsed from text,
    # any other value is passed to Column.cast().
    "type_conversion": {
        # Dates and timestamps
        "Call Date": "date",
        "Watch Date": "date",
        "Received DtTm": "timestamp",
        "Entry DtTm": "timestamp",
        "Dispatch DtTm": "timestamp",
        "Response DtTm": "timestamp",
        "On Scene DtTm": "timestamp",
        "Transport DtTm": "timestamp",
        "Hospital DtTm": "timestamp",
        "Available DtTm": "timestamp",

        # Numbers
        "Call Number": "integer",
        "Incident Number": "integer",
        "Zipcode of Incident": "integer",
        "Station Area": "integer",
        "Box": "integer",
        "Original Priority": "integer",
        "Priority": "integer",
        "Final Priority": "integer",
        "Number of Alarms": "integer",
        "Unit sequence in call dispatch": "integer",
        "Fire Prevention District": "integer",
        "Supervisor District": "integer",

        # Booleans
        "ALS Unit": "boolean",
    },
}


def clean_null_values(df, rules):
    """Apply the null-handling rules to ``df`` and return the cleaned DataFrame.

    ``rules["null_handling"]`` must provide:

    * ``"columns"``: mapping of column name to a per-column action.
      ``"drop"`` removes rows where that column is null, ``"fill_na"``
      replaces nulls in that column with ``"N/A"``; any other value
      (e.g. ``"keep"``) leaves the column untouched.
    * ``"default"``: action applied to every column that has no per-column
      entry. ``"fill_na"`` fills nulls with ``"N/A"``, ``"drop"`` removes
      rows with a null in any of those columns; any other value keeps nulls.

    Args:
        df: Input Spark DataFrame.
        rules: Rule dictionary shaped like ``STANDARD_RULES``. The optional
            ``"type_conversion"`` section is consulted so that columns which
            are converted afterwards do not receive the text placeholder.

    Returns:
        A new DataFrame with the null-handling rules applied.
    """
    null_rules = rules["null_handling"]
    column_actions = null_rules["columns"]

    # 1. Per-column rules.
    for col_name, action in column_actions.items():
        if action == "drop":
            df = df.filter(col(col_name).isNotNull())
        elif action == "fill_na":
            df = df.na.fill({col_name: "N/A"})

    # 2. Default rule for every column without a per-column entry.
    # Names are compared case-insensitively because Spark resolves column
    # names case-insensitively by default (spark.sql.caseSensitive=false);
    # otherwise a rule keyed "city" would not cover a column named "City".
    explicit = {name.lower() for name in column_actions}
    remaining_cols = [c for c in df.columns if c.lower() not in explicit]
    default_action = null_rules["default"]

    if default_action == "fill_na":
        # Skip columns that are cast/parsed later: an "N/A" placeholder there
        # only turns back into null on conversion, and raises when ANSI mode
        # is enabled.
        converted_later = {
            name.lower() for name in rules.get("type_conversion", {})
        }
        fill_cols = [
            c for c in remaining_cols if c.lower() not in converted_later
        ]
        if fill_cols:
            # With a string value and a subset, non-string columns in the
            # subset are ignored, so only text columns get the placeholder.
            df = df.na.fill("N/A", subset=fill_cols)
    elif default_action == "drop" and remaining_cols:
        # Restrict the drop to the remaining columns so that per-column
        # overrides such as "keep" are honoured.
        # Note: this can still discard a large share of the rows.
        df = df.na.drop(subset=remaining_cols)

    return df



from pyspark.sql.functions import col, coalesce, to_date, to_timestamp

def convert_data_types(df, rules):
    """Convert the columns named in ``rules["type_conversion"]`` to their target types.

    ``"date"`` and ``"timestamp"`` targets are parsed by trying two text
    formats in order and keeping the first one that yields a value. Every
    other target is handed to ``Column.cast``.

    Args:
        df: Input Spark DataFrame.
        rules: Rule dictionary with a ``"type_conversion"`` mapping of
            column name to target type.

    Returns:
        A new DataFrame in which each listed column is replaced by its
        converted version.
    """
    # Parser function and the formats to try, in priority order.
    temporal_formats = {
        "date": (to_date, ("yyyy-MM-dd", "MM/dd/yyyy")),
        "timestamp": (
            to_timestamp,
            ("MM/dd/yyyy hh:mm:ss a", "yyyy-MM-dd HH:mm:ss"),
        ),
    }

    result = df
    for name, wanted in rules["type_conversion"].items():
        source = col(name)
        parser_spec = temporal_formats.get(wanted)
        if parser_spec is None:
            # Plain cast for non-temporal targets.
            converted = source.cast(wanted)
        else:
            parse, formats = parser_spec
            # coalesce() keeps the first format that parses to a non-null value.
            converted = coalesce(*[parse(source, fmt) for fmt in formats])
        result = result.withColumn(name, converted)
    return result



def process_silver():
    """Clean the Bronze data and publish it as the Silver Delta table.

    Steps:
        1. Read the Bronze Parquet data from ``BRONZE_PATH``.
        2. Apply ``clean_null_values`` and ``convert_data_types`` with
           ``STANDARD_RULES``.
        3. Overwrite the Delta table at ``SILVER_PATH``.
        4. Write a small JSON file with run metadata next to the table.

    Uses the notebook-provided globals ``spark`` and ``dbutils``.

    Returns:
        True when every step succeeded.

    Raises:
        Exception: any failure is printed and then re-raised unchanged.
    """
    from datetime import datetime, timezone

    try:
        # 1. Read the Bronze data.
        bronze_df = spark.read.parquet(BRONZE_PATH)

        # 2. Run the cleansing steps driven by STANDARD_RULES.
        silver_df = (
            bronze_df
            .transform(lambda df: clean_null_values(df, STANDARD_RULES))
            .transform(lambda df: convert_data_types(df, STANDARD_RULES))
        )

        # 3. Write the Delta table. Column mapping mode "name" is requested
        # so that column names with spaces/special characters are allowed.
        (silver_df.write
            .mode("overwrite")
            .format("delta")
            .option("delta.columnMapping.mode", "name")
            .option("delta.minReaderVersion", "2")
            .option("delta.minWriterVersion", "5")
            .save(SILVER_PATH))

        # 4. Record run metadata.
        meta = {
            # Wall-clock time of this run as an ISO-8601 UTC string.
            # current_timestamp() is a Column expression, so str() of it
            # would store the expression text rather than a time.
            "processed_time": datetime.now(timezone.utc).isoformat(),
            "input_rows": bronze_df.count(),
            "output_rows": silver_df.count(),
            "rules_applied": STANDARD_RULES,
        }
        # Third argument is overwrite=True: without it every re-run fails
        # because the metadata file from the previous run already exists.
        dbutils.fs.put(
            f"{SILVER_PATH}/_processing_meta.json",
            json.dumps(meta),
            True,
        )

        print("✅ Silver处理完成！")
        print("👉 清洗后Schema:")
        silver_df.printSchema()

        return True
    except Exception as e:
        # Top-level boundary: report the failure, then let it propagate.
        print(f"❌ 处理失败: {str(e)}")
        raise


# Run the Bronze -> Silver job as soon as this cell/script is executed.
process_silver()