### Schema Evolution

##### Schema evolution is the ability of a data system to adapt to changes in the structure of its data over time without breaking existing pipelines.

##### This notebook uses a scenario in which data is loaded from an existing table into a historical table on a monthly basis.

The existing table changes over time: sometimes it gains columns, sometimes data types change, and sometimes the order of the columns changes. In other words, the schema keeps evolving.

Let's see how schema evolution is handled in Databricks across several scenarios.
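
The kinds of change listed above (added columns, changed types, changed order) can be told apart by comparing two schemas. The following is a minimal plain-Python sketch, not part of the notebook's pipeline; the helper name `classify_schema_changes` and the representation of a schema as a list of `(name, type)` pairs are assumptions made for illustration.

```python
def classify_schema_changes(old, new):
    """Compare two schemas given as ordered lists of (column_name, type_name) pairs."""
    old_types = dict(old)
    new_types = dict(new)
    added = [name for name, _ in new if name not in old_types]
    removed = [name for name, _ in old if name not in new_types]
    retyped = [name for name, t in new if name in old_types and old_types[name] != t]
    # Order is compared only across the columns both schemas share.
    common_old = [name for name, _ in old if name in new_types]
    common_new = [name for name, _ in new if name in old_types]
    return {
        "added": added,
        "removed": removed,
        "retyped": retyped,
        "reordered": common_old != common_new,
    }


# Hypothetical before/after schemas modelled on the daily table in this notebook.
old = [("Region", "STRING"), ("Country", "STRING"), ("State", "STRING"),
       ("Sales", "DOUBLE"), ("Quantity", "INT"), ("Profit", "DOUBLE")]
new = [("Region", "STRING"), ("Country", "STRING"), ("State", "STRING"),
       ("Sales", "DOUBLE"), ("Profit", "DOUBLE"), ("Quantity", "DOUBLE")]

changes = classify_schema_changes(old, new)
print(changes)
```

Running such a check before each load is one possible way to decide which of the scenarios below applies.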



- Step 1: Create the existing (daily) table, which is the source for the monthly snapshot and defines its initial schema

In [0]:
%sql
CREATE OR REPLACE TABLE default.daily_agg_orders 
(
  Region STRING,
  Country STRING,
  State STRING,
  Sales DOUBLE,
  Quantity INT
)

In [0]:
%sql
INSERT INTO default.daily_agg_orders
SELECT Region,
Country,
State,
SUM(sales) as Sales,
SUM(Quantity) as Quantity
from default.schema_evolution --this is just a simple superstore dataset
group by 1,2,3

num_affected_rows,num_inserted_rows
49,49


Step 2: From the daily table, create a monthly snapshot on the first of every month (random dates are used here for demonstration purposes)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date
from datetime import datetime

spark = SparkSession.builder.appName("SchemaEvolution").getOrCreate()

# Get daily data
current_orders = spark.table("default.daily_agg_orders")

# Add date information.
# Snapshot_date is today's date and is used only in the log message below;
# the column written to the table is current_date()-1 for demo purposes.
Snapshot_date = datetime.now().strftime("%Y-%m-%d")
orders_with_snapshot = current_orders.withColumn("Snapshot_date", current_date() - 1)

# Append to the monthly table with schema evolution enabled.
# If the table does not exist yet, it is created automatically.
orders_with_snapshot.write\
    .format("delta")\
    .option("mergeSchema","true")\
    .mode("append")\
    .saveAsTable("default.monthly_agg_orders")

print(f"Historical data updated successfully for {Snapshot_date}")

Historical data updated successfully for 2025-05-31


In [0]:
%sql
select * from default.monthly_agg_orders

Region,Country,State,Sales,Quantity,Snapshot_date
South,United States,Arkansas,11678.129999999996,240,2025-05-30
Central,United States,Michigan,76269.61400000002,946,2025-05-30
Central,United States,Kansas,2914.31,74,2025-05-30
South,United States,Virginia,70636.71999999999,893,2025-05-30
East,United States,Delaware,27451.068999999992,367,2025-05-30
East,United States,West Virginia,1209.824,18,2025-05-30
Central,United States,Iowa,4579.759999999999,112,2025-05-30
East,United States,Pennsylvania,116511.91400000003,2153,2025-05-30
South,United States,Louisiana,9217.03,156,2025-05-30
Central,United States,Texas,170188.04580000002,3724,2025-05-30
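
The notebook uses offsets from the current date for demonstration. In a scheduled job, the append could instead be gated on the calendar. The following is a minimal sketch of that check; the helper names `is_snapshot_day` and `snapshot_label` are hypothetical and not part of the notebook.

```python
from datetime import date


def is_snapshot_day(today):
    """Return True when a monthly snapshot is due, i.e. on the first day of a month."""
    return today.day == 1


def snapshot_label(today):
    """Return the ISO date string that would be stored in the Snapshot_date column."""
    return today.isoformat()


run_date = date(2025, 6, 1)  # example run date, chosen for illustration
if is_snapshot_day(run_date):
    print(f"Snapshot due for {snapshot_label(run_date)}")
else:
    print("No snapshot today")
```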


Scenario 1: Now let's say a new column, Profit, is added to the daily table

In [0]:
%sql
CREATE OR REPLACE TABLE default.daily_agg_orders 
(
  Region STRING,
  Country STRING,
  State STRING,
  Sales DOUBLE,
  Quantity INT,
  Profit DOUBLE
)

In [0]:
%sql
INSERT INTO default.daily_agg_orders
SELECT Region,
Country,
State,
SUM(sales) as Sales,
SUM(Quantity) as Quantity,
AVG(Profit) as avg_Profit
from default.schema_evolution --this is just a simple superstore dataset
group by 1,2,3

num_affected_rows,num_inserted_rows
49,49


Let's see how schema evolution handles this in the monthly snapshot, using current_date() as the snapshot date

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date
from datetime import datetime

spark = SparkSession.builder.appName("SchemaEvolution").getOrCreate()

# Get daily data
current_orders = spark.table("default.daily_agg_orders")

# Add date information; current_date() is used here for demo purposes
Snapshot_date = datetime.now().strftime("%Y-%m-%d")
orders_with_snapshot = current_orders.withColumn("Snapshot_date", current_date())

# Append to the monthly table with schema evolution enabled.
# If the table does not exist yet, it is created automatically.
orders_with_snapshot.write\
    .format("delta")\
    .option("mergeSchema","true")\
    .mode("append")\
    .saveAsTable("default.monthly_agg_orders")

print(f"Historical data updated successfully for {Snapshot_date}")

Historical data updated successfully for 2025-05-31


In [0]:
%sql
select * from default.monthly_agg_orders where Snapshot_date='2025-05-30' -- note that the Profit column now exists but is null for the earlier snapshot date

Region,Country,State,Sales,Quantity,Snapshot_date,Profit
South,United States,Arkansas,11678.129999999996,240,2025-05-30,
Central,United States,Michigan,76269.61400000002,946,2025-05-30,
Central,United States,Kansas,2914.31,74,2025-05-30,
South,United States,Virginia,70636.71999999999,893,2025-05-30,
East,United States,Delaware,27451.068999999992,367,2025-05-30,
East,United States,West Virginia,1209.824,18,2025-05-30,
Central,United States,Iowa,4579.759999999999,112,2025-05-30,
East,United States,Pennsylvania,116511.91400000003,2153,2025-05-30,
South,United States,Louisiana,9217.03,156,2025-05-30,
Central,United States,Texas,170188.04580000002,3724,2025-05-30,
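
The output above is consistent with a simple rule: columns that are new in the incoming data are appended to the end of the table's schema, and rows written earlier read as null for them. The sketch below models that rule in plain Python. It is an illustration of the observed behaviour, not Delta Lake's implementation, and the helper names `merge_schema` and `read_with_schema` are hypothetical.

```python
def merge_schema(target_cols, incoming_cols):
    """Return the target column list with unseen incoming columns appended at the end."""
    merged = list(target_cols)
    for name in incoming_cols:
        if name not in merged:
            merged.append(name)
    return merged


def read_with_schema(rows, schema):
    """Project stored rows (dicts) onto a schema, filling absent columns with None."""
    return [{name: row.get(name) for name in schema} for row in rows]


target_schema = ["Region", "Country", "State", "Sales", "Quantity", "Snapshot_date"]
incoming_schema = ["Region", "Country", "State", "Sales", "Quantity", "Profit", "Snapshot_date"]

merged = merge_schema(target_schema, incoming_schema)

# One row written before Profit existed (values abbreviated for illustration).
old_rows = [{"Region": "South", "Country": "United States", "State": "Arkansas",
             "Sales": 11678.13, "Quantity": 240, "Snapshot_date": "2025-05-30"}]

print(merged)
print(read_with_schema(old_rows, merged)[0]["Profit"])
```

In this model `Profit` lands after `Snapshot_date`, which matches the column order shown in the query result.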


Scenario 2: Let's say a column has been removed.

In [0]:
%sql
ALTER TABLE default.daily_agg_orders SET TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5'
)

In [0]:
%sql
ALTER TABLE default.daily_agg_orders DROP COLUMN Quantity

Now let's see how schema evolution handles this scenario. Note that the snapshot date here is current_date()+1.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date
from datetime import datetime

spark = SparkSession.builder.appName("SchemaEvolution").getOrCreate()

# Get daily data
current_orders = spark.table("default.daily_agg_orders")

# Add date information; current_date()+1 is used here for demo purposes
Snapshot_date = datetime.now().strftime("%Y-%m-%d")
orders_with_snapshot = current_orders.withColumn("Snapshot_date", current_date() + 1)

# Append to the monthly table with schema evolution enabled.
# If the table does not exist yet, it is created automatically.
orders_with_snapshot.write\
    .format("delta")\
    .option("mergeSchema","true")\
    .mode("append")\
    .saveAsTable("default.monthly_agg_orders")

print(f"Historical data updated successfully for {Snapshot_date}")

Historical data updated successfully for 2025-05-31


In [0]:
%sql
select * from monthly_agg_orders where Snapshot_date='2025-06-01'

Region,Country,State,Sales,Quantity,Snapshot_date,Profit
South,United States,Arkansas,11678.129999999996,,2025-06-01,66.81145166666667
Central,United States,Michigan,76269.61400000002,,2025-06-01,95.93406901960782
Central,United States,Kansas,2914.31,,2025-06-01,34.8518125
South,United States,Virginia,70636.71999999999,,2025-06-01,83.02656428571429
East,United States,Delaware,27451.068999999992,,2025-06-01,103.9309875
East,United States,West Virginia,1209.824,,2025-06-01,46.4804
Central,United States,Iowa,4579.759999999999,,2025-06-01,39.46039666666668
East,United States,Pennsylvania,116511.91400000003,,2025-06-01,-26.507598466780266
South,United States,Louisiana,9217.03,,2025-06-01,52.28815000000002
Central,United States,Texas,170188.04580000002,,2025-06-01,-26.12117390862944


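Scenario 3: Now let's say the Quantity column in the source has been changed from INT to DOUBLE (not a realistic change, but useful for demo purposes)

Because mergeSchema on its own generally does not convert an existing INT column to DOUBLE, the write cell in this scenario casts each source column to the type it already has in the target table before appending.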

In [0]:
%sql
CREATE OR REPLACE TABLE default.daily_agg_orders 
(
  Region STRING,
  Country STRING,
  State STRING,
  Sales DOUBLE,
  Quantity DOUBLE,-- here 
  Profit DOUBLE
)

In [0]:
%sql
INSERT INTO default.daily_agg_orders
SELECT Region,
Country,
State,
SUM(sales) as Sales,
SUM(Quantity) as Quantity,
AVG(Profit) as avg_Profit
from default.schema_evolution --this is just a simple superstore dataset
group by 1,2,3

num_affected_rows,num_inserted_rows
49,49


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, col
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("SchemaAlignmentWrite").getOrCreate()

# Define table names
source_table = "default.daily_agg_orders"
target_table = "default.monthly_agg_orders"

# Read source table
source_df = spark.table(source_table)

# Add snapshot date (demo: current_date + 2)
snapshot_date = datetime.now().strftime("%Y-%m-%d")
source_df = source_df.withColumn("Snapshot_date", current_date() + 2)

# Try reading the target table and aligning schemas
try:
    target_df = spark.table(target_table)
    target_schema = {field.name: field.dataType for field in target_df.schema}

    # Align source columns to match target schema types
    aligned_columns = []
    for field in source_df.schema:
        column_name = field.name
        if column_name in target_schema:
            target_type = target_schema[column_name]
            aligned_columns.append(col(column_name).cast(target_type).alias(column_name))
        else:
            aligned_columns.append(col(column_name))
    
    aligned_df = source_df.select(*aligned_columns)

except Exception as e:
    # If target table doesn't exist yet, proceed with source as-is
    aligned_df = source_df
    print(f"Target table not found or not readable: {e}")

# Write using schema evolution (append mode)
aligned_df.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .saveAsTable(target_table)

print(f"Historical data updated successfully for snapshot: {snapshot_date}")


Historical data updated successfully for snapshot: 2025-05-31


In [0]:
%sql
select * from daily_agg_orders -- you can see that the Quantity column is of type double

Region,Country,State,Sales,Quantity,Profit
South,United States,Arkansas,11678.129999999996,240.0,66.81145166666667
Central,United States,Michigan,76269.61400000002,946.0,95.93406901960782
Central,United States,Kansas,2914.31,74.0,34.8518125
South,United States,Virginia,70636.71999999999,893.0,83.02656428571429
East,United States,Delaware,27451.068999999992,367.0,103.9309875
East,United States,West Virginia,1209.824,18.0,46.4804
Central,United States,Iowa,4579.759999999999,112.0,39.46039666666668
East,United States,Pennsylvania,116511.91400000003,2153.0,-26.507598466780266
South,United States,Louisiana,9217.03,156.0,52.28815000000002
Central,United States,Texas,170188.04580000002,3724.0,-26.12117390862944


In [0]:
%sql
select * from monthly_agg_orders where Snapshot_date='2025-06-02' -- you can see that the Quantity column is still an integer here

Region,Country,State,Sales,Quantity,Snapshot_date,Profit
South,United States,Arkansas,11678.129999999996,240,2025-06-02,66.81145166666667
Central,United States,Michigan,76269.61400000002,946,2025-06-02,95.93406901960782
Central,United States,Kansas,2914.31,74,2025-06-02,34.8518125
South,United States,Virginia,70636.71999999999,893,2025-06-02,83.02656428571429
East,United States,Delaware,27451.068999999992,367,2025-06-02,103.9309875
East,United States,West Virginia,1209.824,18,2025-06-02,46.4804
Central,United States,Iowa,4579.759999999999,112,2025-06-02,39.46039666666668
East,United States,Pennsylvania,116511.91400000003,2153,2025-06-02,-26.507598466780266
South,United States,Louisiana,9217.03,156,2025-06-02,52.28815000000002
Central,United States,Texas,170188.04580000002,3724,2025-06-02,-26.12117390862944
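
Casting the source back to the target's types keeps the append working, but a cast from DOUBLE to INT discards any fractional part. The plain-Python sketch below models the alignment step and adds a check for values that would lose information. It is an illustration only: the helper names `cast_to_target` and `lossy_int_columns` are hypothetical, and Python's `int` stands in for the Spark cast.

```python
def cast_to_target(row, target_types):
    """Cast each value to the target column's type when the column already exists there."""
    out = {}
    for name, value in row.items():
        caster = target_types.get(name)
        if caster is not None and value is not None:
            out[name] = caster(value)
        else:
            out[name] = value  # new column or null: leave untouched
    return out


def lossy_int_columns(rows, int_columns):
    """Return int-typed target columns for which some incoming value has a fractional part."""
    return sorted({
        name
        for row in rows
        for name in int_columns
        if row.get(name) is not None and float(row[name]) != int(row[name])
    })


target_types = {"Sales": float, "Quantity": int}  # assumed target column types
incoming = {"Sales": 11678.13, "Quantity": 240.0, "Profit": 66.81}

aligned = cast_to_target(incoming, target_types)
print(aligned)
print(lossy_int_columns([{"Quantity": 240.0}, {"Quantity": 74.5}], ["Quantity"]))
```

If the lossy check reports a column, changing the target column's type may be a better choice than casting the source down.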


Scenario 4: What if the order of the source columns has changed?

In [0]:
%sql
CREATE OR REPLACE TABLE default.daily_agg_orders 
(
  Region STRING,
  Country STRING,
  State STRING,
  Sales DOUBLE,
  Profit DOUBLE,
  Quantity DOUBLE -- previously, it was before Profit here 
)

In [0]:
%sql
INSERT INTO default.daily_agg_orders
SELECT Region,
Country,
State,
SUM(sales) as Sales,
AVG(Profit) as avg_Profit,
SUM(Quantity) as Quantity
from default.schema_evolution --this is just a simple superstore dataset
group by 1,2,3

num_affected_rows,num_inserted_rows
49,49


Let's see how schema evolution handles this, using current_date()+3 as the snapshot date

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date  # col is needed for the select below
from datetime import datetime

spark = SparkSession.builder.appName("SchemaEvolution").getOrCreate()

# Get daily data
current_orders = spark.table("default.daily_agg_orders")

# Target schema
target_table = "default.monthly_agg_orders"
target_schema = spark.table(target_table).schema

# Add date information; current_date()+3 is used here for demo purposes
Snapshot_date = datetime.now().strftime("%Y-%m-%d")
orders_with_snapshot = current_orders.withColumn("Snapshot_date", current_date() + 3)

# Select the columns in the target's order and cast them to the target's types
orders_with_snapshot = orders_with_snapshot.select(
    *[col(field.name).cast(field.dataType) for field in target_schema]
)

# Append to the monthly table with schema evolution enabled
orders_with_snapshot.write\
    .format("delta")\
    .option("mergeSchema","true")\
    .mode("append")\
    .saveAsTable(target_table)
print(f"Historical data updated successfully for {Snapshot_date}")

Historical data updated successfully for 2025-05-31


In [0]:
%sql
select * from monthly_agg_orders where Snapshot_date='2025-06-03'

Region,Country,State,Sales,Quantity,Snapshot_date,Profit
South,United States,Arkansas,11678.129999999996,240,2025-06-03,66.81145166666667
Central,United States,Michigan,76269.61400000002,946,2025-06-03,95.93406901960782
Central,United States,Kansas,2914.31,74,2025-06-03,34.8518125
South,United States,Virginia,70636.71999999999,893,2025-06-03,83.02656428571429
East,United States,Delaware,27451.068999999992,367,2025-06-03,103.9309875
East,United States,West Virginia,1209.824,18,2025-06-03,46.4804
Central,United States,Iowa,4579.759999999999,112,2025-06-03,39.46039666666668
East,United States,Pennsylvania,116511.91400000003,2153,2025-06-03,-26.507598466780266
South,United States,Louisiana,9217.03,156,2025-06-03,52.28815000000002
Central,United States,Texas,170188.04580000002,3724,2025-06-03,-26.12117390862944
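
The reason column order matters is easiest to see by comparing positional mapping with mapping by name. According to the Spark documentation, `saveAsTable` in append mode resolves columns by name (unlike `insertInto`, which is positional), so the explicit select in this scenario mainly makes the intended order and types explicit. The plain-Python sketch below contrasts the two mappings; the helper names are hypothetical and the values are abbreviated.

```python
def map_by_position(values, target_columns):
    """Assign values to target columns purely by position."""
    return dict(zip(target_columns, values))


def map_by_name(source_columns, values, target_columns):
    """Assign values to target columns by matching column names."""
    by_name = dict(zip(source_columns, values))
    return {name: by_name.get(name) for name in target_columns}


source_columns = ["Region", "Country", "State", "Sales", "Profit", "Quantity", "Snapshot_date"]
target_columns = ["Region", "Country", "State", "Sales", "Quantity", "Snapshot_date", "Profit"]
values = ("South", "United States", "Arkansas", 11678.13, 66.81, 240, "2025-06-03")

positional = map_by_position(values, target_columns)
named = map_by_name(source_columns, values, target_columns)

print(positional["Quantity"], positional["Profit"])
print(named["Quantity"], named["Profit"])
```

With positional mapping the Profit value ends up in Quantity, while mapping by name keeps each value in its own column.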
