# Delta Lake Operations

## The Story: From "Data Swamp" to Reliable Data

Your e-commerce company's previous data platform was a **Data Lake disaster**:

- Marketing updated customer segments, breaking downstream reports
- A failed job left half-written files - data corrupted for 2 days
- Finance asked for "data from last month" - no way to get it
- Data Scientists couldn't trust the data - "is this the latest version?"

**Delta Lake solves these problems.** It is the storage foundation of the modern Lakehouse architecture.

## Theoretical Introduction

**Notebook Objective:** Comprehensive introduction to Delta Lake as a transactional storage layer over Data Lake

**Key Concepts:**
- **Delta Lake**: Open-source storage layer providing ACID transactions for Apache Spark
- **Delta Log**: Transactional log storing metadata about all table changes
- **Schema Enforcement & Evolution**: Automatic schema validation and controlled evolution
- **Time Travel**: Ability to access previous versions of data
- **Optimization**: Techniques to improve query performance (OPTIMIZE, Z-ORDER, Liquid Clustering)

**Why is this important?**
Delta Lake solves fundamental Data Lake problems: lack of transactions, schema drift, update difficulties, and quality assurance. It provides Data Warehouse reliability with Data Lake flexibility.

**Notebook Structure:**
1. **Section 1**: Delta Lake Core Features (Creating Tables, Schema Enforcement, Schema Evolution, Constraints)
2. **Section 2**: CRUD Operations & MERGE (INSERT, UPDATE, DELETE, MERGE INTO)
3. **Section 3**: Metadata and Analytics (DESCRIBE DETAIL, DESCRIBE HISTORY, Delta Log Internals)
4. **Section 4**: Time Travel & Disaster Recovery (VERSION AS OF, RESTORE, VACUUM implications)
5. **Section 5**: Optimization (Small Files Problem, Partitioning, Z-ORDER, Liquid Clustering)
6. **Section 6**: Change Data Feed & Change Data Capture (CDF vs CDC explained)

## Per-user Isolation

Run the initialization script for per-user catalog and schema isolation:

In [None]:
%run ../00_setup

## Configuration

Import libraries and set environment variables:

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta

# Display user context
display(
    spark.createDataFrame([
        (CATALOG, BRONZE_SCHEMA, SILVER_SCHEMA, GOLD_SCHEMA)
    ], ['catalog', 'bronze_schema', 'silver_schema', 'gold_schema'])
)

# Set catalog and schema as default
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE SCHEMA {BRONZE_SCHEMA}")

---

## Section 1: Delta Lake Core Features

**Theoretical Introduction:**

Delta Lake is a transactional layer over Parquet that provides ACID properties (Atomicity, Consistency, Isolation, Durability). Every operation on a Delta table is recorded in the Delta Log - an ordered series of JSON commit files in the `_delta_log/` folder, each describing the changes made by one transaction.

**Key Concepts:**
- **ACID Transactions**: All operations are atomic and consistent
- **Delta Log**: `_delta_log/` folder with JSON files describing each transaction
- **Schema Enforcement**: Automatic schema validation - prevents bad data from entering
- **Schema Evolution**: Controlled addition of new columns without breaking existing pipelines
- **Constraints**: Data quality rules enforced at the table level

**Practical Application:**
- Transactional updates in Data Lake
- Ensuring data quality through schema validation and constraints
- Unified data access for batch and streaming workloads

### Example 1.1: Creating the First Delta Table

**Objective:** Demonstration of creating a Delta table and basic properties

**Approach:**
1. Load data from Unity Catalog Volume
2. Create a managed table in Delta format
3. Explore Delta Log and metadata

In [None]:
# Load customer data from Unity Catalog Volume
customers_df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{DATASET_BASE_PATH}/customers/customers.csv")
)

**Create managed Delta table:**

In [None]:
# Create managed Delta table
customers_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")

**Display result:**

In [None]:
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta").limit(5))

**Explanation:**

A managed Delta table was created in Unity Catalog. The Delta format automatically:
- Created `_delta_log/` folder with transaction metadata
- Registered table schema in Unity Catalog
- Stored the data as compressed Parquet files, tracked by the Delta transaction log

In [None]:
# Write customers_df as a path-based Delta table at a specified location
# (save() writes the files and the log but does not register a table in Unity Catalog)
external_path = f"{DATASET_BASE_PATH}/external/customers_delta"
customers_df.write.format("delta").mode("overwrite").save(external_path)

### Example 1.2: Schema Enforcement in Action

**Objective:** Demonstration of automatic schema validation during data insertion

Schema Enforcement is a critical feature that prevents "garbage in, garbage out" scenarios. Delta Lake compares incoming data schema with the target table schema and **rejects incompatible writes**.

In [None]:
# Check current table schema
spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta").printSchema()

In [None]:
# Attempt to insert data with an incompatible schema (a column the table does not have).
# Note: merely omitting columns would not fail; Delta fills missing columns with NULL.
invalid_data = spark.createDataFrame([
    ("CUST999999", "Test", "Customer", "invalid_email", "+48 123 456 789", 42)  # loyalty_points is not in the table schema
], ["customer_id", "first_name", "last_name", "email", "phone", "loyalty_points"])

try:
    invalid_data.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")
except Exception as e:
    display(
        spark.createDataFrame([
            ("Schema enforcement in action", str(e)[:200] + "...")
        ], ["message", "error"])
    )

**Explanation:**

Schema enforcement automatically rejected data with incompatible schema. Delta Lake compares the new data schema with the table schema and blocks incompatible insertions, ensuring consistency.
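
The compatibility check can be sketched in plain Python. This is a simplified model rather than Delta Lake's implementation: the function name `check_append_compatible` and the dict-based schema are assumptions made for illustration. It follows the documented append rules: every incoming column must exist in the table with a matching type, while table columns missing from the incoming data are filled with NULL.

```python
def check_append_compatible(table_schema: dict, incoming_schema: dict) -> list:
    """Return a list of problems; an empty list means the append is accepted.

    Simplified model: real Delta Lake also permits some safe type widening.
    """
    problems = []
    for name, dtype in incoming_schema.items():
        if name not in table_schema:
            problems.append(f"unexpected column: {name}")
        elif table_schema[name] != dtype:
            problems.append(f"type mismatch on {name}: {dtype} vs {table_schema[name]}")
    return problems


table_schema = {"customer_id": "string", "email": "string", "registration_date": "date"}

# A subset of the table's columns is accepted (the rest become NULL).
subset = check_append_compatible(table_schema, {"customer_id": "string"})
# A column unknown to the table is rejected.
extra = check_append_compatible(
    table_schema, {"customer_id": "string", "loyalty_points": "int"}
)
# A different type for an existing column is rejected.
wrong_type = check_append_compatible(
    table_schema, {"customer_id": "string", "registration_date": "string"}
)

print(subset)
print(extra)
print(wrong_type)
```

The practical consequence is that a write is blocked by columns the table does not know about or by conflicting types, not by columns that are simply absent from the incoming data.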

### Example 1.3: Identity and Generated Columns

**Objective:** Demonstrate advanced column features - auto-generated surrogate keys and computed columns

Delta Lake supports:
- **IDENTITY columns**: Auto-incrementing surrogate keys (unique, increasing, but not contiguous)
- **GENERATED columns**: Computed columns derived from other columns

In [None]:
# Create table with Identity Column and Generated Column
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG}.{BRONZE_SCHEMA}.orders_modern (
    order_sk BIGINT GENERATED ALWAYS AS IDENTITY,  -- Surrogate Key
    order_id STRING,
    total_amount DOUBLE,
    order_timestamp TIMESTAMP,
    order_date DATE GENERATED ALWAYS AS (CAST(order_timestamp AS DATE)) -- Auto-calculated
) USING DELTA
""")

> **Note:** In a distributed environment like Databricks (Spark/Delta Lake), `GENERATED ALWAYS AS IDENTITY` has specific behaviors. It guarantees **uniqueness** and an **increasing trend**, but does NOT guarantee contiguous numbering.

In [None]:
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.orders_modern"))

Now we will insert data. Note that in the `INSERT` query we omit `order_sk` and `order_date` columns:
- `order_sk`: will be generated automatically (unique number)
- `order_date`: will be calculated based on `order_timestamp`

In [None]:
# Insert data without specifying generated columns
spark.sql(f"""
INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.orders_modern (order_id, total_amount, order_timestamp)
VALUES 
    ('ORD-001', 150.50, current_timestamp()),
    ('ORD-002', 200.00, current_timestamp())
""")

In [None]:
# Use the fully qualified name, consistent with the other cells
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.orders_modern"))

### Example 1.4: Schema Evolution

**Objective:** Demonstration of automatic schema evolution when adding new columns

Schema Evolution allows controlled addition of new columns to existing Delta tables without interrupting running pipelines. Delta Lake applies additive schema changes automatically when a write sets the `mergeSchema` option.
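
As a rough mental model, additive evolution can be sketched in plain Python. The helpers `merge_schema` and `read_with_schema` are hypothetical names for this illustration and do not correspond to Delta Lake APIs. The sketch shows the two effects to expect: new columns are appended to the table schema, and rows written before the change read back with NULL in those columns.

```python
def merge_schema(table_schema: dict, incoming_schema: dict) -> dict:
    """Additively merge schemas: new columns are appended, existing ones keep their type."""
    merged = dict(table_schema)
    for name, dtype in incoming_schema.items():
        if name in merged and merged[name] != dtype:
            raise ValueError(f"cannot change type of existing column {name}")
        merged.setdefault(name, dtype)
    return merged


def read_with_schema(row: dict, schema: dict) -> dict:
    """Project a stored row onto the current schema; absent columns read as None (NULL)."""
    return {name: row.get(name) for name in schema}


table_schema = {"customer_id": "string", "customer_segment": "string"}
incoming = {"customer_id": "string", "customer_segment": "string", "customer_tier": "string"}

evolved = merge_schema(table_schema, incoming)
old_row = read_with_schema(
    {"customer_id": "CUST000001", "customer_segment": "Basic"}, evolved
)

print(list(evolved))
print(old_row)
```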

In [None]:
# Data with additional column (customer_tier)
extended_customers = spark.createDataFrame([
    ("CUST010001", "New", "Customer", "new@example.com", "+48 111 222 333", "Warsaw", "MZ", "Poland", "2023-12-01", "Basic", "Premium"),
    ("CUST010002", "Another", "Customer", "another@example.com", "+48 444 555 666", "Krakow", "MP", "Poland", "2023-12-02", "Premium", "Standard")
], ["customer_id", "first_name", "last_name", "email", "phone", "city", "state", "country", "registration_date", "customer_segment", "customer_tier"])

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

# Cast registration_date to proper type
extended_customers = extended_customers.withColumn("registration_date", col("registration_date").cast(DateType()))

In [None]:
# Enable automatic schema evolution with mergeSchema option
extended_customers.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")

In [None]:
# Check new schema - notice the new customer_tier column
spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta").printSchema()

In [None]:
# Verify data - new column has NULL for old records
display(
    spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")
    .select("customer_id", "first_name", "last_name", "customer_tier")
)

### Example 1.5: Data Quality with Constraints

**Objective:** Enforce data quality rules at the table level using CHECK constraints

Delta Lake allows defining **Constraints** that guarantee data quality at the table level. This works similarly to traditional SQL databases.

**Constraint Types:**
- `NOT NULL`: Enforces the presence of a value
- `CHECK`: Enforces any logical condition (e.g., `age > 0`, `customer_id LIKE 'CUST%'`)
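
The all-or-nothing behaviour of a CHECK constraint can be illustrated with a small plain-Python sketch. The function `append_with_check` and the list-of-dicts table are assumptions made for the example, not Delta Lake code. The point it shows is that a single violating row causes the whole write to be rejected, so no part of the batch is committed.

```python
def append_with_check(table_rows: list, new_rows: list, check) -> list:
    """Append new_rows only if every row satisfies the check; otherwise raise."""
    violations = [row for row in new_rows if not check(row)]
    if violations:
        raise ValueError(f"CHECK constraint violated by {len(violations)} row(s)")
    return table_rows + new_rows


def valid_customer_id(row: dict) -> bool:
    # Mirrors CHECK (customer_id LIKE 'CUST%')
    return row["customer_id"].startswith("CUST")


table = [{"customer_id": "CUST000001"}]
batch = [{"customer_id": "CUST000002"}, {"customer_id": "INVALID123"}]

try:
    table = append_with_check(table, batch, valid_customer_id)
    outcome = "committed"
except ValueError:
    outcome = "rejected"

print(outcome, len(table))
```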

In [None]:
# Add CHECK constraint: customer_id must start with CUST
try:
    spark.sql(f"""
        ALTER TABLE {CATALOG}.{BRONZE_SCHEMA}.customers_delta
        ADD CONSTRAINT valid_customer_id CHECK (customer_id LIKE 'CUST%')
    """)
    print("Constraint 'valid_customer_id' added successfully.")
except Exception as e:
    print(f"Info: {e}")

Now let's try to insert data that violates the constraint (customer_id does not start with 'CUST').
We expect Delta Lake to block this operation and return a CHECK constraint violation error that names `valid_customer_id`.

In [None]:
# Attempt to insert invalid data (customer_id does not start with CUST)
try:
    spark.sql(f"""
        INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.customers_delta (customer_id, first_name, last_name, email, phone, city, state, country, registration_date, customer_segment)
        VALUES ('INVALID123', 'Bad', 'Customer', 'bad@example.com', '+48 000 000 000', 'Test', 'TS', 'Poland', '2023-01-01', 'Basic')
    """)
except Exception as e:
    print(f"Expected Data Quality error:\n{str(e)[:300]}...")

---

## Section 2: CRUD Operations & MERGE

**Theoretical Introduction:**

Delta Lake supports the full range of CRUD operations (Create, Read, Update, Delete), making it ideal for transactional workloads in Data Lake. All operations are:
- **Atomic**: Either fully complete or fully rolled back
- **ACID-compliant**: Ensuring data consistency
- **Recorded in Delta Log**: Full audit trail of all changes

Additionally, Delta Lake provides the powerful **MERGE INTO** operation (also known as "upsert") that combines INSERT and UPDATE in a single atomic transaction - essential for CDC (Change Data Capture) scenarios.

### Example 2.1: INSERT Operation

**Objective:** Adding new records to an existing table

In [None]:
# INSERT new customers
spark.sql(f"""
    INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.customers_delta
    (customer_id, first_name, last_name, email, phone, city, state, country, registration_date, customer_segment, customer_tier)
    VALUES 
        ('CUST020001', 'Insert', 'Customer1', 'insert1@example.com', '+48 111 111 111', 'Warsaw', 'MZ', 'Poland', '2023-12-10', 'Premium', 'Gold'),
        ('CUST020002', 'Insert', 'Customer2', 'insert2@example.com', '+48 222 222 222', 'Gdansk', 'PM', 'Poland', '2023-12-11', 'Basic', 'Silver')
""")

In [None]:
# Verify insertion
display(
    spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")
    .filter(F.col("customer_id").like("CUST02%"))
    .orderBy("customer_id")
)

### Example 2.2: UPDATE Operation

**Objective:** Updating existing records in the table

In [None]:
# UPDATE customer tier for specific customers
spark.sql(f"""
    UPDATE {CATALOG}.{BRONZE_SCHEMA}.customers_delta
    SET customer_tier = 'Platinum'
    WHERE customer_id IN ('CUST010001', 'CUST020001')
""")

In [None]:
# Verify update
display(
    spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")
    .filter(F.col("customer_tier") == "Platinum")
)

### Example 2.3: DELETE Operation

**Objective:** Deleting records from a Delta table

In [None]:
# DELETE specific customer
spark.sql(f"""
    DELETE FROM {CATALOG}.{BRONZE_SCHEMA}.customers_delta
    WHERE customer_id = 'CUST020002'
""")

In [None]:
# Verify deletion
deleted_check = spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta") \
    .filter(F.col("customer_id") == "CUST020002") \
    .count()

display(
    spark.createDataFrame([
        ("Records with customer_id CUST020002", deleted_check)
    ], ["description", "count"])
)

### Example 2.4: MERGE INTO (Upsert)

**Objective:** Demonstration of upsert operation - update existing and insert new records in a single atomic transaction

MERGE INTO is especially useful when processing changes from transactional systems (CDC patterns). It allows you to:
- **Update** existing records when a match is found
- **Insert** new records when no match exists
- **Delete** records based on conditions (optional)
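
The matched/not-matched logic can be sketched in plain Python before running it against a table. This is an illustrative model only: `merge_upsert` is a hypothetical helper, and a dict keyed by `customer_id` stands in for the target table.

```python
def merge_upsert(target: dict, source: list, key: str) -> dict:
    """Return a new table state after applying source rows as an upsert."""
    result = {k: dict(v) for k, v in target.items()}  # copy; the input is not mutated
    for row in source:
        k = row[key]
        if k in result:
            result[k].update(row)   # WHEN MATCHED THEN UPDATE
        else:
            result[k] = dict(row)   # WHEN NOT MATCHED THEN INSERT
    return result


target = {
    "CUST010001": {"customer_id": "CUST010001", "city": "Warsaw", "customer_tier": "Platinum"},
}
source = [
    {"customer_id": "CUST010001", "city": "Poznan", "customer_tier": "Diamond"},  # update
    {"customer_id": "CUST030001", "city": "Wroclaw", "customer_tier": "Bronze"},  # insert
]

merged = merge_upsert(target, source, key="customer_id")
print(sorted(merged))
print(merged["CUST010001"]["city"])
```

In the real operation all of these row-level changes are committed together as one table version, so readers never observe a state where only some of them have been applied.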

In [None]:
# Prepare data for merge (mix of updates and new records)
merge_data = spark.createDataFrame([
    ("CUST010001", "Updated", "Name", "updated@example.com", "+48 999 999 999", "Poznan", "WP", "Poland", "2023-12-01", "VIP", "Diamond"),  # Update existing
    ("CUST030001", "Brand", "New", "brand.new@example.com", "+48 777 777 777", "Wroclaw", "DS", "Poland", "2023-12-15", "Basic", "Bronze"),   # Insert new
    ("CUST030002", "Another", "New", "another.new@example.com", "+48 888 888 888", "Lodz", "LD", "Poland", "2023-12-16", "Premium", "Silver") # Insert new
], ["customer_id", "first_name", "last_name", "email", "phone", "city", "state", "country", "registration_date", "customer_segment", "customer_tier"])

# Create temporary view for merge operation
merge_data.createOrReplaceTempView("customer_updates")

In [None]:
# MERGE INTO operation (Upsert)
spark.sql(f"""
    MERGE INTO {CATALOG}.{BRONZE_SCHEMA}.customers_delta AS target
    USING customer_updates AS source
    ON target.customer_id = source.customer_id
    
    WHEN MATCHED THEN
        UPDATE SET
            first_name = source.first_name,
            last_name = source.last_name,
            email = source.email,
            phone = source.phone,
            city = source.city,
            state = source.state,
            country = source.country,
            customer_segment = source.customer_segment,
            customer_tier = source.customer_tier
    
    WHEN NOT MATCHED THEN
        INSERT (customer_id, first_name, last_name, email, phone, city, state, country, registration_date, customer_segment, customer_tier)
        VALUES (source.customer_id, source.first_name, source.last_name, source.email, source.phone, source.city, source.state, source.country, source.registration_date, source.customer_segment, source.customer_tier)
""")

In [None]:
# Verify MERGE results
display(
    spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.customers_delta")
    .filter(F.col("customer_id").isin(["CUST010001", "CUST030001", "CUST030002"]))
    .orderBy("customer_id")
)

---

## Section 3: Metadata and Analytics

**Theoretical Introduction:**

Delta Lake offers rich metadata about tables and operations that enables:
- **Auditing**: Who changed what and when
- **Debugging**: Understanding operation performance and metrics
- **Compliance**: Meeting regulatory requirements for data lineage

**Key Commands:**
- `DESCRIBE DETAIL`: File structure, partitioning, table properties
- `DESCRIBE HISTORY`: Complete audit trail of all operations
- `SHOW TBLPROPERTIES`: Table configuration and settings

### Example 3.1: DESCRIBE DETAIL

**Objective:** Analysis of Delta table metadata and physical storage details

In [None]:
# Detailed table information
display(
    spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.customers_delta")
)

### Example 3.2: Operation History Analysis

**Objective:** Deeper analysis of history and operation metrics

In [None]:
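# History with MERGE row-level metrics
# (these keys are NULL for other operations; UPDATE and DELETE report
#  different keys such as numUpdatedRows and numDeletedRows)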
history_df = spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.customers_delta")

display(
    history_df.select(
        "version", 
        "timestamp", 
        "operation", 
        "operationMetrics.numTargetRowsInserted",
        "operationMetrics.numTargetRowsUpdated",
        "operationMetrics.numTargetRowsDeleted"
    )
)

### Example 3.3: Delta Log Internals (Deep Dive)

**Objective:** Understanding how Delta Lake ensures ACID by looking "under the hood" at JSON files in `_delta_log`

The Delta Log is a transaction log stored in the `_delta_log/` folder. Each transaction creates a new JSON file containing:
- `add`: Adding a new Parquet file with data
- `remove`: Logical deletion of a file (e.g., during DELETE or OPTIMIZE)
- `commitInfo`: Metadata about the transaction (who, when, what operation)
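
To make the role of these actions concrete, the sketch below replays a tiny, hand-written log in plain Python. The commit contents are hypothetical and heavily simplified (real commit files carry many more fields), and `active_files` is an illustrative helper, not a Delta Lake API. It shows how the set of live data files at any version follows from applying `add` and `remove` actions in order.

```python
import json

# Hypothetical commits: one JSON action per line, as in _delta_log/<version>.json
commits = {
    0: [
        '{"commitInfo": {"operation": "WRITE"}}',
        '{"add": {"path": "part-0000.parquet"}}',
    ],
    1: [
        '{"commitInfo": {"operation": "DELETE"}}',
        '{"remove": {"path": "part-0000.parquet"}}',
        '{"add": {"path": "part-0001.parquet"}}',
    ],
}


def active_files(commits: dict, version: int) -> set:
    """Replay commits 0..version and return the data files that make up that version."""
    files = set()
    for v in sorted(commits):
        if v > version:
            break
        for line in commits[v]:
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


print(active_files(commits, 0))
print(active_files(commits, 1))
```

Note that the removed file is only dropped from the logical table state; the physical file stays on storage, which is what makes reading version 0 possible later.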

In [None]:
# Get table path
table_details = spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.customers_delta")
display(table_details.select("location", "numFiles", "sizeInBytes"))

In [None]:
# View complete Delta table history
display(spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.customers_delta"))

---

## Section 4: Time Travel and Disaster Recovery

**Theoretical Introduction:**

Time Travel is a fundamental Delta Lake feature enabling access to previous versions of data. It is based on the **Copy-on-Write** mechanism - every change creates a new version of files, while old versions remain available until cleaned up by VACUUM.

**Key Capabilities:**
- **VERSION AS OF**: Query data at a specific version number
- **TIMESTAMP AS OF**: Query data at a specific point in time
- **RESTORE**: Rollback table to a previous state
- **Audit**: Compare data between versions

**Important Consideration:** VACUUM removes old files and directly impacts Time Travel capabilities. Understanding this relationship is crucial for data retention strategies.

### Example 4.1: Creating a Dedicated Table for Time Travel Demo

**Objective:** Create a fresh table to demonstrate Time Travel features clearly

In [None]:
# Create a new table specifically for Time Travel demonstration
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo (
    id INT,
    name STRING,
    status STRING,
    updated_at TIMESTAMP
) USING DELTA
""")

# Version 1: Insert initial data
# (on a fresh table, version 0 is the CREATE OR REPLACE TABLE commit itself)
spark.sql(f"""
INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VALUES
    (1, 'Alice', 'active', current_timestamp()),
    (2, 'Bob', 'active', current_timestamp()),
    (3, 'Charlie', 'active', current_timestamp())
""")

print("Version 1: Initial data inserted")
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo"))

In [None]:
# Version 2: Update some records
spark.sql(f"""
UPDATE {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo
SET status = 'premium', updated_at = current_timestamp()
WHERE name = 'Alice'
""")

print("Version 2: Alice upgraded to premium")
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo"))

In [None]:
# Version 3: Insert new record
spark.sql(f"""
INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VALUES
    (4, 'Diana', 'new', current_timestamp())
""")

print("Version 3: Diana added")
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo"))

In [None]:
# Version 4: Delete a record
spark.sql(f"""
DELETE FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo
WHERE name = 'Charlie'
""")

print("Version 4: Charlie deleted")
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo"))

### Example 4.2: Table History Exploration

**Objective:** Use DESCRIBE HISTORY to analyze all operations on the table

In [None]:
# Show complete history of all operations
display(
    spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")
)

### Example 4.3: Time Travel Queries

**Objective:** Access previous versions of data using VERSION AS OF and TIMESTAMP AS OF

In [None]:
# Access data from version 1 (initial data; version 0 is the empty table right after creation)
print("Version 1 - Initial data (before any changes):")
display(
    spark.sql(f"SELECT * FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VERSION AS OF 1")
)

In [None]:
# Access data from version 2 (after Alice upgrade)
print("Version 2 - After Alice upgrade:")
display(
    spark.sql(f"SELECT * FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VERSION AS OF 2")
)

In [None]:
# Compare record counts between versions
# (version 0 is the empty table; versions 1-4 are the insert, update, insert and delete)
version_counts = []
for v in range(5):
    count = spark.sql(f"SELECT COUNT(*) as cnt FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VERSION AS OF {v}").first()[0]
    version_counts.append((f"Version {v}", count))

display(spark.createDataFrame(version_counts, ["version", "record_count"]))

### Example 4.4: Disaster Recovery - Accidental Deletion

**Objective:** Simulate accidental data deletion and recover using RESTORE

In [None]:
# DISASTER! Accidental deletion of ALL data
spark.sql(f"DELETE FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")

print("Oh no! All data deleted!")
print("Record count after deletion:", spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo").count())

In [None]:
# Check history to find the last good version
history = spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")
display(history.select("version", "timestamp", "operation"))

In [None]:
# RESTORE to the version just before the accidental deletion.
# Assumes the accidental DELETE is the most recent commit on the table.
# (Filtering history on operation != 'DELETE' would also skip the intentional
# Charlie deletion and bring Charlie back.)
latest_version = (
    spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")
    .agg(F.max("version"))
    .first()[0]
)
last_good_version = latest_version - 1

print(f"Restoring to version: {last_good_version}")
spark.sql(f"RESTORE TABLE {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo TO VERSION AS OF {last_good_version}")

In [None]:
# Verify restoration
print("Data restored successfully!")
print("Record count after RESTORE:", spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo").count())
display(spark.table(f"{CATALOG}.{BRONZE_SCHEMA}.time_travel_demo"))

### Example 4.5: VACUUM and Its Impact on Time Travel

**Objective:** Understand how VACUUM affects Time Travel capabilities

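**Critical Concept:** VACUUM deletes data files that are no longer referenced by the current version of the table and are older than the retention threshold. Once those files are gone, **Time Travel to versions that depend on them becomes impossible**.

**Default Retention:** 7 days (168 hours), controlled by the `delta.deletedFileRetentionDuration` table property
- With the default, versions from roughly the last 7 days remain readable after VACUUM
- Older versions may still be listed by `DESCRIBE HISTORY` (the log is kept for 30 days by default) but can no longer be queried once their data files are removed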
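
The retention rule can be sketched in plain Python. This is a simplified model under stated assumptions: `vacuum_candidates` is a hypothetical helper, and each entry pairs a file that is no longer referenced by the current version with the time it was logically removed. Files still referenced by the current version are never candidates.

```python
from datetime import datetime, timedelta


def vacuum_candidates(removed_files: list, now: datetime, retention_hours: int = 168) -> list:
    """Return paths of unreferenced files that fall outside the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return [path for path, removed_at in removed_files if removed_at < cutoff]


now = datetime(2024, 1, 31, 12, 0)
removed_files = [
    ("part-old.parquet", datetime(2024, 1, 1, 9, 0)),      # removed 30 days ago
    ("part-recent.parquet", datetime(2024, 1, 30, 9, 0)),  # removed yesterday
]

default_retention = vacuum_candidates(removed_files, now)
zero_retention = vacuum_candidates(removed_files, now, retention_hours=0)

print(default_retention)
print(zero_retention)
```

With zero retention every unreferenced file becomes eligible at once, which is why the demo below loses access to earlier versions immediately.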

In [None]:
# Check current table size and files BEFORE VACUUM
print("=== BEFORE VACUUM ===")
before_vacuum = spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")
display(before_vacuum.select("numFiles", "sizeInBytes"))

In [None]:
# Let's see what versions are available BEFORE vacuum
print("Available versions before VACUUM:")
display(spark.sql(f"DESCRIBE HISTORY {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo").select("version", "timestamp", "operation"))

In [None]:
# VACUUM with 0 hours retention (DEMO ONLY - requires disabling safety check)
# In production, NEVER use 0 hours - use default 7 days or more!
spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled = false")

vacuum_result = spark.sql(f"""
    VACUUM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo RETAIN 0 HOURS
""")

display(vacuum_result)

In [None]:
# Check table size AFTER VACUUM
print("=== AFTER VACUUM ===")
after_vacuum = spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo")
display(after_vacuum.select("numFiles", "sizeInBytes"))

In [None]:
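# Now try to access an old version whose data files were vacuumed.
# Version 1 is used because version 0 is the empty table and needs no data files.
# The read fails once the files that version depends on have been removed.
try:
    spark.sql(f"SELECT * FROM {CATALOG}.{BRONZE_SCHEMA}.time_travel_demo VERSION AS OF 1").show()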
except Exception as e:
    print("⚠️ Time Travel failed after VACUUM!")
    print(f"Error: {str(e)[:200]}...")

**Key Takeaway:** 
- VACUUM is essential for storage optimization (removes orphaned files)
- But it **permanently destroys** the ability to Time Travel to vacuumed versions
- Always set appropriate retention period based on your recovery requirements
- Default 7 days is a good balance for most use cases

---

## Section 5: Optimization

**Theoretical Introduction:**

As data grows, query performance can degrade due to several factors:
- **Small Files Problem**: Too many small files increase metadata overhead
- **Data Layout**: Data not organized for common query patterns
- **Predicate Pushdown Inefficiency**: Scanning more data than necessary

Delta Lake provides several optimization techniques:

| Technique | Description | When to Use |
|-----------|-------------|-------------|
| **OPTIMIZE** | Compacts small files into larger ones | After many small writes |
| **Partitioning** | Physical data separation by column values | Low-cardinality filter columns |
| **Z-ORDER** | Co-locates related data for better pruning | Frequently filtered columns |
| **Liquid Clustering** | Modern alternative to partitioning + Z-ORDER | New tables (recommended) |

### Example 5.1: The Small Files Problem

**Objective:** Demonstrate how many small files impact performance and how OPTIMIZE solves it

The "small files problem" occurs when:
- Streaming jobs write many small files
- Frequent small batch inserts
- High-concurrency writes

This leads to:
- Increased metadata overhead
- Slower query performance
- Higher storage costs (metadata per file)
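
The idea behind compaction can be sketched as simple bin-packing in plain Python. This is not how OPTIMIZE is implemented; `plan_compaction` is a hypothetical helper and the 128 MB target is an arbitrary value chosen for the example. It only illustrates how hundreds of tiny files collapse into a handful of larger ones.

```python
def plan_compaction(file_sizes_mb: list, target_mb: int = 128) -> list:
    """Greedily group small files into bins of at most target_mb each."""
    bins, current, current_size = [], [], 0
    for size in file_sizes_mb:
        # Start a new output file when adding this one would exceed the target
        if current and current_size + size > target_mb:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins


small_files = [1] * 500          # 500 files of 1 MB each
plan = plan_compaction(small_files)

print(len(small_files), "files ->", len(plan), "files")
print([sum(group) for group in plan])
```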

In [None]:
# Create a table with many small files (simulating streaming ingestion)
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.small_files_demo (
    id INT,
    data STRING,
    created_at TIMESTAMP
) USING DELTA
""")

# Insert data in many small batches (simulating streaming)
from pyspark.sql.functions import lit, current_timestamp
import random
import string

print("Inserting 500 small batches to simulate streaming ingestion...")

for i in range(500):
    # Each batch has only 10-50 records
    batch_size = random.randint(10, 50)
    data = [(j, ''.join(random.choices(string.ascii_letters, k=20)))
            for j in range(batch_size)]
    # Explicit DDL schema: an all-None column cannot be type-inferred,
    # and id must be INT to match the table definition
    df = spark.createDataFrame(data, "id INT, data STRING") \
        .withColumn("created_at", current_timestamp())
    df.write.format("delta").mode("append").saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.small_files_demo")

print("Done! 500 small batches inserted.")

In [None]:
# Check the number of files BEFORE optimization
print("=== BEFORE OPTIMIZE ===")
before_optimize = spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.small_files_demo")
display(before_optimize.select("numFiles", "sizeInBytes"))

In [None]:
# Run OPTIMIZE to compact small files
optimize_result = spark.sql(f"""
    OPTIMIZE {CATALOG}.{BRONZE_SCHEMA}.small_files_demo
""")

display(optimize_result)

In [None]:
# Check the number of files AFTER optimization
print("=== AFTER OPTIMIZE ===")
after_optimize = spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.small_files_demo")
display(after_optimize.select("numFiles", "sizeInBytes"))

### Example 5.2: Partitioning

**Objective:** Demonstrate how partitioning improves query performance for filtered queries

Partitioning physically separates data into directories based on column values. This enables:
- **Partition Pruning**: Skip entire partitions that don't match the filter
- **Parallel Processing**: Process partitions independently
- **Efficient Deletes/Updates**: Only touch affected partitions

**Best Practices:**
- Use low-cardinality columns (e.g., date, country, status)
- Avoid over-partitioning (too many small partitions)
- Consider partition size: aim for 1GB+ per partition
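
Partition pruning can be pictured with a small plain-Python sketch. The directory names follow the Hive-style `column=value` layout used for partitioned tables; the file names and the `prune` helper are hypothetical. A filter on the partition column selects directories by name, so files in other partitions are never opened.

```python
# Hypothetical layout of a table partitioned by order_date
partitions = {
    "order_date=2024-01-14": ["part-0.parquet"],
    "order_date=2024-01-15": ["part-1.parquet", "part-2.parquet"],
    "order_date=2024-01-16": ["part-3.parquet"],
}


def prune(partitions: dict, column: str, value: str) -> dict:
    """Keep only the partition directories matching column = value."""
    wanted = f"{column}={value}"
    return {path: files for path, files in partitions.items() if path == wanted}


selected = prune(partitions, "order_date", "2024-01-15")
files_read = [f for files in selected.values() for f in files]

print(files_read)
```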

In [None]:
# Create a partitioned table
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.orders_partitioned (
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    order_date DATE,
    amount DOUBLE,
    status STRING
) 
USING DELTA
PARTITIONED BY (order_date)
""")

In [None]:
# Insert sample data across multiple dates
from datetime import date, timedelta

orders_data = []
base_date = date(2024, 1, 1)

for day_offset in range(30):  # 30 days of data
    order_date = base_date + timedelta(days=day_offset)
    for i in range(100):  # 100 orders per day
        orders_data.append((
            f"ORD-{day_offset:02d}-{i:04d}",
            f"CUST{i % 50:04d}",
            f"PROD{i % 20:03d}",
            order_date,
            round(50 + (i * 2.5), 2),
            "completed" if i % 3 != 0 else "pending"
        ))

orders_df = spark.createDataFrame(orders_data, 
    ["order_id", "customer_id", "product_id", "order_date", "amount", "status"])

orders_df.write.format("delta").mode("append").partitionBy("order_date") \
    .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.orders_partitioned")

print(f"Inserted {len(orders_data)} orders across 30 days")

In [None]:
# Check partitioning structure
display(spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.orders_partitioned"))

In [None]:
# Query with partition filter - only scans relevant partitions
# Check the Spark UI to see partition pruning in action
result = spark.sql(f"""
    SELECT * FROM {CATALOG}.{BRONZE_SCHEMA}.orders_partitioned
    WHERE order_date = '2024-01-15'
""")

print("Query for single date (should scan only 1 partition):")
display(result)

### Example 5.3: Z-ORDER (Data Skipping)

**Objective:** Demonstrate how Z-ORDER improves query performance by co-locating related data

Z-ORDER is a multi-dimensional clustering technique that:
- Co-locates related data in the same files
- Enables efficient data skipping based on file-level statistics
- Works best with high-cardinality columns used in filters
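
Data skipping itself can be sketched in plain Python. The per-file minimum and maximum values below are made up for the example, and `files_to_scan` is a hypothetical helper. The sketch contrasts a clustered layout, where each file covers a narrow value range, with an unclustered one, where every file spans the full range and none can be skipped.

```python
def files_to_scan(files: list, value: str) -> list:
    """Return the files whose [min, max] range could contain the value."""
    return [f["path"] for f in files if f["min"] <= value <= f["max"]]


# Clustered layout: each file holds a narrow, non-overlapping customer_id range
clustered = [
    {"path": "f1.parquet", "min": "CUST0001", "max": "CUST0250"},
    {"path": "f2.parquet", "min": "CUST0251", "max": "CUST0500"},
    {"path": "f3.parquet", "min": "CUST0501", "max": "CUST1000"},
]

# Unclustered layout: every file contains customers from the whole range
unclustered = [
    {"path": "g1.parquet", "min": "CUST0001", "max": "CUST1000"},
    {"path": "g2.parquet", "min": "CUST0002", "max": "CUST0999"},
    {"path": "g3.parquet", "min": "CUST0001", "max": "CUST0998"},
]

print(files_to_scan(clustered, "CUST0042"))
print(files_to_scan(unclustered, "CUST0042"))
```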

**When to use Z-ORDER:**
- Columns frequently used in WHERE clauses
- Columns with high cardinality
- Multiple columns can be specified, but effectiveness drops with each additional column, so keep the list short
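
The general idea of a Z-order curve is to interleave the bits of several values into one sort key, so that rows close in every dimension end up close in the sorted order. The sketch below shows this for two integers in plain Python. It illustrates the generic technique only; `z_value` is a hypothetical helper and no claim is made about how Delta Lake computes its ordering internally.

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low bits of x and y: x fills even positions, y fills odd ones."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# (customer number, product number) pairs standing in for two filter columns
points = [(3, 5), (0, 0), (1, 0), (0, 1), (1, 1)]
ordered = sorted(points, key=lambda p: z_value(*p))

print(z_value(3, 5))
print(ordered)
```

Because both inputs contribute bits to the key, sorting by it balances locality across both columns instead of favouring only the first one, as a plain two-column sort would.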

In [None]:
# Create a table for Z-ORDER demonstration
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.sales_zorder_demo (
    sale_id STRING,
    customer_id STRING,
    product_id STRING,
    store_id STRING,
    sale_date DATE,
    amount DOUBLE,
    quantity INT
) USING DELTA
""")

In [None]:
# Insert sample data
from datetime import date
import random

sales_data = []
for i in range(100000):  # 100K records
    sales_data.append((
        f"SALE-{i:08d}",
        f"CUST{random.randint(1, 1000):04d}",
        f"PROD{random.randint(1, 500):03d}",
        f"STORE{random.randint(1, 50):02d}",
        date(2024, random.randint(1, 12), random.randint(1, 28)),
        round(random.uniform(10, 500), 2),
        random.randint(1, 10)
    ))

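sales_df = spark.createDataFrame(sales_data, 
    ["sale_id", "customer_id", "product_id", "store_id", "sale_date", "amount", "quantity"]) \
    .withColumn("quantity", F.col("quantity").cast("int"))  # Python ints are inferred as BIGINT; the table column is INT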

sales_df.write.format("delta").mode("append") \
    .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.sales_zorder_demo")

print("Inserted 100,000 sales records")

In [None]:
# Check file statistics BEFORE Z-ORDER
print("=== BEFORE Z-ORDER ===")
display(spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.sales_zorder_demo"))

In [None]:
# Apply Z-ORDER on frequently filtered columns
# In this case: customer_id and product_id are common filter columns
zorder_result = spark.sql(f"""
    OPTIMIZE {CATALOG}.{BRONZE_SCHEMA}.sales_zorder_demo
    ZORDER BY (customer_id, product_id)
""")

display(zorder_result)

In [None]:
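# Queries filtering by customer_id or product_id can now skip files whose min/max
# statistics exclude the requested values. On a table this small, OPTIMIZE may leave
# a single file, so the effect only becomes visible at larger data volumes.
# Example query that benefits from Z-ORDER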
result = spark.sql(f"""
    SELECT * FROM {CATALOG}.{BRONZE_SCHEMA}.sales_zorder_demo
    WHERE customer_id = 'CUST0042' AND product_id = 'PROD123'
""")

print("Query with Z-ORDER optimized columns (check Spark UI for data skipping):")
display(result)

### Example 5.4: Liquid Clustering (Modern Approach)

**Objective:** Introduce Liquid Clustering as a modern replacement for partitioning and Z-ORDER

**Liquid Clustering** is a newer Delta Lake data layout technique on Databricks that:
- Manages data layout from the `CLUSTER BY` columns instead of a fixed directory structure
- Lets clustering columns change as query patterns change
- Eliminates the need for manual partitioning decisions
- Works incrementally (no need to re-cluster the entire table)

**Key Benefits:**
- No upfront partitioning decisions required
- Can change clustering columns without rewriting data
- Better performance for evolving workloads
- Simpler to manage than partitioning + Z-ORDER combo
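
Changing clustering columns is done with an `ALTER TABLE ... CLUSTER BY` statement. The sketch below only builds the SQL string so that it can be reviewed before being passed to `spark.sql`. The helper `cluster_by_statement` and the table name are hypothetical, and the limit of four clustering columns reflects the Databricks documentation at the time of writing, so treat it as an assumption to verify for your runtime.

```python
MAX_CLUSTERING_COLUMNS = 4  # assumed limit for liquid clustering keys; verify for your runtime


def cluster_by_statement(table_name, columns):
    """Build an ALTER TABLE ... CLUSTER BY statement for the given columns.

    An empty column list produces CLUSTER BY NONE, which turns clustering off.
    """
    if len(columns) > MAX_CLUSTERING_COLUMNS:
        raise ValueError(
            f"Liquid Clustering accepts at most {MAX_CLUSTERING_COLUMNS} columns, "
            f"got {len(columns)}"
        )
    if not columns:
        return f"ALTER TABLE {table_name} CLUSTER BY NONE"
    return f"ALTER TABLE {table_name} CLUSTER BY ({', '.join(columns)})"


# Hypothetical table name, used for illustration only.
print(cluster_by_statement("main.bronze.sales_liquid_clustering", ["sale_date", "region"]))
```

The new columns apply to data written or optimized after the statement runs; files that already exist are left as they are.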

In [None]:
# Create a table with Liquid Clustering
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.sales_liquid_clustering (
    sale_id STRING,
    customer_id STRING,
    product_id STRING,
    region STRING,
    sale_date DATE,
    amount DOUBLE,
    quantity INT
) 
USING DELTA
CLUSTER BY (customer_id, region)  -- Liquid Clustering columns
""")

In [None]:
# Insert sample data
from datetime import date
import random

regions = ['North', 'South', 'East', 'West', 'Central']

sales_data = []
for i in range(50000):
    sales_data.append((
        f"SALE-{i:08d}",
        f"CUST{random.randint(1, 500):04d}",
        f"PROD{random.randint(1, 200):03d}",
        random.choice(regions),
        date(2024, random.randint(1, 12), random.randint(1, 28)),
        round(random.uniform(10, 500), 2),
        random.randint(1, 10)
    ))

sales_df = spark.createDataFrame(sales_data, 
    ["sale_id", "customer_id", "product_id", "region", "sale_date", "amount", "quantity"])

sales_df.write.format("delta").mode("append") \
    .saveAsTable(f"{CATALOG}.{BRONZE_SCHEMA}.sales_liquid_clustering")

print("Inserted 50,000 sales records into Liquid Clustering table")

In [None]:
# OPTIMIZE automatically applies Liquid Clustering
# No need to specify ZORDER - it's built into the table definition!
optimize_result = spark.sql(f"""
    OPTIMIZE {CATALOG}.{BRONZE_SCHEMA}.sales_liquid_clustering
""")

display(optimize_result)

In [None]:
# Check clustering information
display(spark.sql(f"DESCRIBE DETAIL {CATALOG}.{BRONZE_SCHEMA}.sales_liquid_clustering"))

In [None]:
# Queries filtering by clustering columns are automatically optimized
result = spark.sql(f"""
    SELECT region, COUNT(*) as sales_count, SUM(amount) as total_amount
    FROM {CATALOG}.{BRONZE_SCHEMA}.sales_liquid_clustering
    WHERE customer_id LIKE 'CUST00%' AND region = 'North'
    GROUP BY region
""")

display(result)

**Comparison: Partitioning vs Z-ORDER vs Liquid Clustering**

| Feature | Partitioning | Z-ORDER | Liquid Clustering |
|---------|-------------|---------|-------------------|
| When to choose | Low-cardinality columns | High-cardinality filter columns | General purpose (recommended) |
| Data layout | Directory per partition | Co-located in files | Automatic clustering |
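| Changing layout columns | Requires full table rewrite | Re-run OPTIMIZE with new ZORDER BY columns (rewrites files) | `ALTER TABLE ... CLUSTER BY`, existing data not rewritten |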
| Maintenance | Manual | Manual OPTIMIZE | Automatic with OPTIMIZE |
| Best for | Date/Region filters | Multi-column filters | Evolving workloads |

---

## Section 6: Change Data Feed vs Change Data Capture

**Theoretical Introduction:**

Two terms are often confused in the data engineering world: **Change Data Feed (CDF)** and **Change Data Capture (CDC)**. Understanding the difference is crucial:

### Change Data Capture (CDC)
**What it is:** A *pattern/technique* for capturing changes from source systems (databases, APIs, etc.)

**Characteristics:**
- Source-side technology
- Captures INSERT, UPDATE, DELETE from operational databases
- Tools: Debezium, AWS DMS, Fivetran, Qlik Replicate
- Produces a stream of change events

**Example:** Capturing changes from PostgreSQL and streaming them to Kafka

### Change Data Feed (CDF)
**What it is:** A *Delta Lake feature* that records row-level changes within Delta tables

**Characteristics:**
- Delta Lake native feature
- Tracks changes that happen WITHIN Delta tables
- Provides `_change_type`, `_commit_version`, `_commit_timestamp` columns
- Enables efficient incremental processing

**Example:** Reading only the rows that changed since the last pipeline run

### How They Work Together
```
[Source DB] --CDC--> [Bronze Delta] --CDF--> [Silver Delta] --CDF--> [Gold Delta]
     ^                    ^                      ^                       ^
     |                    |                      |                       |
   CDC captures      CDF tracks            CDF tracks              CDF tracks
   source changes    Bronze changes        Silver changes          Gold changes
```

### Example 6.1: Enabling Change Data Feed

**Objective:** Enable CDF on a Delta table and understand what metadata is captured

In [None]:
# Create a table with CDF enabled from the start
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG}.{BRONZE_SCHEMA}.cdf_demo (
    user_id STRING,
    name STRING,
    email STRING,
    status STRING,
    updated_at TIMESTAMP
) 
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

print("Table created with Change Data Feed enabled")

In [None]:
# Verify CDF is enabled
properties = spark.sql(f"SHOW TBLPROPERTIES {CATALOG}.{BRONZE_SCHEMA}.cdf_demo")
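# LIKE is case-sensitive, so lower-case the key before matching "delta.enableChangeDataFeed"
display(properties.filter(F.lower(F.col("key")).like("%change%")))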

### Example 6.2: Generating and Tracking Changes

**Objective:** Execute various DML operations and see how CDF records them

In [None]:
# INSERT some initial data
spark.sql(f"""
INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.cdf_demo VALUES
    ('U001', 'Alice', 'alice@example.com', 'active', current_timestamp()),
    ('U002', 'Bob', 'bob@example.com', 'active', current_timestamp()),
    ('U003', 'Charlie', 'charlie@example.com', 'active', current_timestamp())
""")
print("Version 1: Initial INSERT completed")

In [None]:
# UPDATE a record
spark.sql(f"""
UPDATE {CATALOG}.{BRONZE_SCHEMA}.cdf_demo
SET status = 'premium', updated_at = current_timestamp()
WHERE user_id = 'U001'
""")
print("Version 2: UPDATE completed - Alice upgraded to premium")

In [None]:
# DELETE a record
spark.sql(f"""
DELETE FROM {CATALOG}.{BRONZE_SCHEMA}.cdf_demo
WHERE user_id = 'U002'
""")
print("Version 3: DELETE completed - Bob removed")

In [None]:
# INSERT new record
spark.sql(f"""
INSERT INTO {CATALOG}.{BRONZE_SCHEMA}.cdf_demo VALUES
    ('U004', 'Diana', 'diana@example.com', 'trial', current_timestamp())
""")
print("Version 4: INSERT completed - Diana added")

### Example 6.3: Reading Change Data Feed

**Objective:** Query the Change Data Feed to see all recorded changes

In [None]:
# Read all changes from the beginning
changes = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table(f"{CATALOG}.{BRONZE_SCHEMA}.cdf_demo")

# Show changes with CDF metadata columns
display(
    changes.select(
        "user_id", "name", "status",
        "_change_type",        # insert, update_preimage, update_postimage, delete
        "_commit_version",     # Delta version number
        "_commit_timestamp"    # When the change occurred
    ).orderBy("_commit_version", "user_id")
)

**Understanding `_change_type` values:**

| Change Type | Description |
|-------------|-------------|
| `insert` | New row inserted |
| `update_preimage` | Row value BEFORE update |
| `update_postimage` | Row value AFTER update |
| `delete` | Row that was deleted |

This enables powerful incremental processing patterns - you can process only what changed!
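
The change types in the table above can be illustrated with a small pure-Python simulation (no Spark required). This is a sketch under stated assumptions, not how Delta Lake stores the feed: `ChangeTrackedTable` is a hypothetical toy class that keeps rows in a dictionary and appends one change row per affected row, with an update producing both a pre-image and a post-image.

```python
class ChangeTrackedTable:
    """Toy keyed table that logs row-level changes using CDF-style labels."""

    def __init__(self):
        self.rows = {}     # current state: user_id -> row dict
        self.feed = []     # recorded change rows
        self.version = 0   # version 0 stands for the empty table right after creation

    def _commit(self, changes):
        """Record all changes of one operation under a single new version."""
        self.version += 1
        for change_type, row in changes:
            self.feed.append(
                {**row, "_change_type": change_type, "_commit_version": self.version}
            )

    def insert(self, *new_rows):
        for row in new_rows:
            self.rows[row["user_id"]] = dict(row)
        self._commit([("insert", row) for row in new_rows])

    def update(self, user_id, **new_values):
        before = dict(self.rows[user_id])
        after = {**before, **new_values}
        self.rows[user_id] = after
        # An update is logged twice: the old values and the new values.
        self._commit([("update_preimage", before), ("update_postimage", after)])

    def delete(self, user_id):
        removed = self.rows.pop(user_id)
        self._commit([("delete", removed)])


table = ChangeTrackedTable()
table.insert(
    {"user_id": "U001", "name": "Alice", "status": "active"},
    {"user_id": "U002", "name": "Bob", "status": "active"},
    {"user_id": "U003", "name": "Charlie", "status": "active"},
)
table.update("U001", status="premium")
table.delete("U002")
table.insert({"user_id": "U004", "name": "Diana", "status": "trial"})

for change in table.feed:
    print(change["_commit_version"], change["_change_type"], change["user_id"], change["status"])
```

The four operations produce seven change rows: three inserts, one pre-image and one post-image for the update, one delete, and one final insert.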

In [None]:
# Example: Get only inserts committed at version 2 or later (startingVersion is inclusive)
new_inserts = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 2) \
    .table(f"{CATALOG}.{BRONZE_SCHEMA}.cdf_demo") \
    .filter(F.col("_change_type") == "insert")

print("New inserts since version 2:")
display(new_inserts.select("user_id", "name", "status", "_commit_version"))

In [None]:
# Example: Get all deletions for audit purposes
deletions = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table(f"{CATALOG}.{BRONZE_SCHEMA}.cdf_demo") \
    .filter(F.col("_change_type") == "delete")

print("All deleted records (for audit):")
display(deletions.select("user_id", "name", "_commit_version", "_commit_timestamp"))

### Example 6.4: CDF for Incremental ETL

**Objective:** Demonstrate how CDF enables efficient incremental processing in ETL pipelines

Instead of reprocessing entire tables, use CDF to process only changed rows:

In [None]:
# Simulate an incremental ETL pipeline
# First run: process all data (startingVersion = 0)
# Subsequent runs: process only versions committed after the last processed one

# Store the last processed version (in practice, save this to a checkpoint table).
# None means the pipeline has not run yet.
last_processed_version = None

# startingVersion is inclusive, so resume one version after the checkpoint
starting_version = 0 if last_processed_version is None else last_processed_version + 1

# Read incremental changes
incremental_changes = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", starting_version) \
    .table(f"{CATALOG}.{BRONZE_SCHEMA}.cdf_demo")

# Apply transformations only to new and updated records
# (delete rows are dropped here; a real pipeline would propagate them to the target)
transformed = incremental_changes \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"])) \
    .withColumn("processed_at", F.current_timestamp()) \
    .withColumn("email_domain", F.split(F.col("email"), "@")[1])

print("Incremental processing - only changed records:")
display(transformed.select("user_id", "name", "email_domain", "status", "_change_type", "processed_at"))
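
The checkpointing step can be sketched in pure Python (no Spark required). This is a minimal illustration under stated assumptions, not a production pattern: `apply_changes` is a hypothetical helper, the feed is a hand-written list of dictionaries shaped like CDF rows, and the target is a plain dictionary. Because `startingVersion` is inclusive, a real reader would resume at the checkpoint plus one; the `>` filter below plays that role.

```python
def apply_changes(target, feed, last_processed_version):
    """Apply feed rows newer than the checkpoint to `target`; return the new checkpoint."""
    pending = [c for c in feed if c["_commit_version"] > last_processed_version]
    # Replay in commit order so the latest change to a key wins.
    for change in sorted(pending, key=lambda c: c["_commit_version"]):
        key = change["user_id"]
        if change["_change_type"] in ("insert", "update_postimage"):
            target[key] = {"user_id": key, "status": change["status"]}
        elif change["_change_type"] == "delete":
            target.pop(key, None)
        # update_preimage rows carry the old values and are skipped here
    return max((c["_commit_version"] for c in pending), default=last_processed_version)


# Hand-written sample feed, shaped like the CDF output for illustration only.
feed = [
    {"user_id": "U001", "status": "active", "_change_type": "insert", "_commit_version": 1},
    {"user_id": "U002", "status": "active", "_change_type": "insert", "_commit_version": 1},
    {"user_id": "U001", "status": "active", "_change_type": "update_preimage", "_commit_version": 2},
    {"user_id": "U001", "status": "premium", "_change_type": "update_postimage", "_commit_version": 2},
    {"user_id": "U002", "status": "active", "_change_type": "delete", "_commit_version": 3},
]

silver = {}
first_checkpoint = apply_changes(silver, feed[:2], 0)        # first run sees only version 1
checkpoint = apply_changes(silver, feed, first_checkpoint)   # second run applies versions 2 and 3
print(checkpoint, silver)
```

Running the function again with the same checkpoint finds nothing pending and returns the checkpoint unchanged, which makes a rerun safe.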

**Key Takeaways - CDF vs CDC:**

1. **CDC** captures changes FROM source systems INTO your lakehouse
2. **CDF** tracks changes WITHIN Delta Lake tables
3. Use CDC tools (Debezium, DMS) to ingest data into Bronze
4. Use CDF to build efficient incremental Silver and Gold layers
5. CDF lets pipelines read only the changed rows instead of rescanning full tables

---

## Summary

### What Has Been Achieved:

| Section | Key Learnings |
|---------|--------------|
| **Section 1** | Delta table creation, Schema Enforcement, Schema Evolution, Constraints |
| **Section 2** | Complete CRUD operations, MERGE INTO for upserts |
| **Section 3** | Metadata exploration with DESCRIBE DETAIL/HISTORY, Delta Log internals |
| **Section 4** | Time Travel queries, Disaster Recovery with RESTORE, VACUUM implications |
| **Section 5** | Small Files Problem, Partitioning, Z-ORDER, Liquid Clustering |
| **Section 6** | Change Data Feed vs CDC - understanding the difference |

### Key Takeaways:

1. **Delta Lake = Data Lake + ACID**: Combines Data Lake flexibility with transactional reliability
2. **Schema Evolution safely**: Additive changes can be merged in on write (for example with `mergeSchema`); breaking changes require planning
3. **Time Travel + Copy-on-Write**: Previous versions stay queryable until VACUUM or log retention removes them, enabling rollback and audit
4. **VACUUM trade-off**: Storage optimization vs Time Travel capability
5. **Optimization matters**: Choose the right technique (Partitioning, Z-ORDER, Liquid Clustering)
6. **CDF ≠ CDC**: CDC ingests from sources, CDF tracks Delta Lake changes

### Quick Reference - Key Commands:

| Operation | SQL | PySpark |
|-----------|-----|---------|
| Create Delta Table | `CREATE TABLE table (...) USING DELTA` | `df.write.format("delta").saveAsTable("table")` |
| Time Travel | `SELECT * FROM table VERSION AS OF 1` | `.option("versionAsOf", 1)` |
| Restore | `RESTORE TABLE table TO VERSION AS OF 1` | `dt.restoreToVersion(1)` |
| MERGE | `MERGE INTO target USING source ON ...` | `DeltaTable.forName(spark, "target").merge(source_df, condition)` |
| Optimize | `OPTIMIZE table` | `dt.optimize().executeCompaction()` |
| Z-ORDER | `OPTIMIZE table ZORDER BY (col)` | `dt.optimize().executeZOrderBy("col")` |
| VACUUM | `VACUUM table RETAIN X HOURS` | `dt.vacuum(X)` |
| History | `DESCRIBE HISTORY table` | `dt.history()` |
| Enable CDF | `ALTER TABLE table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)` | N/A (run the SQL via `spark.sql`) |
| Read CDF | `SELECT * FROM table_changes('table', 0)` | `.option("readChangeFeed", "true")` |

Here `dt` stands for `DeltaTable.forName(spark, "table")` from the `delta.tables` module.


---

## Resource Cleanup

Clean up resources created during the notebook:

In [None]:
# Optional test resource cleanup
# NOTE: Run only if you want to delete all created data

# Tables to clean up:
cleanup_tables = [
    "customers_delta",
    "orders_modern", 
    "time_travel_demo",
    "small_files_demo",
    "orders_partitioned",
    "sales_zorder_demo",
    "sales_liquid_clustering",
    "cdf_demo"
]

# Uncomment below to execute cleanup:
# for table in cleanup_tables:
#     spark.sql(f"DROP TABLE IF EXISTS {CATALOG}.{BRONZE_SCHEMA}.{table}")
#     print(f"Dropped: {table}")

# spark.sql("DROP VIEW IF EXISTS customer_updates")
# spark.catalog.clearCache()

# print("All resources cleaned up!")