In [0]:
from pyspark import pipelines as pl
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    MapType,
    LongType,
)


# Directory the bronze ingest flow loads its JSON files from.
volume_path = "/Volumes/workspace/damg7370/datastore/schema_drift/schema_drift_fresh"

# Presumably a schema-tracking location for an "add new columns" variant;
# it is not referenced by the code visible in this section.
schema_location_addcols = "/Volumes/workspace/damg7370/datastore/schema_drift/schema_location_addcols"

# Fixed bronze schema: every listed column is declared as a nullable string.
# Typing to date/double is applied later via `updated_datatypes`.
bronze_schema_rescue = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "CustomerID",
        "Name",
        "Email",
        "PhoneNumber",
        "City",
        "signupDate",   # note: lower-case "s", and a string here
        "CreditScore",  # can start as string
    )
])

# Datatypes we want in Silver (for RESCUE path), keyed by column name.
updated_datatypes = dict(SignupDate="date", CreditScore="double")

pl.create_streaming_table("demo_cust_bronze_sd")


@pl.append_flow(target="demo_cust_bronze_sd", name="demo_cust_bronze_sd_ingest_flow")
def demo_cust_bronze_sd_ingest_flow():
    """
    Stream JSON files from `volume_path` into the bronze table.

    The files are read with the `cloudFiles` source using the fixed
    `bronze_schema_rescue` schema, with schema evolution mode "rescue" and
    `_rescued_data` configured as the rescued-data column. Two metadata
    columns are appended: `ingestion_datetime` and `source_filename`.
    """
    reader_options = {
        "cloudFiles.format": "json",
        "cloudFiles.inferColumnTypes": "false",
        "cloudFiles.schemaEvolutionMode": "rescue",
        "rescuedDataColumn": "_rescued_data",
    }

    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .schema(bronze_schema_rescue)
        .load(volume_path)
    )

    # Add metadata columns: load time and the originating file path.
    return (
        raw_stream
        .withColumn("ingestion_datetime", F.current_timestamp())
        .withColumn("source_filename", F.col("_metadata.file_path"))
    )


def process__rescue_data_new_fields(df):
    """
    Extract fields out of `_rescued_data` and populate proper columns.

    `_rescued_data` is parsed as a JSON object into a MAP<string,string>.
    For every (JSON key -> destination column) pair in the internal mapping:

    * if the destination column already exists (compared case-insensitively),
      its current value is kept and the rescued value only fills NULLs;
    * otherwise a new column is added from the rescued value (NULL when the
      key is absent from the map).

    An existing column whose name differs from the destination only by case
    (e.g. `signupDate` vs `SignupDate`) is renamed to the destination name so
    the output always carries the canonical name.

    If `df` has no `_rescued_data` column it is returned unchanged. The
    `_rescued_data` column itself is kept in the output.
    """
    if "_rescued_data" not in df.columns:
        return df

    # Parse JSON string into MAP<string,string>
    df = df.withColumn(
        "_rescued_map",
        F.from_json(F.col("_rescued_data"), MapType(StringType(), StringType()))
    )

    # Map: key in JSON -> desired column name
    field_map = {
        "CustomerID":    "CustomerID",
        "CustomerId":    "CustomerID",
        "FullName":      "Name",
        "Name":          "Name",
        "Email":         "Email",
        "PhoneNumber":   "PhoneNumber",
        "City":          "City",
        "SignupDate":    "SignupDate",
        "signupDate":    "SignupDate",
        "Age":           "Age",
        "Gender":        "Gender",
        "LoyaltyStatus": "LoyaltyStatus",
        "CreditScore":   "CreditScore",
    }

    for src_key, dest_col in field_map.items():
        rescued_value = F.col("_rescued_map").getItem(src_key)

        # Resolve the destination against the current columns. A plain
        # `dest_col in df.columns` is case-sensitive, while Spark (by default)
        # resolves names case-insensitively: a miss here followed by
        # `withColumn(dest_col, ...)` would REPLACE e.g. `signupDate` with the
        # rescued value and discard the data already in that column.
        if dest_col in df.columns:
            existing_col = dest_col
        else:
            wanted = dest_col.lower()
            existing_col = next(
                (name for name in df.columns if name.lower() == wanted),
                None,
            )

        if existing_col is None:
            # Genuinely new column: take it from the rescued map if present.
            df = df.withColumn(dest_col, rescued_value)
            continue

        if existing_col != dest_col:
            # Same column under different casing: normalise the name first so
            # the coalesce below reads the original values.
            df = df.withColumnRenamed(existing_col, dest_col)

        # Keep the existing value; fall back to the rescued one only if NULL.
        df = df.withColumn(
            dest_col,
            F.coalesce(F.col(dest_col), rescued_value)
        )

    # Drop helper column
    return df.drop("_rescued_map")


def process__rescue_data_datatype_change(df, target_schema_map: dict):
    """
    Convert selected columns to their target datatypes in Silver.

    `target_schema_map` maps a column name to a type keyword (matched
    case-insensitively):

    * "date"             -> `F.to_date`
    * "double" / "float" -> cast to double
    * "int" / "integer"  -> cast to int
    * anything else      -> cast to string

    Column names are matched against `df.columns` exactly first and then
    case-insensitively, so a column stored as `signupDate` is still converted
    when the map says `SignupDate`. The column keeps its existing name.
    Entries whose column is not present are skipped.
    """
    for col_name, target_type in target_schema_map.items():
        # A bare `col_name in df.columns` is case-sensitive and would silently
        # skip a column that differs from the key only by case.
        if col_name in df.columns:
            actual_col = col_name
        else:
            wanted = col_name.lower()
            actual_col = next(
                (name for name in df.columns if name.lower() == wanted),
                None,
            )
        if actual_col is None:
            continue

        kind = target_type.lower()
        source = F.col(actual_col)

        if kind == "date":
            converted = F.to_date(source)
        elif kind in ("double", "float"):
            converted = source.cast("double")
        elif kind in ("int", "integer"):
            converted = source.cast("int")
        else:
            # Unrecognised type keywords fall back to a string cast.
            converted = source.cast("string")

        df = df.withColumn(actual_col, converted)

    return df

pl.create_streaming_table("demo_cust_silver_rescue")


@pl.append_flow(target="demo_cust_silver_rescue", name="demo_cust_silver_rescue_flow")
def demo_cust_silver_rescue_flow():
    """
    Build the silver stream from the bronze table.

    Reads `demo_cust_bronze_sd` as a stream, promotes fields found in
    `_rescued_data` to regular columns, then applies the datatype
    conversions listed in `updated_datatypes`.
    """
    # Use pl.read_stream so graph shows Bronze -> Silver link
    bronze_stream = pl.read_stream("demo_cust_bronze_sd")

    # Step 1: pull fields out of _rescued_data
    with_rescued_fields = process__rescue_data_new_fields(bronze_stream)

    # Step 2: fix datatypes (SignupDate -> date, CreditScore -> double)
    return process__rescue_data_datatype_change(with_rescued_fields, updated_datatypes)