In [0]:
# Databricks notebook source
from pyspark import pipelines as pl
from pyspark.sql import functions as F
from pyspark.sql.types import (
    MapType, StringType, DoubleType, DateType
)


# Directory the bronze ingest flow loads incoming JSON files from
volume_path = "/Volumes/workspace/schema_drift/json"

# Column name -> target datatype; applied by the silver flow after the
# rescued fields have been turned into columns
updated_datatypes = dict(
    signupDate="date",
    CreditScore="double",
)


# COMMAND ----------
# BRONZE TABLE
pl.create_streaming_table("demo_cust_bronze_sd")

@pl.append_flow(
    name="demo_cust_bronze_sd_ingest_flow",
    target="demo_cust_bronze_sd",
)
def demo_cust_bronze_sd_ingest_flow():
    """
    Stream the JSON files under `volume_path` into `demo_cust_bronze_sd`.

    Files are read with the `cloudFiles` source, with column type inference
    switched off and schema evolution mode set to `rescue`; the rescued data
    column is named `_rescued_data`. Two metadata columns are appended:
    `ingestion_datetime` (current timestamp) and `source_filename`
    (taken from `_metadata.file_path`).
    """
    # All reader options in one place; every value is passed as a string
    reader_options = {
        "cloudFiles.format": "json",
        "cloudFiles.inferColumnTypes": "false",
        "cloudFiles.schemaEvolutionMode": "rescue",
        "rescuedDataColumn": "_rescued_data",
    }

    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .options(**reader_options)
        .load(volume_path)
    )

    # Lineage & ingestion metadata
    stamped = raw_stream.withColumn("ingestion_datetime", F.current_timestamp())
    return stamped.withColumn("source_filename", F.col("_metadata.file_path"))


# COMMAND ----------
# RESCUED DATA FIELD EXTRACTOR (FIXED VERSION)

def process__rescue_data_new_fields(df, expected_fields=None):
    """
    Safely extract new fields out of the `_rescued_data` JSON STRING into real columns.
    Streaming-safe: no collect(), no toPandas().

    Args:
        df: DataFrame to process. It is returned unchanged when it has no
            `_rescued_data` column.
        expected_fields: Names of the rescued keys to promote to columns.
            A single name may be given as a plain string. When omitted, the
            fields introduced by customer_data_2,3,4 are used
            (Age, Gender, LoyaltyStatus, CreditScore).

    Returns:
        The DataFrame with one additional string column per requested key
        that was not already a column. Existing columns are never
        overwritten, and `_rescued_data` itself is kept.
    """
    if expected_fields is None:
        # These are the new fields present in customer_data_2,3,4
        expected_fields = ["Age", "Gender", "LoyaltyStatus", "CreditScore"]
    elif isinstance(expected_fields, str):
        # A bare string would otherwise be iterated character by character
        expected_fields = [expected_fields]

    if "_rescued_data" not in df.columns:
        return df

    # Convert STRING -> MAP (values are parsed as strings; casting happens later)
    df = df.withColumn(
        "_rescued_map",
        F.from_json(F.col("_rescued_data"), MapType(StringType(), StringType()))
    )

    # Extract each expected key (if it doesn't already exist as a column)
    for key in expected_fields:
        if key not in df.columns:
            df = df.withColumn(key, F.col("_rescued_map").getItem(key))

    # Drop helper column
    return df.drop("_rescued_map")


# COMMAND ----------
# DATATYPE FIXER

def process__rescue_data_datatype_change(df, target_schema_map: dict):
    """
    Convert columns to the datatypes requested in `target_schema_map`.

    Meant to run after the rescued fields have been created.
    Example map: {"signupDate": "date", "CreditScore": "double"}

    Type names are matched case-insensitively:
      - "date"              -> F.to_date
      - "double" / "float"  -> cast to double
      - "int" / "integer"   -> cast to int
      - "long" / "bigint"   -> cast to bigint
      - anything else       -> cast to string

    Columns named in the map but absent from `df` are skipped, so missing
    fields never cause a failure. Returns the converted DataFrame.
    """
    # Accepted spellings -> the type name handed to Column.cast
    cast_aliases = {
        "double": "double",
        "float": "double",
        "int": "int",
        "integer": "int",
        "long": "bigint",
        "bigint": "bigint",
    }

    for col_name, target_type in target_schema_map.items():
        if col_name in df.columns:
            requested = target_type.lower()
            source_col = F.col(col_name)

            if requested == "date":
                converted = F.to_date(source_col)
            else:
                # Unrecognised type names fall back to a string cast
                converted = source_col.cast(cast_aliases.get(requested, "string"))

            df = df.withColumn(col_name, converted)

    return df


# COMMAND ----------
# SILVER TABLE
pl.create_streaming_table("demo_cust_silver_sd")

@pl.append_flow(
    name="demo_cust_silver_sd_clean_flow",
    target="demo_cust_silver_sd",
)
def demo_cust_silver_sd_clean_flow():
    """
    Stream `demo_cust_bronze_sd` into `demo_cust_silver_sd`.

    The bronze stream first has its rescued fields turned into columns,
    then the conversions listed in `updated_datatypes` are applied.
    """
    bronze_stream = spark.readStream.table("demo_cust_bronze_sd")
    with_rescued_fields = process__rescue_data_new_fields(bronze_stream)
    return process__rescue_data_datatype_change(with_rescued_fields, updated_datatypes)
