## SIMPLY LOAD DATA FROM SOURCE TO DESTINATION 
## FAILS WHEN NEW COLUMN DETECTED

In [0]:
# Baseline Auto Loader pipeline: stream CSV files from the landing folder
# into the `new_autoloader` table. No cloudFiles.schemaEvolutionMode is set
# in this cell.
(
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/autofolder/data/schema")
    .load("dbfs:/mnt/autofolder/data")
    .writeStream
    .option("mergeSchema", "true")
    .option("checkpointLocation", "dbfs:/mnt/autofolder/output_data/checkpoint")
    .option("path", "dbfs:/mnt/autofolder/output_data/")
    .toTable("new_autoloader")
)

## Allow new columns, but store their values in the table's rescued-data column

In [0]:
# Auto Loader reader in "rescue" schema-evolution mode. The rescued-data
# column is named `_rescued_data`; the CSV files are read with a header row.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/autofolder/data/schema")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("rescuedDataColumn", "_rescued_data")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("dbfs:/mnt/autofolder/data")
)

In [0]:
# Start a streaming write of `df` into the `new_autoloader` table, stored at
# output_data/, with schema merging enabled on the sink.
(
    df.writeStream
    .option("mergeSchema", "true")
    .option("checkpointLocation", "dbfs:/mnt/autofolder/output_data/checkpoint")
    .option("path", "dbfs:/mnt/autofolder/output_data/")
    .toTable("new_autoloader")
)

In [0]:
%python
# Preview `df` (the Auto Loader stream defined in an earlier cell) in the
# notebook output.
display(df)

## Backfill Old Rescued Data into New Columns

In [0]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

# One-off backfill: copy the streamed Delta table to a separate location,
# promoting the `phone_no` value stored inside the `_rescued_data` JSON to a
# real column.

# Step 1: Read the Delta table with rescued data.
# ("header" is a CSV reader option, not a Delta read option, so it is not
# passed here.)
df_existing = spark.read.format("delta").load("dbfs:/mnt/autofolder/output_data/")

# Step 2: Define schema for rescued JSON
rescued_schema = StructType().add("phone_no", StringType())

# Step 3: Parse the JSON inside _rescued_data
df_parsed = df_existing.withColumn(
    "rescued_parsed",
    from_json(col("_rescued_data"), rescued_schema),
)

# Step 4: Extract fields into real columns.
# NOTE(review): withColumn replaces any existing `phone_no` column, so rows
# whose _rescued_data has no phone_no end up with null there -- confirm this
# is acceptable if the table already has that column.
df_extracted = (
    df_parsed
    .withColumn("phone_no", col("rescued_parsed.phone_no"))
    .drop("rescued_parsed")
)

# Initialize the directory as a Delta table if it is not already
spark.sql("CREATE TABLE IF NOT EXISTS updated_rescued_data USING DELTA LOCATION 'dbfs:/mnt/autofolder/output_data/updated_rescued_data'")

# Step 5: Write the backfilled rows to the separate `updated_rescued_data`
# location (the source table at output_data/ is left untouched), letting
# Delta merge in the new column.
(
    df_extracted.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save("dbfs:/mnt/autofolder/output_data/updated_rescued_data")
)


## Incremental Backfill of Rescued Data

In [0]:
from pyspark.sql.functions import col, from_json, lit
from pyspark.sql.types import StructType, StringType

# Backfill `city` and `country` from the `_rescued_data` JSON for the rows
# that have rescued data, then rewrite the Delta table in place.

# Read existing Delta table data
df_existing = spark.read.format("delta").load("dbfs:/mnt/autofolder/output_data/")

# Filter rows where _rescued_data is NOT null (only these need backfill)
df_to_fix = df_existing.filter(col("_rescued_data").isNotNull())

# Define schema for rescued JSON fields
rescued_schema = (
    StructType()
    .add("city", StringType())
    .add("country", StringType())
)

# Parse the JSON in _rescued_data column
df_parsed = df_to_fix.withColumn(
    "rescued_parsed",
    from_json(col("_rescued_data"), rescued_schema),
)

# Extract fields as proper columns
df_extracted = (
    df_parsed
    .withColumn("city", col("rescued_parsed.city"))
    .withColumn("country", col("rescued_parsed.country"))
    .drop("rescued_parsed")
)

# Get rows where _rescued_data is null to keep them unchanged
df_unchanged = df_existing.filter(col("_rescued_data").isNull())

# Union fixed rows with unchanged rows. When the table does not yet have
# `city` / `country` columns, df_extracted has them and df_unchanged does
# not; unionByName raises on such a mismatch unless allowMissingColumns=True,
# which fills the missing columns with nulls.
df_final = df_extracted.unionByName(df_unchanged, allowMissingColumns=True)

# Overwrite the whole Delta table with the combined rows (fixed + unchanged),
# merging the new columns into the table schema.
(
    df_final.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save("dbfs:/mnt/autofolder/output_data/")
)


## HANDLING SCHEMA EVOLUTION DIRECTLY DURING INCREMENTAL LOAD

In [0]:
# Auto Loader reader that evolves its schema by adding new columns. All
# reader options are supplied in a single options() call.
df = (
    spark.readStream.format("cloudFiles")
    .options(
        **{
            "cloudFiles.format": "csv",
            "cloudFiles.inferColumnTypes": "true",
            # must stay consistent
            "cloudFiles.schemaLocation": "dbfs:/mnt/autofolder/data/schema",
            # enables new column addition
            "cloudFiles.schemaEvolutionMode": "addNewColumns",
            # capture anything unmatched
            "cloudFiles.rescuedDataColumn": "_rescued_data",
            "header": "true",
        }
    )
    .load("dbfs:/mnt/autofolder/data/")
)




In [0]:
# Preview the streaming DataFrame `df` in the notebook output.
display(df)


In [0]:

# Append the stream `df` to the Delta table at output_data/.
# mergeSchema is set so the Delta sink accepts columns that the reader adds
# under schemaEvolutionMode=addNewColumns; without it, an append carrying
# columns that are not yet in the table schema is rejected.
(
    df.writeStream.format("delta")
    .option("checkpointLocation", "dbfs:/mnt/autofolder/output_data/_check_point")
    .option("mergeSchema", "true")
    .outputMode("append")
    .start("dbfs:/mnt/autofolder/output_data/")
)

In [0]:
# Batch-read the Delta table at output_data/ and show its current contents.
display(spark.read.format("delta").load("dbfs:/mnt/autofolder/output_data/"))
