**Schema evolution** lets a pipeline adapt when the structure of the incoming data changes. This is a common requirement in data ingestion.

**option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")**
- **failOnNewColumns** enforces a strict schema: when a file contains new columns, the stream fails and the pipeline stops. Switch to `addNewColumns` or `rescue` if you want to accept schema evolution.
- **addNewColumns** adds new columns to the schema automatically. The stream stops when it first encounters them and continues with the evolved schema after it restarts.
- **rescue** never evolves the schema. All unexpected fields go into a special column called **_rescued_data**, which stores them in JSON format.

**Note:**
- `addNewColumns` is the **default** mode when _**no schema is provided**_
- **none** is the default when _**you provide a schema**_ (`addNewColumns` is not allowed when the schema of the stream is provided)
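The following pure-Python sketch simulates, for a single JSON record, how each mode treats fields that are not in the current schema, together with the default-mode rule from the note. It is an illustration under simplifying assumptions, not Auto Loader's implementation: the helper names (`apply_evolution_mode`, `SchemaChangeStop`, `default_mode`), the sample columns, and the exact JSON layout of `_rescued_data` are hypothetical, and type mismatches (which Auto Loader can also rescue) are ignored.

```python
import json


class SchemaChangeStop(Exception):
    """Stands in for a stream stopping because a record has unknown columns."""

    def __init__(self, message, evolved_schema):
        super().__init__(message)
        self.evolved_schema = evolved_schema


def apply_evolution_mode(record, schema, mode, file_path):
    """Simulate one JSON record arriving under a schemaEvolutionMode value.

    Illustrative only: `schema` is a list of column names, and the returned
    dict holds those columns (plus `_rescued_data` in rescue mode).
    """
    new_fields = {k: v for k, v in record.items() if k not in schema}
    row = {column: record.get(column) for column in schema}

    if mode == "failOnNewColumns":
        if new_fields:
            raise SchemaChangeStop(f"unknown fields {sorted(new_fields)} in {file_path}", schema)
        return row
    if mode == "addNewColumns":
        if new_fields:
            # The stream stops; the widened schema applies once it restarts.
            raise SchemaChangeStop(f"new fields {sorted(new_fields)} added to schema",
                                   schema + sorted(new_fields))
        return row
    if mode == "rescue":
        # Schema never changes; unexpected fields are kept as JSON together
        # with the path of the file they came from.
        row["_rescued_data"] = (
            json.dumps({**new_fields, "_file_path": file_path}) if new_fields else None
        )
        return row
    if mode == "none":
        # Schema never changes and unexpected fields are ignored
        # (unless a rescued data column is configured, which this sketch skips).
        return row
    raise ValueError(f"unknown schemaEvolutionMode: {mode}")


def default_mode(schema_provided):
    """Default mode per the note: none with a provided schema, else addNewColumns."""
    return "none" if schema_provided else "addNewColumns"


# Hypothetical record: "age" is not part of the base schema.
print(apply_evolution_mode({"CustomerID": 1, "age": 30}, ["CustomerID"],
                           "rescue", "customer_data_2.json"))
```

In the sample call, `rescue` keeps the row and moves `age` into `_rescued_data`, whereas `failOnNewColumns` and `addNewColumns` would raise for the same record.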

Let's look at what the expectation `_rescued_data IS NULL` enforces:
- No schema-mismatch or data-parsing errors occurred for the row during ingestion
- All incoming data aligns with the defined schema
- Non-conforming rows trigger an alert or a failure

This enforces strict data quality: only rows that match the schema and were parsed correctly are accepted. It also surfaces errors early in the pipeline.

**Note**: If _rescued_data IS NOT NULL, the row either does not adhere to the schema or hit a parsing error.
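A plain-Python sketch of how an `expect_all_or_drop`-style check treats rows based on `_rescued_data`. The function `expect_all_or_drop` and the sample rows are hypothetical stand-ins, not the pipelines API; in the notebook the same two constraints are passed as SQL strings to `pl.create_streaming_table(..., expect_all_or_drop=...)`.

```python
def expect_all_or_drop(rows, expectations):
    """Keep rows that satisfy every expectation; count failures per expectation."""
    kept = []
    dropped = {name: 0 for name in expectations}
    for row in rows:
        failed = [name for name, check in expectations.items() if not check(row)]
        for name in failed:
            dropped[name] += 1
        if not failed:
            kept.append(row)
    return kept, dropped


# Same two constraints as the silver table, written as Python predicates.
expectations = {
    "no_rescued_data": lambda row: row.get("_rescued_data") is None,
    "valid_id": lambda row: row.get("CustomerID") is not None,
}

# Hypothetical bronze rows.
rows = [
    {"CustomerID": 1, "_rescued_data": None},
    {"CustomerID": 2, "_rescued_data": '{"age": "30", "_file_path": "customer_data_2.json"}'},
    {"CustomerID": None, "_rescued_data": None},
]

kept, dropped = expect_all_or_drop(rows, expectations)
print(kept)
print(dropped)
```

Only the first row survives: the second carries rescued data and the third has no `CustomerID`.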

**Medallion architecture**
- Bronze (DLT) table -> Reads from storage (raw data such as CSV, TSV, JSON, XML, Parquet, database sources, etc.) and creates a bronze table that holds the data as-is from the source
- Silver (DLT) table -> Applies the required expectations/validations, data type changes, business policies, etc.

**Files to upload one after another (one per pipeline run) for the demo**
- customer_data_1.json: base file (the first file we upload)
- customer_data_2.json: additional columns (age, gender, loyaltystatus), value changes for existing customers, plus new customer(s)
- customer_data_3.json: no structural changes; value changes for existing customers plus additional row(s)
- customer_data_4.json: one new column (CreditScore) and value changes for existing customers; no new rows
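The drift the four files are meant to trigger can be summarized by tracking which columns each file introduces. This is a hypothetical sketch: the file names and new column names follow the list above, `CustomerID` and `SignupDate` come from the expectations and casts later in the notebook, and the key casing and all values are placeholders.

```python
def columns_added_per_file(files):
    """Return {file name: columns first seen in that file}, in upload order."""
    seen = []
    report = {}
    for name, records in files:
        added = []
        for record in records:
            for key in record:
                if key not in seen and key not in added:
                    added.append(key)
        report[name] = added
        seen.extend(added)
    return report


# Placeholder records (made-up values).
files = [
    ("customer_data_1.json", [{"CustomerID": 1, "SignupDate": "2024-01-05"}]),
    ("customer_data_2.json", [{"CustomerID": 1, "SignupDate": "2024-01-05",
                               "age": 30, "gender": "F", "loyaltystatus": "Gold"},
                              {"CustomerID": 2, "SignupDate": "2024-02-10"}]),
    ("customer_data_3.json", [{"CustomerID": 3, "SignupDate": "2024-03-15",
                               "age": 41, "gender": "M", "loyaltystatus": "Silver"}]),
    ("customer_data_4.json", [{"CustomerID": 1, "SignupDate": "2024-01-05",
                               "age": 31, "gender": "F", "loyaltystatus": "Gold",
                               "CreditScore": 720}]),
]

for file_name, added in columns_added_per_file(files).items():
    print(file_name, added or "no new columns")
```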




In [0]:
#Libraries management
from pyspark import pipelines as pl
from pyspark.sql.functions import *
from pyspark.sql.types import *

volume_path="/Volumes/workspace/damg7370/datastore/SchemaDrift/demo_smm/customer_*.json" 


[0;31m---------------------------------------------------------------------------[0m
[0;31mModuleNotFoundError[0m                       Traceback (most recent call last)
File [0;32m<command-5395825073688072>, line 1[0m
[0;32m----> 1[0m [38;5;28;01mimport[39;00m [38;5;21;01mdlt[39;00m
[1;32m      2[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mfunctions[39;00m [38;5;28;01mimport[39;00m [38;5;241m*[39m
[1;32m      3[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01msql[39;00m[38;5;21;01m.[39;00m[38;5;21;01mtypes[39;00m [38;5;28;01mimport[39;00m [38;5;241m*[39m

[0;31mModuleNotFoundError[0m: No module named 'dlt'

In [0]:
#bronze layer table: cust_bronze_sd
pl.create_streaming_table("cust_bronze_sd")

# Ingest the raw data into the bronze table using append flow
@pl.append_flow(
  target = "cust_bronze_sd", #object name
  name = "cust_bronze_sd_ingest_flow" #flow name
)
def cust_bronze_sd_ingest_flow():
  df = (
      spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.inferColumnTypes", "true") # infer column types instead of reading every JSON field as a string
          #.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns") # customer_data_2.json has a different schema than customer_data_1.json, so this mode fails with an [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] exception and stops processing
          .option("cloudFiles.schemaEvolutionMode", "rescue")
          .load(volume_path)
  )
  return df.withColumn("ingestion_datetime", current_timestamp())\
           .withColumn("source_filename", col("_metadata.file_path")) 


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-5395825073688071>, line 1[0m
[0;32m----> 1[0m [38;5;129m@dlt[39m[38;5;241m.[39mtable(
[1;32m      2[0m name[38;5;241m=[39m[38;5;124mf[39m[38;5;124m"[39m[38;5;124mbronze_user[39m[38;5;124m"[39m
[1;32m      3[0m )
[1;32m      4[0m [38;5;28;01mdef[39;00m [38;5;21mbronze_table[39m():
[1;32m      5[0m   df [38;5;241m=[39m (spark[38;5;241m.[39mreadStream[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcloudFiles[39m[38;5;124m"[39m)
[1;32m      6[0m       [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcloudFiles.format[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mparquet[39m[38;5;124m"[39m)
[1;32m      7[0m       [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcloudFiles.inferColumnTypes[39m[38;5;124m"[39m, [38;5;28;01mTrue[39;00m)
[1;

In [0]:
# Function to handle DATATYPE changes
# Logic to process the fields if data type changes. There are many ways it can be handled
#    (*) Without Overwrite of data in silver layer
#        Create additional table every time a colmn datatype changes and then create view on top of it as UNION to all new tables
#            PROS: Technically we are not overwriting the data hence no reloading
#            CONS: New table will created every time datatype changes 
#
#    (*) Overwrite data in both Bronze and Silver layers
#        I am not sure how this works for streams. I have not done much exploration in this method. Hope it works
#        Here aswell we use _rescued_data column to check qualit expectation for schema update
#            PROS: No need to reload bronze layer table as _rescued_data has all desired changed and can be used to process
#            CONS: Less code changes because _rescued_data doesnt need any additional logic to hendle. 
#                  However all raw data to be stored at begining. Reload of both tables
#
#    (*) Merge and overwrite data in silver layer (below function example does the same implementation)
#            PROS: No need to reload bronze layer table as _rescued_data has all desired changed and can be used to process
#            CONS: Table in silver need to be completely reloaded
# NOTE: The above options technically doesnt handle column renames. We need to write additional logic to handle column renames
#       I would say we could follow the views logic to load renamed column as new field and then in view drop old column and use new renamed column
#       However, we need to merge the data in silver layer and hence we need to reload the silver layer table

def process__rescue_data_datatype_change(df, target_schema: StructType):
    #Parse the _rescued_data json to a MAP (Key,Value) type and store in _rescued_data_modified column
    df = df.withColumn("_rescued_data_modified", from_json(col("_rescued_data"), MapType(StringType(), StringType())))
    
    for field in target_schema.fields:
        data_type = field.dataType
        column_name = field.name

        # True when _rescued_data_modified is not null and contains this column as a key.
        # pyspark.sql.functions.map_contains_key checks whether a key exists in a MapType column and returns true/false.
        # Note: the map key lookup is case-sensitive.
        key_condition = expr(f"_rescued_data_modified IS NOT NULL AND map_contains_key(_rescued_data_modified, '{column_name}')")
        
        # Prefer the rescued value when it exists, otherwise keep the current value; cast either one to the target data type
        rescued_value = when(key_condition, col("_rescued_data_modified").getItem(column_name).cast(data_type)).otherwise(col(column_name).cast(data_type))
        
        # Replace the column with the merged, already-cast value (a second cast is unnecessary)
        df = df.withColumn(column_name, rescued_value)
        
    df = df.drop('_rescued_data_modified')

    # Reset _rescued_data to null after processing, since the silver expectation "_rescued_data IS NULL" checks this column
    df = df.withColumn('_rescued_data', lit(None).cast(StringType()))
    return df
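To see the merge logic without Spark, here is a plain-Python analogue for a single row. Hypothetical parts: the name `merge_rescued_types`, the mapping from column name to a casting function (standing in for a `StructType`), and the sample row. One behavioral difference: `date.fromisoformat` raises on a malformed string, whereas Spark's `cast` can return NULL instead, depending on the session's ANSI setting.

```python
import json
from datetime import date


def merge_rescued_types(row, target_types):
    """Plain-Python analogue of process__rescue_data_datatype_change for one row.

    The rescued value wins when its key exists; otherwise the current value is
    kept. Either one is then cast with the function from target_types.
    """
    rescued = json.loads(row["_rescued_data"]) if row.get("_rescued_data") else {}
    merged = dict(row)
    for column, cast in target_types.items():
        raw = rescued[column] if column in rescued else row.get(column)
        merged[column] = None if raw is None else cast(raw)
    # Clear _rescued_data so the "_rescued_data IS NULL" expectation passes.
    merged["_rescued_data"] = None
    return merged


row = {
    "CustomerID": 1,
    "signupDate": None,
    "_rescued_data": json.dumps({"signupDate": "2024-01-05",
                                 "_file_path": "customer_data_2.json"}),
}
print(merge_rescued_types(row, {"signupDate": date.fromisoformat}))
```

As with `map_contains_key` in the PySpark version, `column in rescued` is an exact, case-sensitive key match, so a target field named `signupDate` will not pick up a rescued key spelled `SignupDate`.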


In [0]:
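def discover_columns_from_rescued_data():
    # Batch-read the bronze table and return every key that currently appears in _rescued_data.
    # Used for streaming input, because a streaming DataFrame cannot be collected.
    bronze_batch = spark.read.table("cust_bronze_sd")
    rows_with_rescued = bronze_batch.filter(col("_rescued_data").isNotNull())
    
    if rows_with_rescued.isEmpty():
        return []
    
    # Parse the rescued JSON string into a MAP<STRING, STRING>
    df_parsed = rows_with_rescued.withColumn(
        "_rescued_map",
        from_json(col("_rescued_data"), MapType(StringType(), StringType()))
    )
    
    # One row per distinct key across all rescued rows
    df_keys = df_parsed.select(
        explode(map_keys(col("_rescued_map"))).alias("rescued_key")
    ).distinct()
    
    # Skip _file_path: Auto Loader adds it to _rescued_data as metadata, it is not a data column
    return [row["rescued_key"] for row in df_keys.collect() 
            if row["rescued_key"] != "_file_path"]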

# Function to handle adding NEW FIELDS 
def process__rescue_data_new_fields(df):

    # Parse the _rescued_data JSON into a MAP<STRING, STRING> column
    df = df.withColumn(
        "_rescued_data_json_to_map", 
        from_json(
            col("_rescued_data"), 
            MapType(StringType(), StringType())
        )
    )

    # Store the keys of each row's map in _rescued_data_map_keys (dropped again at the end)
    df = df.withColumn("_rescued_data_map_keys", map_keys(col("_rescued_data_json_to_map")))

    # Get all keys in all rows as a new DataFrame
    df_keys = df.select(
        explode(
            map_keys(col("_rescued_data_json_to_map"))
        ).alias("rescued_key")
    ).distinct()

    # Collect the keys directly only for a batch DataFrame. A streaming DataFrame cannot be
    # collected, so for streams the keys are discovered by batch-reading the bronze table instead
    if not df.isStreaming:
        new_keys = [row["rescued_key"] for row in df_keys.collect()]
    else:
        new_keys = discover_columns_from_rescued_data()
    
    existing_columns = set(df.columns)

    # Add new columns for each key
    for key in new_keys:
        if key != "_file_path" and key not in existing_columns:
            df = df.withColumn(
                key,
                when(
                    col("_rescued_data_json_to_map").isNotNull(),
                    col("_rescued_data_json_to_map").getItem(key)
                ).otherwise(lit(None)).cast(StringType())
            )

    # *** Possible enhancements (columns already in the DataFrame are skipped above via existing_columns):
    # ***  infer the data type of each new column and use it instead of the static StringType
    # ***  check that each column exists and that the DataFrame has rows before each transformation, and raise an exception otherwise

    df = df.drop("_rescued_data_json_to_map", "_rescued_data_map_keys")
    
    return df
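A plain-Python analogue for a batch of rows (dicts) helps reason about what the function adds. The name `add_rescued_fields` and the sample rows are hypothetical, and values are stringified with `str()`, which only approximates how Spark renders non-string JSON values when parsing into `MAP<STRING, STRING>`. The sketch also sidesteps the streaming case: in the notebook, a streaming input gets its key list from `discover_columns_from_rescued_data()`, a batch read of `cust_bronze_sd` made when the flow is defined, so keys that first appear later are presumably only picked up on a later pipeline update.

```python
import json


def add_rescued_fields(rows):
    """Plain-Python analogue of process__rescue_data_new_fields for a batch.

    Every key found in _rescued_data becomes a string column, except
    _file_path (Auto Loader metadata) and columns the rows already have.
    Rows without that key get None.
    """
    existing = set().union(*(row.keys() for row in rows))
    parsed = [json.loads(row["_rescued_data"]) if row.get("_rescued_data") else {}
              for row in rows]
    new_keys = sorted({key for rescued in parsed for key in rescued}
                      - existing - {"_file_path"})
    extended_rows = []
    for row, rescued in zip(rows, parsed):
        extended = dict(row)
        for key in new_keys:
            value = rescued.get(key)
            extended[key] = None if value is None else str(value)
        extended_rows.append(extended)
    return extended_rows, new_keys


rows = [
    {"CustomerID": 1,
     "_rescued_data": json.dumps({"age": 30, "_file_path": "customer_data_2.json"})},
    {"CustomerID": 2, "_rescued_data": None},
]
extended, added = add_rescued_fields(rows)
print(added)
for row in extended:
    print(row)
```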

In [0]:
# # -----------------------------------------------------------------------------------------------------
# # Plain implementation without processing the _rescued_data column. Use this when you upload customer_data_1.json
# # -----------------------------------------------------------------------------------------------------
# pl.create_streaming_table(
#   name = "cust_silver_sd",
#   expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL", "valid_id": "CustomerID IS NOT NULL"}
# )
# @pl.append_flow(
#   target = "cust_silver_sd",
#   name = "cust_silver_sd_clean_flow"
# )
# def cust_silver_sd_clean_flow():
#   return (
#     spark.readStream.table("cust_bronze_sd")
#   )

In [0]:
# # -----------------------------------------------------------------------------------------------------
# # Uncomment this code before uploading customer_data_2.json, then upload the file and run the pipeline.
# # -----------------------------------------------------------------------------------------------------
# # customer_data_2.json brings new fields into the schema, and at the same time we plan a data type change
# # for a field that already arrived with customer_data_1.json. Because an existing field changes data type,
# # we need to run the pipeline with a full table refresh.
updated_datatypes = StructType([
  # Define signupDate as DATE and nullable (set the 3rd argument to False to make it non-nullable)
  StructField("signupDate", DateType(), True) 
])

pl.create_streaming_table(
  name = "cust_silver_sd",
  expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "CustomerID IS NOT NULL"}
)

@pl.append_flow(
  target = "cust_silver_sd",
  name = "cust_silver_sd_clean_flow"
)
def cust_silver_sd_clean_flow():
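  df = spark.readStream.table("cust_bronze_sd")
  # Add new fields found in _rescued_data first, then apply data type changes (which also clear _rescued_data)
  df = process__rescue_data_new_fields(df)
  df = process__rescue_data_datatype_change(df, updated_datatypes)
  return df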



In [0]:

# PART 1B: addNewColumns Mode Implementation
# Different path and different table names

volume_path_addnew = "/Volumes/workspace/damg7370/datastore/SchemaDrift/addNewColumn/customer_*.json"

In [0]:
# Bronze table with addNewColumns mode - DIFFERENT TABLE NAME
pl.create_streaming_table("cust_bronze_addnew")

@pl.append_flow(
  target = "cust_bronze_addnew",
  name = "cust_bronze_addnew_ingest_flow"
)
def cust_bronze_addnew_ingest_flow():
  df = (
      spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.inferColumnTypes", "true")
          .option("cloudFiles.schemaLocation", "/Volumes/workspace/damg7370/datastore/SchemaDrift/_schema_addnew")
          # KEY DIFFERENCE: addNewColumns instead of rescue
          .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
          .load(volume_path_addnew)
  )
  return df.withColumn("ingestion_datetime", current_timestamp())\
           .withColumn("source_filename", col("_metadata.file_path"))
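The difference from `rescue` shows up over several files: under `addNewColumns` the stream stops at the first unknown column, the evolved schema is saved, and processing resumes from the same batch after a restart. The simulation below is a hypothetical model of that cycle (each inner list stands for one micro-batch, and new columns are appended to the end of the schema), not Auto Loader code; `StreamStopped` and `run_with_restarts` are made-up names.

```python
class StreamStopped(Exception):
    """Stands in for the failure raised when a batch contains unknown columns."""


def run_with_restarts(batches, schema, max_restarts=10):
    """Process micro-batches in order, restarting whenever the schema evolves.

    Returns (committed rows, final schema, number of restarts).
    """
    schema = list(schema)
    committed = []
    start = 0
    restarts = 0
    while True:
        try:
            for i in range(start, len(batches)):
                batch = batches[i]
                new_cols = [k for record in batch for k in record if k not in schema]
                if new_cols:
                    # Persist the widened schema (new columns appended at the end),
                    # then fail the whole batch so nothing from it is committed.
                    for column in new_cols:
                        if column not in schema:
                            schema.append(column)
                    start = i
                    raise StreamStopped(f"batch {i}: new columns {new_cols}")
                committed.extend({c: record.get(c) for c in schema} for record in batch)
            return committed, schema, restarts
        except StreamStopped:
            restarts += 1
            if restarts > max_restarts:
                raise


batches = [
    [{"CustomerID": 1}],
    [{"CustomerID": 2, "age": 30}],
    [{"CustomerID": 3, "age": 31, "CreditScore": 700}],
]
rows, final_schema, restarts = run_with_restarts(batches, ["CustomerID"])
print(final_schema, restarts)
```

Rows committed before a column existed simply lack that key here; in a Delta table they would read as NULL for the new column.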

In [0]:
updated_datatypes_addnew = StructType([
    StructField("SignupDate", DateType(), True)  
])

pl.create_streaming_table(
    name = "cust_silver_addnew",
    expect_all_or_drop = {"valid_id": "CustomerID IS NOT NULL"}
)

@pl.append_flow(
    target = "cust_silver_addnew",
    name = "cust_silver_addnew_clean_flow"
)
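def cust_silver_addnew_clean_flow():
    df = spark.readStream.table("cust_bronze_addnew")
    
    # Cast each column listed in updated_datatypes_addnew when the bronze table has it.
    # Names must match the bronze column exactly: SignupDate (capital S)
    for field in updated_datatypes_addnew.fields:
        if field.name in df.columns:
            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
    
    return df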