In [0]:
%sql
CREATE CATALOG IF NOT EXISTS retail_db;
CREATE SCHEMA IF NOT EXISTS retail_db.bronze;
CREATE SCHEMA IF NOT EXISTS retail_db.silver;
CREATE SCHEMA IF NOT EXISTS retail_db.gold;

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS retail_db.bronze.raw_sales_files;

In [0]:
from datetime import date
from pyspark.sql import Row

batch_1 = [
    Row(201,'ORD-2001','India','Laptop',1,72000,date(2026,1,1)),
    Row(202,'ORD-2002','USA','Phone',2,42000,date(2026,1,1)),
    Row(203,'ORD-2003','India','Tablet',1,30000,date(2026,1,1))
]

df_batch1 = spark.createDataFrame(batch_1,
 ['customer_id','order_id','country','product','quantity','amount','ingest_date'])

df_batch1.write.mode('overwrite').option('header','true').csv(
 '/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_01')

In [0]:
%sql
CREATE TABLE IF NOT EXISTS retail_db.bronze.sales_bronze (
 customer_id INT,
 order_id STRING,
 country STRING,
 product STRING,
 quantity INT,
 amount INT,
 ingest_date DATE
) USING DELTA;

In [0]:
spark.read.option('header','true').csv(
 '/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_01'
).write.mode('append').saveAsTable(
 'retail_db.bronze.sales_bronze')

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5712747043100164>, line 3[0m
[1;32m      1[0m spark[38;5;241m.[39mread[38;5;241m.[39moption([38;5;124m'[39m[38;5;124mheader[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mtrue[39m[38;5;124m'[39m)[38;5;241m.[39mcsv(
[1;32m      2[0m  [38;5;124m'[39m[38;5;124m/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_01[39m[38;5;124m'[39m
[0;32m----> 3[0m )[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m'[39m[38;5;124mappend[39m[38;5;124m'[39m)[38;5;241m.[39msaveAsTable(
[1;32m      4[0m  [38;5;124m'[39m[38;5;124mretail_db.bronze.sales_bronze[39m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:737[0m, in [0;36mDataFrameWriter.saveAsTable[0;34m(self, name, format, mode, partitionBy, **o

In [0]:
spark.read.option('header','true').csv(
 '/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_01'
).write.mode('append').saveAsTable(
 'retail_db.bronze.sales_bronze')

In [0]:
from pyspark.sql.types import *

sales_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("order_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("ingest_date", DateType(), True)
])


In [0]:
df_bronze_1 = (
    spark.read
         .schema(sales_schema)
         .option("header", "true")
         .csv("/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_01")
)

In [0]:
df_bronze_1.show(2)

+-----------+--------+-------+-------+--------+------+-----------+
|customer_id|order_id|country|product|quantity|amount|ingest_date|
+-----------+--------+-------+-------+--------+------+-----------+
|        201|ORD-2001|  India| Laptop|       1| 72000| 2026-01-01|
|        203|ORD-2003|  India| Tablet|       1| 30000| 2026-01-01|
+-----------+--------+-------+-------+--------+------+-----------+
only showing top 2 rows


In [0]:
df_bronze_1.write \
    .mode("append") \
    .saveAsTable("retail_db.bronze.sales_bronze")

In [0]:
%sql
select * from retail_db.bronze.sales_bronze

customer_id,order_id,country,product,quantity,amount,ingest_date
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01


In [0]:
df_bronze_1.write \
    .mode("append") \
    .saveAsTable("retail_db.bronze.sales_bronze")


In [0]:
%sql
select * from retail_db.bronze.sales_bronze;

customer_id,order_id,country,product,quantity,amount,ingest_date
202,ORD-2002,USA,Phone,2,45000,2026-01-02
204,ORD-2004,UK,Laptop,1,70000,2026-01-02
204,ORD-2004,UK,Laptop,1,70000,2026-01-02
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01
201,ORD-2001,India,Laptop,1,72000,2026-01-01


### Delta can allow schema evolution:
.option("mergeSchema", "true")
CSV has no strong typing
You want strict contracts at ingestion
So explicit schema > mergeSchema here.

In [0]:
batch_2 = [
    Row(202,'ORD-2002','USA','Phone',2,45000,date(2026,1,2)),
    Row(204,'ORD-2004','UK','Laptop',1,70000,date(2026,1,2)),
    Row(204,'ORD-2004','UK','Laptop',1,70000,date(2026,1,2))
]

df_batch2 = spark.createDataFrame(batch_2,
 ['customer_id','order_id','country','product','quantity','amount','ingest_date'])

df_batch2.write.mode('overwrite').option('header','true').csv(
 '/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_02')

In [0]:
df_bronze_2 = (
    spark.read
         .schema(sales_schema)
         .option("header", "true")
         .csv("/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_02")
)

In [0]:
df_bronze_2.write \
    .mode("append") \
    .saveAsTable("retail_db.bronze.sales_bronze")

In [0]:
%sql
select * from retail_db.bronze.sales_bronze

customer_id,order_id,country,product,quantity,amount,ingest_date
202,ORD-2002,USA,Phone,2,45000,2026-01-02
204,ORD-2004,UK,Laptop,1,70000,2026-01-02
204,ORD-2004,UK,Laptop,1,70000,2026-01-02
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01
201,ORD-2001,India,Laptop,1,72000,2026-01-01


In [0]:
from pyspark.sql import Row
from datetime import date

bad_batch = [
    Row("ABC", "ORD-2005", "India", "Laptop", 1, "NOT_A_NUMBER", date(2026,1,3))
]

df_bad = spark.createDataFrame(
    bad_batch,
    ["customer_id","order_id","country","product","quantity","amount","ingest_date"]
)

df_bad.write.mode("overwrite").option("header","true").csv(
    "/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_03_bad"
)

In [0]:
from pyspark.sql.types import *

sales_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("order_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("ingest_date", DateType(), True)
])

spark.read \
    .schema(sales_schema) \
    .option("header","true") \
    .csv("/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_03_bad") \
    .write.mode("append") \
    .saveAsTable("retail_db.bronze.sales_bronze")


In [0]:
%sql
select * from retail_db.bronze.sales_bronze

customer_id,order_id,country,product,quantity,amount,ingest_date
202.0,ORD-2002,USA,Phone,2,45000.0,2026-01-02
204.0,ORD-2004,UK,Laptop,1,70000.0,2026-01-02
204.0,ORD-2004,UK,Laptop,1,70000.0,2026-01-02
201.0,ORD-2001,India,Laptop,1,72000.0,2026-01-01
202.0,ORD-2002,USA,Phone,2,42000.0,2026-01-01
203.0,ORD-2003,India,Tablet,1,30000.0,2026-01-01
201.0,ORD-2001,India,Laptop,1,72000.0,2026-01-01
202.0,ORD-2002,USA,Phone,2,42000.0,2026-01-01
203.0,ORD-2003,India,Tablet,1,30000.0,2026-01-01
201.0,ORD-2001,India,Laptop,1,72000.0,2026-01-01


In [0]:
spark.read \
  .schema(sales_schema) \
  .option("header", "true") \
  .option("mode", "FAILFAST") \
  .csv("/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_03_bad") \
    .write.mode("append") \
    .saveAsTable("retail_db.bronze.sales_bronze")


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-5712747043100182>, line 7[0m
[1;32m      1[0m spark[38;5;241m.[39mread \
[1;32m      2[0m   [38;5;241m.[39mschema(sales_schema) \
[1;32m      3[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m) \
[1;32m      4[0m   [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmode[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m) \
[1;32m      5[0m   [38;5;241m.[39mcsv([38;5;124m"[39m[38;5;124m/Volumes/retail_db/bronze/raw_sales_files/batch_2026_01_03_bad[39m[38;5;124m"[39m) \
[1;32m      6[0m     [38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m) \
[0;32m----> 7[0m     [38;5;241m.[39msaveAsTable([38;

In [0]:
%sql
ALTER TABLE retail_db.bronze.sales_bronze
ADD CONSTRAINT valid_customer CHECK (customer_id IS NOT NULL);

ALTER TABLE retail_db.bronze.sales_bronze
ADD CONSTRAINT valid_amount CHECK (amount IS NOT NULL);


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5712747043100181>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mALTER TABLE retail_db.bronze.sales_bronze[39m[38;5;130;01m\n[39;00m[38;5;124mADD CONSTRAINT valid_customer CHECK (customer_id IS NOT NULL);[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mALTER TABLE retail_db.bronze.sales_bronze[39m[38;5;130;01m\n[39;00m[38;5;124mADD CONSTRAINT valid_amount CHECK (amount IS NOT NULL);[39m[38;5;130;01m\n[39;00m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, cell)[0m
[1;32m   2539

In [0]:
input_path = "/Volumes/retail_db/bronze/raw_sales_files/*"

df_raw = (
    spark.read
         .schema(sales_schema)
         .option("header", "true")
         .csv(input_path)
)

In [0]:
from pyspark.sql.functions import col

df_good = df_raw.filter(
    col("customer_id").isNotNull() &
    col("amount").isNotNull() &
    (col("quantity") > 0)
)

df_bad = df_raw.filter(
    col("customer_id").isNull() |
    col("amount").isNull() |
    (col("quantity") <= 0)
)

In [0]:
df_good.write.mode("append").saveAsTable(
    "retail_db.bronze.sales_bronze"
)

df_bad.write.mode("append").saveAsTable(
    "retail_db.bronze.sales_quarantine"
)


In [0]:
%sql
select * from retail_db.bronze.sales_quarantine;

customer_id,order_id,country,product,quantity,amount,ingest_date
,ORD-2005,India,Laptop,1,,2026-01-03


In [0]:
%sql
CREATE TABLE IF NOT EXISTS retail_db.silver.sales_silver (
 customer_id INT,
 order_id STRING,
 country STRING,
 product STRING,
 quantity INT,
 amount INT
) USING DELTA;

In [0]:
%sql
MERGE INTO retail_db.silver.sales_silver t
USING (
 SELECT customer_id,order_id,country,product,quantity,amount
 FROM (
   SELECT *, ROW_NUMBER() OVER (
     PARTITION BY order_id ORDER BY ingest_date DESC
   ) rn
   FROM retail_db.bronze.sales_bronze
 ) WHERE rn = 1
) s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

In [0]:
%sql
select * from retail_db.silver.sales_silver

In [0]:
%sql
UPDATE retail_db.silver.sales_silver
SET amount = 1
WHERE country = 'India';

In [0]:
%sql
DESCRIBE HISTORY retail_db.silver.sales_silver;

In [0]:
%sql
DESCRIBE HISTORY retail_db.bronze.sales_bronze;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2026-01-20T07:20:25.000Z,78469401222449,pavan@learnlytica.com,RESTORE,"Map(version -> 1, timestamp -> null)",,List(2012657292728716),0120-060315-iw5fag29-v2n,6.0,Serializable,False,"Map(numRestoredFiles -> 0, removedFilesSize -> 10229, numRemovedFiles -> 5, restoredFilesSize -> 0, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numOfFilesAfterRestore -> 1, tableSizeAfterRestore -> 2044)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
6,2026-01-20T07:06:35.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-060315-iw5fag29-v2n,5.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 6, numOutputBytes -> 2163)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
5,2026-01-20T06:15:00.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-060315-iw5fag29-v2n,4.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1821)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
4,2026-01-20T05:28:45.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-042231-t2gsphmu-v2n,3.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 2157)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
3,2026-01-20T05:26:56.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-042231-t2gsphmu-v2n,2.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 2044)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
2,2026-01-20T05:23:33.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-042231-t2gsphmu-v2n,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 2044)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
1,2026-01-20T05:05:21.000Z,78469401222449,pavan@learnlytica.com,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2012657292728716),0120-042231-t2gsphmu-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 2044)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-20T04:48:56.000Z,78469401222449,pavan@learnlytica.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true"",""delta.writePartitionColumnsToParquet"":""true"",""delta.enableRowTracking"":""true"",""delta.rowTracking.materializedRowCommitVersionColumnName"":""_row-commit-version-col-ec43441f-9795-48cb-ad98-c22af0051a03"",""delta.rowTracking.materializedRowIdColumnName"":""_row-id-col-f64a34b4-7570-43ad-884d-ceeaae35dfc3""}, statsOnLoad -> false)",,List(2012657292728716),0120-042231-t2gsphmu-v2n,,WriteSerializable,True,Map(),,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13


In [0]:
%sql
SELECT *
FROM retail_db.bronze.sales_bronze
VERSION AS OF 1;

customer_id,order_id,country,product,quantity,amount,ingest_date
201,ORD-2001,India,Laptop,1,72000,2026-01-01
202,ORD-2002,USA,Phone,2,42000,2026-01-01
203,ORD-2003,India,Tablet,1,30000,2026-01-01


In [0]:
%sql
RESTORE TABLE retail_db.bronze.sales_bronze TO VERSION AS OF 1;

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
2044,1,5,0,10229,0


In [0]:
%sql
SELECT *
FROM retail_db.silver.sales_silver
VERSION AS OF 5;


In [0]:
%sql
SELECT *
FROM retail_db.silver.sales_silver
TIMESTAMP AS OF '2026-01-19T19:10:40.000+00:00';


In [0]:
%sql
-- Before bad update
SELECT * FROM retail_db.silver.sales_silver VERSION AS OF 5;

-- After bad update
SELECT * FROM retail_db.silver.sales_silver VERSION AS OF 6;


In [0]:
%sql
RESTORE TABLE retail_db.silver.sales_silver TO VERSION AS OF 5;

In [0]:
%sql
CREATE OR REPLACE TABLE retail_db.gold.country_sales_summary
USING DELTA AS
SELECT country,
 COUNT(DISTINCT order_id) total_orders,
 SUM(amount) total_revenue
FROM retail_db.silver.sales_silver
GROUP BY country;

In [0]:
%sql
select * from retail_db.gold.country_sales_summary

In [0]:
%sql
OPTIMIZE retail_db.silver.sales_silver ZORDER BY (country);

In [0]:
%sql
VACUUM retail_db.silver.sales_silver RETAIN 168 HOURS;