Delta Lake:
* Delta Lake is an optimized storage layer that provides the foundation for tables in a lakehouse.

* Supported features

  - Schema enforcement and evolution
  - **Time travel (Data versioning)**
  - Data compaction (Optimize)
  - Unified Batch and Streaming Workloads
  - **Efficient Upserts and Deletes (MERGE operation)**
  - Scalability and Performance
  - Data Reliability and Checkpoints 
  - Compliance and Auditing 



Demo tables:
- `customers` - Customer data with CDC history
- `products` - Product catalog
- `orders` - Order records
- `order_items` - Order line items
- `cdc_events` - Raw CDC events (audit log)

---
## 1. Setup and Configuration

In [4]:
# Configuration
DELTA_LAKE_PATH = "../deltalake"  # Delta Lake tables at project root (up one level from notebooks/)

# Available tables
TABLES = ["customers", "products", "orders", "order_items", "cdc_events"]

In [5]:
# Import libraries
import os

import pandas as pd
from deltalake import DeltaTable

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 50)
pd.set_option('display.width', None)

print("Libraries imported successfully!")
print(f"Delta Lake path: {os.path.abspath(DELTA_LAKE_PATH)}")

Libraries imported successfully!
Delta Lake path: /Users/anh.nguyen/Documents/poc/deltalake_poc/deltalake


In [6]:
# Helper functions
from typing import Optional

# Module-qualified access, so a later `from delta import DeltaTable` (delta-spark)
# cannot shadow the delta-rs class used by these helpers.
import deltalake

def get_table_path(table_name: str) -> str:
    """Get full path to Delta table."""
    return os.path.join(DELTA_LAKE_PATH, table_name)

def table_exists(table_name: str) -> bool:
    """Check if Delta table exists (a directory containing `_delta_log`)."""
    path = get_table_path(table_name)
    return os.path.isdir(os.path.join(path, "_delta_log"))

def load_table(table_name: str, version: Optional[int] = None) -> pd.DataFrame:
    """Load Delta table as pandas DataFrame, optionally at a specific version."""
    dt = deltalake.DeltaTable(get_table_path(table_name), version=version)
    return dt.to_pandas()

def get_history(table_name: str) -> list:
    """Get table history as a list of commit dicts (newest first)."""
    dt = deltalake.DeltaTable(get_table_path(table_name))
    return dt.history()

def get_schema(table_name: str) -> "pyarrow.Schema":
    """Get table schema as a PyArrow schema."""
    dt = deltalake.DeltaTable(get_table_path(table_name))
    return dt.schema().to_pyarrow()

print("Helper functions defined!")

Helper functions defined!
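
`table_exists` treats a directory as a Delta table when it contains `_delta_log`. As a rough illustration of what that directory holds, the sketch below lists commit versions from the zero-padded `<version>.json` file names using only the standard library. `list_commit_versions` is a hypothetical helper, not part of `deltalake`, and it deliberately ignores checkpoints and other log files.

```python
import os
import re
import tempfile

# Commit files in _delta_log are named with a 20-digit, zero-padded version number.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")

def list_commit_versions(table_path: str) -> list[int]:
    """Return sorted commit versions found in <table_path>/_delta_log (hypothetical helper)."""
    log_dir = os.path.join(table_path, "_delta_log")
    if not os.path.isdir(log_dir):
        return []
    versions = []
    for name in os.listdir(log_dir):
        match = COMMIT_FILE.match(name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)

# Self-contained demo against a throwaway directory that mimics the layout.
with tempfile.TemporaryDirectory() as tmp:
    demo_log_dir = os.path.join(tmp, "_delta_log")
    os.makedirs(demo_log_dir)
    for v in (0, 1, 2):
        open(os.path.join(demo_log_dir, f"{v:020d}.json"), "w").close()
    demo_versions = list_commit_versions(tmp)

print(demo_versions)
```

Against a real table, `list_commit_versions(get_table_path("customers"))` should roughly match the versions reported by `get_history`, as long as old log files have not been cleaned up.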


---
## 2. Check Available Tables

In [50]:
# Check which tables exist
print("Available Delta Lake Tables:")
print("=" * 40)

for table in TABLES:
    exists = table_exists(table)
    status = "✅ Available" if exists else "❌ Not found"
    
    if exists:
        try:
            df = load_table(table)
            row_count = len(df)
            history = get_history(table)
            version_count = len(history)
            print(f"{table:15} {status} ({row_count} rows, {version_count} versions)")
        except Exception as e:
            print(f"{table:15} {status} (error: {e})")
    else:
        print(f"{table:15} {status}")

Available Delta Lake Tables:
customers       ✅ Available (error: Json error: whilst decoding field 'minValues': whilst decoding field 'created_at': failed to parse "+57947-06-07T01:15:05.000Z" as Timestamp(Microsecond, Some("UTC")): Parser error: Error parsing timestamp from '+57947-06-07T01:15:05.000Z': error parsing date)
products        ✅ Available (error: Json error: whilst decoding field 'minValues': whilst decoding field 'created_at': failed to parse "+57947-06-07T01:15:06.000Z" as Timestamp(Microsecond, Some("UTC")): Parser error: Error parsing timestamp from '+57947-06-07T01:15:06.000Z': error parsing date)
orders          ✅ Available (error: Json error: whilst decoding field 'minValues': whilst decoding field 'order_date': failed to parse "+57947-06-07T01:15:06.000Z" as Timestamp(Microsecond, Some("UTC")): Parser error: Error parsing timestamp from '+57947-06-07T01:15:06.000Z': error parsing date)
order_items     ✅ Available (error: Json error: whilst decoding field 'm

---
## 3. Query Current Data (delta-rs)

**Method:** Python-native `deltalake` library (delta-rs)  
**Pros:** Fast, lightweight, no JVM overhead  
**Cons:** Limited to basic operations (basic write, read, time travel, history)

In [51]:
# Query customers table
if table_exists("customers"):
    df_customers = load_table("customers")
    print(f"Customers Table ({len(df_customers)} rows)")
    print("=" * 60)
    display(df_customers)
else:
    print("Customers table not found. Run the CDC pipeline first.")

Exception: Json error: whilst decoding field 'minValues': whilst decoding field 'created_at': failed to parse "+57947-06-07T01:15:05.000Z" as Timestamp(Microsecond, Some("UTC")): Parser error: Error parsing timestamp from '+57947-06-07T01:15:05.000Z': error parsing date

In [52]:
# Query products table
if table_exists("products"):
    df_products = load_table("products")
    print(f"Products Table ({len(df_products)} rows)")
    print("=" * 60)
    display(df_products)
else:
    print("Products table not found. Run the CDC pipeline first.")

Exception: Json error: whilst decoding field 'minValues': whilst decoding field 'created_at': failed to parse "+57947-06-07T01:15:06.000Z" as Timestamp(Microsecond, Some("UTC")): Parser error: Error parsing timestamp from '+57947-06-07T01:15:06.000Z': error parsing date

In [13]:
# Query orders table
if table_exists("orders"):
    df_orders = load_table("orders")
    print(f"Orders Table ({len(df_orders)} rows)")
    print("=" * 60)
    display(df_orders)
else:
    print("Orders table not found. Run the CDC pipeline first.")

Orders Table (3 rows)


Unnamed: 0,id,customer_id,order_date,status,total_amount,shipping_address,created_at,updated_at,__cdc_operation,__cdc_timestamp,__processed_at
0,3,3,1970-01-21 10:41:11+00:00,pending,259.97,"789 Pine Rd, Chicago, IL 60601",1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:20+00:00
1,2,2,1970-01-21 10:41:11+00:00,shipped,79.99,"456 Oak Ave, Los Angeles, CA 90001",1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:19+00:00
2,1,1,1970-01-21 10:41:11+00:00,completed,1349.98,"123 Main St, New York, NY 10001",1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:17+00:00
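
The `1970-01-21` values in the `orders` output and the `+57947-...` values in the earlier errors both look like epoch timestamps read with the wrong unit. That is one plausible reading, not something the notebook confirms. The sketch below uses a hypothetical epoch value (not read from the tables) to show how the same number lands in 2025, 1970, or the far future depending on the unit assumed.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical source value: epoch *seconds* for a moment on 2025-12-23 (assumption).
epoch_seconds = 1_766_471_000

# Intended reading: the number is seconds since the epoch.
as_seconds = EPOCH + timedelta(seconds=epoch_seconds)

# Unit mismatch 1: the same number treated as milliseconds collapses to January 1970.
as_millis = EPOCH + timedelta(milliseconds=epoch_seconds)

# Unit mismatch 2: a millisecond value treated as seconds overflows datetime,
# so the resulting year is only estimated here.
epoch_millis = epoch_seconds * 1000
SECONDS_PER_YEAR = 365.2425 * 86_400
approx_year = 1970 + epoch_millis / SECONDS_PER_YEAR

print(as_seconds)
print(as_millis)
print(int(approx_year))
```

If this is the cause, the fix belongs in the pipeline that writes the tables (converting the source value with the correct unit), not in the read path.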


In [16]:
# Detailed version analysis - see what changed in each version
TABLE_TO_ANALYZE = "customers"

if table_exists(TABLE_TO_ANALYZE):
    print(f"=== Detailed Version Analysis: {TABLE_TO_ANALYZE} ===\n")
    
    history = get_history(TABLE_TO_ANALYZE)
    
    print(f"Total versions: {len(history)}\n")
    print("Version Details:")
    print("=" * 100)
    
    for entry in history[:15]:  # Show the 15 most recent versions (history is newest first)
        version = entry.get('version', 'N/A')
        timestamp = entry.get('timestamp', 'N/A')
        operation = entry.get('operation', 'N/A')
        
        # Get operation metrics if available
        metrics = entry.get('operationMetrics', {})
        num_output_rows = metrics.get('numOutputRows', 'N/A')
        num_updated_rows = metrics.get('numTargetRowsUpdated', 'N/A')
        num_inserted_rows = metrics.get('numTargetRowsInserted', 'N/A')
        
        print(f"v{version:2} | {timestamp} | {operation:15} | Rows: out={num_output_rows}, updated={num_updated_rows}, inserted={num_inserted_rows}")
    
    print("\n" + "=" * 100)
    print("\nKey Metrics:")
    print("- 'numOutputRows': Rows written by the operation (not the table's total row count)")
    print("- 'numTargetRowsUpdated': Rows updated by MERGE")
    print("- 'numTargetRowsInserted': Rows inserted by MERGE")
else:
    print(f"Table '{TABLE_TO_ANALYZE}' not found.")

=== Detailed Version Analysis: customers ===

Total versions: 6

Version Details:
v 5 | 1766472191050 | MERGE           | Rows: out=1, updated=0, inserted=1
v 4 | 1766472189552 | MERGE           | Rows: out=1, updated=0, inserted=1
v 3 | 1766472187860 | MERGE           | Rows: out=1, updated=0, inserted=1
v 2 | 1766472186208 | MERGE           | Rows: out=1, updated=0, inserted=1
v 1 | 1766472184122 | MERGE           | Rows: out=1, updated=0, inserted=1
v 0 | 1766472180826 | CREATE TABLE    | Rows: out=N/A, updated=N/A, inserted=N/A


Key Metrics:
- 'numOutputRows': Rows written by the operation (not the table's total row count)
- 'numTargetRowsUpdated': Rows updated by MERGE
- 'numTargetRowsInserted': Rows inserted by MERGE
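
The `timestamp` values in the version listing are epoch milliseconds, which are hard to read at a glance. A minimal sketch for converting them to UTC datetimes follows; `commit_time` is a hypothetical helper name, and the two sample values are copied from the listing.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def commit_time(timestamp_ms: int) -> datetime:
    """Convert an epoch-millisecond commit timestamp to an aware UTC datetime."""
    # Integer milliseconds via timedelta avoids float rounding.
    return EPOCH + timedelta(milliseconds=timestamp_ms)

# Values copied from the version listing (v5 and v0).
for version, ts in [(5, 1766472191050), (0, 1766472180826)]:
    print(f"v{version}: {commit_time(ts).isoformat(timespec='milliseconds')}")
```

In the analysis loop, `commit_time(entry["timestamp"])` could replace the raw `timestamp` when printing.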


In [17]:
# Compare row count across versions to see when records were added
TABLE_TO_ANALYZE = "customers"

if table_exists(TABLE_TO_ANALYZE):
    print(f"=== Row Count Evolution: {TABLE_TO_ANALYZE} ===\n")
    
    history = get_history(TABLE_TO_ANALYZE)
    
    for entry in history[:15]:
        version = entry.get('version', 'N/A')
        
        try:
            df = load_table(TABLE_TO_ANALYZE, version=version)
            row_count = len(df)
            
            # Get operation info
            operation = entry.get('operation', 'N/A')
            timestamp = entry.get('timestamp', 'N/A')
            
            print(f"Version {version:2} | {operation:15} | {row_count} rows | {timestamp}")
        except Exception as e:
            print(f"Version {version:2} | Error: {e}")
    
    print("\n💡 Observation:")
    print("   - If row count doesn't change between versions, that MERGE was an UPDATE")
    print("   - If row count increases, that MERGE added a new record")
else:
    print(f"Table '{TABLE_TO_ANALYZE}' not found.")

=== Row Count Evolution: customers ===

Version  5 | MERGE           | 5 rows | 1766472191050
Version  4 | MERGE           | 4 rows | 1766472189552
Version  3 | MERGE           | 3 rows | 1766472187860
Version  2 | MERGE           | 2 rows | 1766472186208
Version  1 | MERGE           | 1 rows | 1766472184122
Version  0 | CREATE TABLE    | 0 rows | 1766472180826

💡 Observation:
   - If row count doesn't change between versions, that MERGE was an UPDATE
   - If row count increases, that MERGE added a new record
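
The observation can be turned into a small heuristic that labels each version by its row-count delta. This is only a sketch: `classify_versions` is a hypothetical helper, and a MERGE that inserts and deletes the same number of rows would also show no net change, so the label is a hint rather than proof of an update.

```python
def classify_versions(row_counts: dict[int, int]) -> dict[int, str]:
    """Label each version by how the row count changed from the previous version."""
    labels = {}
    previous = None
    for version in sorted(row_counts):
        count = row_counts[version]
        if previous is None:
            labels[version] = "initial"
        elif count > previous:
            labels[version] = "rows added"
        elif count < previous:
            labels[version] = "rows removed"
        else:
            labels[version] = "no net change (possibly update only)"
        previous = count
    return labels

# Row counts copied from the Row Count Evolution output.
observed = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5}
labels = classify_versions(observed)

for version, label in labels.items():
    print(f"Version {version}: {label}")
```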


In [18]:
# Time travel: Query specific version
TABLE_TO_EXPLORE = "customers"
VERSION = 0 

if table_exists(TABLE_TO_EXPLORE):
    try:
        df_historical = load_table(TABLE_TO_EXPLORE, version=VERSION)
        print(f"'{TABLE_TO_EXPLORE}' at Version {VERSION}")
        print("=" * 60)
        display(df_historical)
    except Exception as e:
        print(f"Error loading version {VERSION}: {e}")
else:
    print(f"Table '{TABLE_TO_EXPLORE}' not found.")

'customers' at Version 0


Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at,__cdc_operation,__cdc_timestamp,__processed_at


In [19]:
# Compare two versions
TABLE_TO_COMPARE = "customers"
VERSION_OLD = 0
VERSION_NEW = 4  # Set to None to compare against the latest version

if table_exists(TABLE_TO_COMPARE):
    try:
        df_old = load_table(TABLE_TO_COMPARE, version=VERSION_OLD)
        df_new = load_table(TABLE_TO_COMPARE, version=VERSION_NEW)
        new_label = "Latest" if VERSION_NEW is None else f"Version {VERSION_NEW}"

        print(f"Version {VERSION_OLD}: {len(df_old)} rows")
        print(f"{new_label}: {len(df_new)} rows")

        # Show both versions, one after the other, if small enough
        if len(df_old) <= 10 and len(df_new) <= 10:
            print(f"\n------------------------------- Version {VERSION_OLD} -------------------------------")
            display(df_old)
            print(f"\n------------------------------- {new_label} -------------------------------")
            display(df_new)
    except Exception as e:
        print(f"Error comparing versions: {e}")
else:
    print(f"Table '{TABLE_TO_COMPARE}' not found.")

Version 0: 0 rows
Version 4: 4 rows

------------------------------- Version 0 -------------------------------


Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at,__cdc_operation,__cdc_timestamp,__processed_at



------------------------------- Version 4 -------------------------------


Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at,__cdc_operation,__cdc_timestamp,__processed_at
0,4,Alice,Williams,alice.williams@example.com,+1-555-0104,1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:08+00:00
1,3,Bob,Johnson,bob.johnson@example.com,+1-555-0103,1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:06+00:00
2,2,Jane,Smith,jane.smith@example.com,+1-555-0102,1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:04+00:00
3,1,John,Doe,john.doe@example.com,+1-555-0101,1970-01-21 10:41:11+00:00,1970-01-21 10:41:11+00:00,r,2025-12-23 06:29:50+00:00,2025-12-23 06:43:00+00:00
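
Displaying two versions one after the other works for a handful of rows. For larger tables, a keyed diff is easier to read. The sketch below compares two snapshots given as lists of dicts; `diff_snapshots` is a hypothetical helper, and the sample rows are a trimmed copy of the output above.

```python
def diff_snapshots(old_rows, new_rows, key="id"):
    """Compare two snapshots (lists of dicts) by primary key."""
    old = {row[key]: row for row in old_rows}
    new = {row[key]: row for row in new_rows}
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        # Present in both snapshots but with at least one differing column.
        "changed": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }

# Trimmed sample data: version 0 is empty, the newer version has four customers.
rows_v0 = []
rows_v4 = [
    {"id": 4, "first_name": "Alice", "last_name": "Williams"},
    {"id": 3, "first_name": "Bob", "last_name": "Johnson"},
    {"id": 2, "first_name": "Jane", "last_name": "Smith"},
    {"id": 1, "first_name": "John", "last_name": "Doe"},
]

diff = diff_snapshots(rows_v0, rows_v4)
print(diff)
```

With the DataFrames from the comparison cell, the inputs could be produced with `df_old.to_dict("records")` and `df_new.to_dict("records")`.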


In [21]:
# Analyze CDC events by source table
if table_exists("cdc_events"):
    df_events = load_table("cdc_events")
    
    print("CDC Events by Source Table")
    print("=" * 40)
    
    table_counts = df_events['source_table'].value_counts()
    for table, count in table_counts.items():
        print(f"{table:30} {count:5} events")
else:
    print("CDC events table not found.")

CDC Events by Source Table
public.products                    5 events
public.customers                   5 events
public.order_items                 4 events
public.orders                      3 events


In [22]:
# View recent CDC events
if table_exists("cdc_events"):
    df_events = load_table("cdc_events")
    
    print("Recent CDC Events (last 10)")
    print("=" * 60)
    
    # Sort by processed_at if available, otherwise fall back to the last 10 rows
    if 'processed_at' in df_events.columns:
        df_recent = df_events.sort_values('processed_at', ascending=False).head(10)
    else:
        df_recent = df_events.tail(10)
    
    display(df_recent[['event_id', 'source_table', 'operation', 'record_id', 'event_timestamp']])
else:
    print("CDC events table not found.")

Recent CDC Events (last 10)


Unnamed: 0,event_id,source_table,operation,record_id,event_timestamp
0,cdc.public.products-0-4,public.products,r,5,2025-12-23 06:29:50.604000+00:00
1,cdc.public.products-0-3,public.products,r,4,2025-12-23 06:29:50.604000+00:00
2,cdc.public.products-0-2,public.products,r,3,2025-12-23 06:29:50.603000+00:00
3,cdc.public.products-0-1,public.products,r,2,2025-12-23 06:29:50.603000+00:00
4,cdc.public.products-0-0,public.products,r,1,2025-12-23 06:29:50.603000+00:00
5,cdc.public.orders-0-2,public.orders,r,3,2025-12-23 06:29:50.607000+00:00
6,cdc.public.orders-0-1,public.orders,r,2,2025-12-23 06:29:50.607000+00:00
11,cdc.public.orders-0-0,public.orders,r,1,2025-12-23 06:29:50.607000+00:00
9,cdc.public.order_items-0-3,public.order_items,r,4,2025-12-23 06:29:50.610000+00:00
16,cdc.public.order_items-0-2,public.order_items,r,3,2025-12-23 06:29:50.609000+00:00


---
## 4. PySpark SQL Queries

**Method:** PySpark with Delta Lake SQL  
**Pros:** Full SQL support, complex transformations, aggregations  
**Cons:** Requires JVM, heavier resource usage  
**Python 3.14 Note:** Use SQL-based operations, avoid `createDataFrame()` due to serialization issues

### Read

In [1]:
# Initialize PySpark with Delta Lake
USE_SPARK = True  # Set to False to skip the PySpark cells

if USE_SPARK:
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    builder = (
        SparkSession.builder
        .appName("DeltaLakeNotebook")
        .master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    print("SparkSession created!")
    print(f"Spark version: {spark.version}")
else:
    print("PySpark disabled. Set USE_SPARK = True to enable.")

:: loading settings :: url = jar:file:/opt/anaconda3/envs/deltalake_poc/lib/python3.14/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/anh.nguyen/.ivy2/cache
The jars for the packages stored in: /Users/anh.nguyen/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9ef23fc6-f580-4e35-9a52-1938efe8b98b;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 97ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3

SparkSession created!
Spark version: 3.5.0


In [7]:
# PySpark SQL: Query with SQL
if USE_SPARK and 'spark' in dir():
    # Load Delta tables
    customers_path = get_table_path("customers")
    
    if table_exists("customers"):
        # Step 1: Read the Delta table as a DataFrame and register it as a temp view
        df_spark = spark.read.format("delta").load(customers_path)
        df_spark.createOrReplaceTempView("customers")

        # Step 2: Run a SQL query against the temp view
        print("------------------------ PySpark SQL Query ------------------------ ")
        result = spark.sql("""
            SELECT *
            FROM customers
            ORDER BY id
        """)
        result.show()
        
        print(f"\nTotal customers: {result.count()}")
    else:
        print("Customers table not found.")
else:
    print("Spark not enabled or customers table not found.")

------------------------ PySpark SQL Query ------------------------ 


                                                                                

+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
| id|first_name|last_name|               email|      phone|          created_at|          updated_at|__cdc_operation|     __cdc_timestamp|      __processed_at|
+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
|  1|      Test|    User1|test.user@example...|+1-555-0101|+57949-09-15 11:3...|+57949-09-19 10:1...|              u|2025-12-24 11:06:...|2025-12-24 11:06:...|
|  2|      Jane|    Smith|jane.smith@exampl...|+1-555-0102|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  3|       Bob|  Johnson|bob.johnson@examp...|+1-555-0103|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  4|     Alice| Williams|alice.williams

In [8]:
# PySpark SQL: Time travel query
if USE_SPARK and 'spark' in dir():
    customers_path = get_table_path("customers")
    
    if table_exists("customers"):
        print("------------------------  PySpark Time Travel ------------------------\n")
        
        # Query version 0 (first snapshot)
        print("Version 0 (Initial Snapshot):")
        df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(customers_path)
        df_v0.show()
        
        # Query latest version (omitting versionAsOf reads the current snapshot)
        print("\nLatest Version:")
        df_latest = spark.read.format("delta").load(customers_path)
        df_latest.show()
        
        print(f"\nVersion 0 rows: {df_v0.count()}")
        print(f"Latest rows: {df_latest.count()}")
    else:
        print("Customers table not found.")
else:
    print("Spark not enabled or customers table not found.")

------------------------  PySpark Time Travel ------------------------

Version 0 (Initial Snapshot):
+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
| id|first_name|last_name|               email|      phone|          created_at|          updated_at|__cdc_operation|     __cdc_timestamp|      __processed_at|
+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
|  1|      John|      Doe|john.doe@example.com|+1-555-0101|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  2|      Jane|    Smith|jane.smith@exampl...|+1-555-0102|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  3|       Bob|  Johnson|bob.johnson@examp...|+1-555-0103|+57949-09-15 11:3...|+57949-09-15 11:3.

In [9]:
# PySpark SQL: View Delta table history (on actual CDC tables)
if USE_SPARK and 'spark' in dir() and table_exists("customers"):
    from delta import DeltaTable
    
    customers_path = get_table_path("customers")
    
    print("------------------------------- Delta Table History (Real CDC Data) -------------------------------\n")
    print("Showing version history for 'customers' table:\n")
    
    dt = DeltaTable.forPath(spark, customers_path)
    dt.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)
else:
    print("Spark not enabled or customers table not found.")

------------------------------- Delta Table History (Real CDC Data) -------------------------------

Showing version history for 'customers' table:

+-------+-----------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp              |operation|operationMetrics                                                                                                                    

In [10]:
# PySpark: Compare versions of customers table (Time Travel)
if USE_SPARK and 'spark' in dir() and table_exists("customers"):
    customers_path = get_table_path("customers")
    
    print("=== Time Travel: Compare Customer Versions ===\n")
    
    try:
        # Get version count
        from delta import DeltaTable
        dt = DeltaTable.forPath(spark, customers_path)
        history = dt.history().select("version").collect()
        max_version = max([row.version for row in history])
        
        print(f"Available versions: 0 to {max_version}\n")
        
        # Compare version 0 vs latest
        print("----------------------------- Version 0 (Initial) -----------------------------")
        df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(customers_path)
        print(f"Row count: {df_v0.count()}")
        df_v0.show(5)
        
        print(f"\n----------------------------- Latest Version (v{max_version}) -----------------------------")
        df_latest = spark.read.format("delta").option("versionAsOf", max_version).load(customers_path)
        print(f"Row count: {df_latest.count()}")
        df_latest.show(5)
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled or customers table not found.")

=== Time Travel: Compare Customer Versions ===

Available versions: 0 to 1

----------------------------- Version 0 (Initial) -----------------------------
Row count: 5
+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
| id|first_name|last_name|               email|      phone|          created_at|          updated_at|__cdc_operation|     __cdc_timestamp|      __processed_at|
+---+----------+---------+--------------------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+
|  1|      John|      Doe|john.doe@example.com|+1-555-0101|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  2|      Jane|    Smith|jane.smith@exampl...|+1-555-0102|+57949-09-15 11:3...|+57949-09-15 11:3...|              r|2025-12-24 11:02:...|2025-12-24 11:05:...|
|  3|       Bob|  Johnson|bob.j

In [11]:
# PySpark: Time travel by timestamp
from delta import DeltaTable

customers_path = get_table_path("customers")

print("=== Time Travel by Timestamp: Version 0 vs Version 1 ===\n")

# Load Delta table
dt = DeltaTable.forPath(spark, customers_path)

# Get history with timestamps
history_df = (
    dt.history(10)
      .select("version", "timestamp", "operation")
      .orderBy("version")
)

# history_df.show(truncate=False)

# Extract commit timestamps for v0 and v1
history = history_df.collect()

ts_v0 = next(row.timestamp for row in history if row.version == 0)
ts_v1 = next(row.timestamp for row in history if row.version == 1)

print(f"Version 0 timestamp: {ts_v0}")
print(f"Version 1 timestamp: {ts_v1}\n")

# Read data as of version 0 (by timestamp)
df_v0 = (
    spark.read.format("delta")
    .option("timestampAsOf", ts_v0)
    .load(customers_path)
)

# Read data as of version 1 (by timestamp)
df_v1 = (
    spark.read.format("delta")
    .option("timestampAsOf", ts_v1)
    .load(customers_path)
)

# Compare
print("----------------------------- Version 0 -----------------------------")
print(f"Row count: {df_v0.count()}")
df_v0.show(5, truncate=False)

print("----------------------------- Version 1 -----------------------------")
print(f"Row count: {df_v1.count()}")
df_v1.show(5, truncate=False)


=== Time Travel by Timestamp: Version 0 vs Version 1 ===

Version 0 timestamp: 2025-12-24 11:05:52.398000
Version 1 timestamp: 2025-12-24 11:06:36.392000

----------------------------- Version 0 -----------------------------
Row count: 5
+---+----------+---------+--------------------------+-----------+---------------------+---------------------+---------------+-----------------------+-----------------------+
|id |first_name|last_name|email                     |phone      |created_at           |updated_at           |__cdc_operation|__cdc_timestamp        |__processed_at         |
+---+----------+---------+--------------------------+-----------+---------------------+---------------------+---------------+-----------------------+-----------------------+
|1  |John      |Doe      |john.doe@example.com      |+1-555-0101|+57949-09-15 11:39:04|+57949-09-15 11:39:04|r              |2025-12-24 11:02:20.804|2025-12-24 11:05:44.769|
|2  |Jane      |Smith    |jane.smith@example.com    |+1-555-0102|+
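
Reading by timestamp returns the snapshot of the latest commit at or before the requested time. The sketch below models that lookup in plain Python as a simplified illustration, not Delta Lake's actual implementation, which may apply additional validation. `resolve_version` is a hypothetical helper, it assumes commit timestamps increase with version, and the two timestamps are the ones printed above.

```python
from bisect import bisect_right
from datetime import datetime

def resolve_version(history, as_of):
    """Return the latest version whose commit timestamp is <= as_of.

    history: list of (version, commit_timestamp) tuples sorted by version.
    """
    timestamps = [ts for _, ts in history]
    index = bisect_right(timestamps, as_of) - 1
    if index < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return history[index][0]

# Commit timestamps copied from the cell output.
commits = [
    (0, datetime(2025, 12, 24, 11, 5, 52, 398000)),
    (1, datetime(2025, 12, 24, 11, 6, 36, 392000)),
]

# A time between the two commits resolves to version 0.
between = resolve_version(commits, datetime(2025, 12, 24, 11, 6, 0))
# The exact commit time of version 1 resolves to version 1.
exact = resolve_version(commits, commits[1][1])
print(between, exact)
```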

In [12]:
# Track all changes on a specific customer record using Change Data Feed
if USE_SPARK and 'spark' in dir() and table_exists("customers"):
    customers_path = get_table_path("customers")

    try:
        # Reading the change feed requires Change Data Feed to already be enabled on the
        # table (table property delta.enableChangeDataFeed = true); this cell does not enable it.

        # All changes (insert, update, delete) for each record
        changes_df = (
            spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", 0)
            .load(customers_path)
        )
        
        # changes on a specific customer (ID = 1)
        customer_id = 1
        customer_changes = changes_df.filter(f"id = {customer_id}").orderBy("_commit_version")
        
        # Show key columns including change metadata
        customer_changes.select(
            "id", 
            "first_name", 
            "last_name", 
            "email",
            "_change_type",      # insert, update_preimage, update_postimage, delete
            "_commit_version",   # Delta version number
            "_commit_timestamp"  # When change occurred
        ).show(truncate=False)
        
        print(f"\nTotal changes tracked: {customer_changes.count()}")
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled or customers table not found.")

+---+----------+---------+---------------------+----------------+---------------+-----------------------+
|id |first_name|last_name|email                |_change_type    |_commit_version|_commit_timestamp      |
+---+----------+---------+---------------------+----------------+---------------+-----------------------+
|1  |John      |Doe      |john.doe@example.com |insert          |0              |2025-12-24 11:05:52.398|
|1  |John      |Doe      |john.doe@example.com |update_preimage |1              |2025-12-24 11:06:36.392|
|1  |Test      |User1    |test.user@example.com|update_postimage|1              |2025-12-24 11:06:36.392|
+---+----------+---------+---------------------+----------------+---------------+-----------------------+


Total changes tracked: 3


In [13]:
# Change Data Feed: keep only the post-change ("after") state of each change
if USE_SPARK and 'spark' in dir() and table_exists("customers"):
    
    customers_path = get_table_path("customers")
    
    try:
        changes_df = (
            spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", 0)
            .load(customers_path)
        )
        
        customer_id = 1
        
        final_states = changes_df.filter(
            f"id = {customer_id} AND _change_type IN ('insert', 'update_postimage', 'delete')"
        ).orderBy("_commit_version")
        
        print(f"Customer ID {customer_id} - Final States Only (cleaner view):")
        print("=" * 100)
        
        final_states.select(
            "_commit_version",
            "_change_type",
            "id", 
            "first_name", 
            "last_name", 
            "email",
            "_commit_timestamp"
        ).show(truncate=False)
        
        print("\n✓ This shows only the 'after' state of each change")
        print(f"  Total state changes: {final_states.count()}")
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled or customers table not found.")

Customer ID 1 - Final States Only (cleaner view):
+---------------+----------------+---+----------+---------+---------------------+-----------------------+
|_commit_version|_change_type    |id |first_name|last_name|email                |_commit_timestamp      |
+---------------+----------------+---+----------+---------+---------------------+-----------------------+
|0              |insert          |1  |John      |Doe      |john.doe@example.com |2025-12-24 11:05:52.398|
|1              |update_postimage|1  |Test      |User1    |test.user@example.com|2025-12-24 11:06:36.392|
+---------------+----------------+---+----------+---------+---------------------+-----------------------+


✓ This shows only the 'after' state of each change
  Total state changes: 2
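
Filtering out `update_preimage` rows leaves one row per change. Going one step further, the same rows can be folded into the current state per key. The sketch below does this in plain Python on rows shaped like the change feed output; `latest_states` is a hypothetical helper, and the sample rows are copied from the output for customer 1.

```python
def latest_states(changes, key="id"):
    """Reduce change-feed rows to the most recent state per key (None if deleted)."""
    state = {}
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change = row["_change_type"]
        if change == "update_preimage":
            continue  # the "before" image carries no new state
        if change == "delete":
            state[row[key]] = None
        else:
            # Keep data columns only; drop the _change_type/_commit_* metadata.
            state[row[key]] = {k: v for k, v in row.items() if not k.startswith("_")}
    return state

# Rows copied from the change feed output for customer 1.
changes = [
    {"id": 1, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com",
     "_change_type": "insert", "_commit_version": 0},
    {"id": 1, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com",
     "_change_type": "update_preimage", "_commit_version": 1},
    {"id": 1, "first_name": "Test", "last_name": "User1", "email": "test.user@example.com",
     "_change_type": "update_postimage", "_commit_version": 1},
]

current = latest_states(changes)
print(current)
```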


In [68]:
# PySpark: Full table history (all columns)
from delta import DeltaTable

customers_path = get_table_path("customers")
dt = DeltaTable.forPath(spark, customers_path)
history_df = dt.history()

history_df.show(truncate=False)


+-------+-----------------------+------+--------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp   

In [69]:
# PySpark: Product analysis with order items
if USE_SPARK and 'spark' in dir() and table_exists("products") and table_exists("order_items"):
    products_path = get_table_path("products")
    items_path = get_table_path("order_items")
    
    df_products = spark.read.format("delta").load(products_path)
    df_items = spark.read.format("delta").load(items_path)
    
    df_products.createOrReplaceTempView("products")
    df_items.createOrReplaceTempView("order_items")
    
    print("----------------------------- Product Popularity: Most Ordered Items -----------------------------\n")
    
    spark.sql("""
        SELECT 
            p.name as product_name,
            p.category,
            COUNT(oi.id) as times_ordered,
            SUM(oi.quantity) as total_quantity_sold
        FROM products p
        LEFT JOIN order_items oi ON p.id = oi.product_id
        GROUP BY p.id, p.name, p.category
        ORDER BY total_quantity_sold DESC
        LIMIT 10
    """).show()
else:
    print("Spark not enabled or tables not found.")

----------------------------- Product Popularity: Most Ordered Items -----------------------------

+-------------------+-----------+-------------+-------------------+
|       product_name|   category|times_ordered|total_quantity_sold|
+-------------------+-----------+-------------+-------------------+
|Mechanical Keyboard|Electronics|            1|                  2|
|          USB-C Hub|Electronics|            1|                  1|
|         Laptop Pro|Electronics|            1|                  1|
|     Wireless Mouse|Electronics|            1|                  1|
|      Monitor Stand|     Office|            0|               NULL|
|       Test Product|       Test|            0|               NULL|
+-------------------+-----------+-------------+-------------------+
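
The `0` and `NULL` values in the last two rows come from the `LEFT JOIN`: products without order items still appear, `COUNT(oi.id)` skips the NULL join column, and `SUM` over no non-NULL values yields NULL. The sketch below uses Python's built-in `sqlite3` rather than Spark, with made-up two-row tables, to show the same behaviour and how `COALESCE` would turn the NULL into 0.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (id INTEGER, name TEXT);
    CREATE TABLE order_items (id INTEGER, product_id INTEGER, quantity INTEGER);
    INSERT INTO products VALUES (1, 'Mechanical Keyboard'), (2, 'Monitor Stand');
    INSERT INTO order_items VALUES (10, 1, 2);
""")

rows = conn.execute("""
    SELECT
        p.name,
        COUNT(oi.id)                  AS times_ordered,   -- counts non-NULL ids only
        COUNT(*)                      AS joined_rows,     -- counts the NULL-padded row too
        SUM(oi.quantity)              AS total_quantity,  -- NULL when nothing matched
        COALESCE(SUM(oi.quantity), 0) AS total_quantity_or_zero
    FROM products p
    LEFT JOIN order_items oi ON p.id = oi.product_id
    GROUP BY p.id, p.name
    ORDER BY p.id
""").fetchall()
conn.close()

for row in rows:
    print(row)
```

Whether to show NULL or 0 for unsold products is a presentation choice; the query in the cell above keeps NULL.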



In [39]:
# PySpark: Analyze order details
if USE_SPARK and 'spark' in dir() and table_exists("orders") and table_exists("order_items"):
    orders_path = get_table_path("orders")
    items_path = get_table_path("order_items")
    
    df_orders = spark.read.format("delta").load(orders_path)
    df_items = spark.read.format("delta").load(items_path)
    
    df_orders.createOrReplaceTempView("orders")
    df_items.createOrReplaceTempView("order_items")
    
    print("----------------------------- Order Analysis: Items per Order -----------------------------\n")
    
    spark.sql("""
        SELECT 
            o.id as order_id,
            o.customer_id,
            o.order_date,
            COUNT(oi.id) as item_count,
            SUM(oi.quantity) as total_quantity
        FROM orders o
        LEFT JOIN order_items oi ON o.id = oi.order_id
        GROUP BY o.id, o.customer_id, o.order_date
        ORDER BY total_quantity DESC
        LIMIT 10
    """).show()
else:
    print("Spark not enabled or tables not found.")

----------------------------- Order Analysis: Items per Order -----------------------------

+--------+-----------+-------------------+----------+--------------+
|order_id|customer_id|         order_date|item_count|total_quantity|
+--------+-----------+-------------------+----------+--------------+
|       1|          1|1970-01-21 18:41:11|         2|             2|
|       3|          3|1970-01-21 18:41:11|         1|             2|
|       2|          2|1970-01-21 18:41:11|         1|             1|
+--------+-----------+-------------------+----------+--------------+



In [40]:
# PySpark: Join customers with orders
if USE_SPARK and 'spark' in dir() and table_exists("customers") and table_exists("orders"):
    # Load both tables
    customers_path = get_table_path("customers")
    orders_path = get_table_path("orders")
    
    df_customers = spark.read.format("delta").load(customers_path)
    df_orders = spark.read.format("delta").load(orders_path)
    
    df_customers.createOrReplaceTempView("customers")
    df_orders.createOrReplaceTempView("orders")
    
    print("=== JOIN: Customers with Their Orders ===\n")
    
    spark.sql("""
        SELECT 
            c.id as customer_id,
            c.first_name,
            c.last_name,
            c.email,
            COUNT(o.id) as total_orders
        FROM customers c
        LEFT JOIN orders o ON c.id = o.customer_id
        GROUP BY c.id, c.first_name, c.last_name, c.email
        ORDER BY total_orders DESC
    """).show()
else:
    print("Spark not enabled or tables not found.")

=== JOIN: Customers with Their Orders ===

+-----------+----------+---------+--------------------+------------+
|customer_id|first_name|last_name|               email|total_orders|
+-----------+----------+---------+--------------------+------------+
|          3|       Bob|  Johnson|bob.johnson@examp...|           1|
|          2|      Jane|    Smith|jane.smith@exampl...|           1|
|          1|      John|      Doe|john.doe@example.com|           1|
|          4|     Alice| Williams|alice.williams@ex...|           0|
|          5|   Charlie|    Brown|charlie.brown@exa...|           0|
+-----------+----------+---------+--------------------+------------+



### Write

In [None]:
# Demo: Create a new Delta table

demo_table_path = "/Users/anh.nguyen/Documents/poc/deltalake_poc/deltalake/demo_employees"
demo_table_name = "demo_employees"

if USE_SPARK and 'spark' in dir():
    
    try:
        # Drop the table if it exists
        spark.sql(f"""
                DROP TABLE IF EXISTS {demo_table_name}
                  """)    

        # 1: Create new Delta table
        print("Creating new Delta table...")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {demo_table_name} (
                id BIGINT NOT NULL,
                name STRING NOT NULL,
                department STRING,
                salary DECIMAL(10, 2),
                hire_date DATE,
                is_active BOOLEAN
            ) USING DELTA
            LOCATION '{demo_table_path}'
        """)
        print("Table created!\n")
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")

=== Demo: Create Table and Insert Records ===

Step 1: Creating new Delta table...
Table created!



In [None]:
# Demo: Insert records using PySpark SQL
if USE_SPARK and 'spark' in dir():
        
    try:        
        # 2: Insert initial records
        print("Inserting initial records...")
        spark.sql(f"""
            INSERT INTO {demo_table_name} VALUES
            (1, 'Alice Johnson', 'Engineering', 95000.00, DATE '2020-01-15', true),
            (2, 'Bob Smith', 'Marketing', 75000.00, DATE '2021-03-20', true),
            (3, 'Charlie Brown', 'Engineering', 88000.00, DATE '2019-07-10', true)
        """)
        print("records inserted!\n")
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")

2: Inserting initial records...
3 records inserted!



In [None]:
if USE_SPARK and 'spark' in dir():

    try:                
        # 3: View the data
        print("Viewing inserted data:")
        spark.sql(f"SELECT * FROM {demo_table_name}").show()
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")

Step 3: Viewing inserted data:
+---+-------------+------------+---------+----------+---------+
| id|         name|  department|   salary| hire_date|is_active|
+---+-------------+------------+---------+----------+---------+
|  3|Charlie Brown| Engineering| 88000.00|2019-07-10|     true|
|  1|Alice Johnson| Engineering|105000.00|2020-01-15|     true|
|  1|Alice Johnson| Engineering| 95000.00|2020-01-15|     true|
|  5| Eve Anderson| Engineering| 92000.00|2021-11-08|     true|
|  3|Charlie Brown|Data Science| 98000.00|2019-07-10|     true|
|  6| Frank Miller|       Sales| 78000.00|2023-01-15|     true|
|  2|    Bob Smith|   Marketing| 75000.00|2021-03-20|     true|
|  2|    Bob Smith|   Marketing| 75000.00|2021-03-20|    false|
|  4| Diana Prince|       Sales| 82000.00|2022-05-12|     true|
+---+-------------+------------+---------+----------+---------+



In [103]:
if USE_SPARK and 'spark' in dir():
    
    try:        
        # 4: Insert more records
        print("\n4: Inserting additional records...")
        spark.sql(f"""
            INSERT INTO {demo_table_name} VALUES
            (4, 'Diana Prince', 'Sales', 82000.00, DATE '2022-05-12', true),
            (5, 'Eve Anderson', 'Engineering', 92000.00, DATE '2021-11-08', true)
        """)
        print("2 more records inserted!\n")
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")


4: Inserting additional records...
2 more records inserted!



In [104]:
if USE_SPARK and 'spark' in dir():
    
    try:        
        # 5: View updated data
        print("Step 5: Viewing all records:")
        result = spark.sql(f"SELECT * FROM {demo_table_name} ORDER BY id")
        print(f"Total records: {result.count()}")
        result.show()

    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")

Step 5: Viewing all records:
Total records: 5
+---+-------------+-----------+--------+----------+---------+
| id|         name| department|  salary| hire_date|is_active|
+---+-------------+-----------+--------+----------+---------+
|  1|Alice Johnson|Engineering|95000.00|2020-01-15|     true|
|  2|    Bob Smith|  Marketing|75000.00|2021-03-20|     true|
|  3|Charlie Brown|Engineering|88000.00|2019-07-10|     true|
|  4| Diana Prince|      Sales|82000.00|2022-05-12|     true|
|  5| Eve Anderson|Engineering|92000.00|2021-11-08|     true|
+---+-------------+-----------+--------+----------+---------+



In [105]:
if USE_SPARK and 'spark' in dir():
    
    try:        
        # 6: Perform aggregation
        print("\nStep 6: Aggregation - Average salary by department:")
        spark.sql(f"""
            SELECT 
                department,
                COUNT(*) as employee_count,
                AVG(salary) as avg_salary,
                MIN(hire_date) as earliest_hire
            FROM {demo_table_name}
            WHERE is_active = true
            GROUP BY department
            ORDER BY avg_salary DESC
        """).show()
        
        print("\nDemo complete! Table created at:", demo_table_path)
        
    except Exception as e:
        print(f"Error: {e}")
else:
    print("Spark not enabled. Set USE_SPARK = True in Section 6.")


Step 6: Aggregation - Average salary by department:
+-----------+--------------+------------+-------------+
| department|employee_count|  avg_salary|earliest_hire|
+-----------+--------------+------------+-------------+
|Engineering|             3|91666.666667|   2019-07-10|
|      Sales|             1|82000.000000|   2022-05-12|
|  Marketing|             1|75000.000000|   2021-03-20|
+-----------+--------------+------------+-------------+


Demo complete! Table created at: /Users/anh.nguyen/Documents/poc/deltalake_poc/deltalake/demo_employees


In [None]:
# Demo: MERGE (Upsert) operation
if USE_SPARK and 'spark' in dir():

    if os.path.exists(demo_table_path):
        print("=== Demo: MERGE (Upsert) Operation ===\n")
        
        try:
            # Load table
            df = spark.read.format("delta").load(demo_table_path)
            df.createOrReplaceTempView(demo_table_name)
            
            print("1: Performing MERGE operation...")
            print("  - Update Charlie's department and salary")
            print("  - Insert new employee Frank\n")
            
            spark.sql(f"""
                MERGE INTO {demo_table_name} AS target
                USING (
                    SELECT 3 as id, 'Charlie Brown' as name, 'Data Science' as department, 
                           98000.00 as salary, DATE '2019-07-10' as hire_date, true as is_active
                    UNION ALL
                    SELECT 6 as id, 'Frank Miller' as name, 'Sales' as department,
                           78000.00 as salary, DATE '2023-01-15' as hire_date, true as is_active
                ) AS source
                ON target.id = source.id
                WHEN MATCHED THEN UPDATE SET
                    target.department = source.department,
                    target.salary = source.salary
                WHEN NOT MATCHED THEN INSERT (id, name, department, salary, hire_date, is_active)
                    VALUES (source.id, source.name, source.department, source.salary, source.hire_date, source.is_active)
            """)
            
            # View updated data
            print("Updated table:")
            df_merged = spark.read.format("delta").load(demo_table_path)
            df_merged.orderBy("id").show()
            
            print(f"\nTotal employees: {df_merged.count()}")
            
        except Exception as e:
            print(f"Error: {e}")
    else:
        print("Demo table not found. Run the 'Create Table' demo first.")
else:
    print("Spark not enabled.")

=== Demo: MERGE (Upsert) Operation ===

Step 1: Performing MERGE operation...
  - Update Charlie's department and salary
  - Insert new employee Frank

✅ MERGE complete!

📝 Changes:
   - Charlie: department 'Engineering' → 'Data Science', salary $88,000 → $98,000
   - Frank: NEW employee added

Updated table:
+---+-------------+------------+--------+----------+---------+
| id|         name|  department|  salary| hire_date|is_active|
+---+-------------+------------+--------+----------+---------+
|  1|Alice Johnson| Engineering|95000.00|2020-01-15|     true|
|  2|    Bob Smith|   Marketing|75000.00|2021-03-20|     true|
|  3|Charlie Brown|Data Science|98000.00|2019-07-10|     true|
|  4| Diana Prince|       Sales|82000.00|2022-05-12|     true|
|  5| Eve Anderson| Engineering|92000.00|2021-11-08|     true|
|  6| Frank Miller|       Sales|78000.00|2023-01-15|     true|
+---+-------------+------------+--------+----------+---------+


Total employees: 6
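
The MERGE semantics used above can be pictured with plain Python dictionaries: rows whose `id` matches are updated in the listed columns only, and unmatched source rows are inserted whole. This is a conceptual sketch, not how Delta Lake executes the statement, and `merge_upsert` is a hypothetical helper.

```python
def merge_upsert(target, source, key, update_cols):
    """Emulate WHEN MATCHED UPDATE / WHEN NOT MATCHED INSERT on lists of dicts."""
    merged = {row[key]: dict(row) for row in target}
    for src in source:
        if src[key] in merged:
            # WHEN MATCHED: overwrite only the columns named in UPDATE SET
            for col in update_cols:
                merged[src[key]][col] = src[col]
        else:
            # WHEN NOT MATCHED: insert the full source row
            merged[src[key]] = dict(src)
    return [merged[k] for k in sorted(merged)]

target = [
    {"id": 3, "name": "Charlie Brown", "department": "Engineering", "salary": 88000},
    {"id": 4, "name": "Diana Prince", "department": "Sales", "salary": 82000},
]
source = [
    {"id": 3, "name": "Charlie Brown", "department": "Data Science", "salary": 98000},
    {"id": 6, "name": "Frank Miller", "department": "Sales", "salary": 78000},
]

merged_rows = merge_upsert(target, source, key="id", update_cols=["department", "salary"])
for row in merged_rows:
    print(row)
```

Rows present only in the target (Diana here) are left untouched, which matches a MERGE without a `WHEN NOT MATCHED BY SOURCE` clause.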


In [None]:
# Spark: Stop session when done
if USE_SPARK and 'spark' in dir():
    # Uncomment to stop Spark session
    # spark.stop()
    # print("SparkSession stopped.")
    pass

---
## 5. MinIO/S3 Delta Lake

**Method:** Query Delta Lake tables stored in MinIO (S3-compatible object storage)  
**Use Case:** RisingWave CDC pipeline sinks data to Delta Lake in MinIO  
**Libraries:** DuckDB (recommended), delta-rs with S3 storage options

> **Note:** This section requires a running MinIO instance populated by the RisingWave pipeline.  

In [15]:
# MinIO/S3 Configuration
MINIO_ENDPOINT = "http://localhost:9000"
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"
MINIO_BUCKET = "deltalake"
MINIO_REGION = "us-east-1"

# Storage options for delta-rs
S3_STORAGE_OPTIONS = {
    "AWS_ENDPOINT_URL": MINIO_ENDPOINT,
    "AWS_ACCESS_KEY_ID": MINIO_ACCESS_KEY,
    "AWS_SECRET_ACCESS_KEY": MINIO_SECRET_KEY,
    "AWS_REGION": MINIO_REGION,
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    "AWS_ALLOW_HTTP": "true",
}

# Available tables in MinIO (from RisingWave pipeline)
S3_TABLES = ["customers", "products", "orders", "order_items", 
             "order_analytics", "customer_order_summary", "product_inventory"]

print("MinIO Configuration:")
print(f"  Endpoint: {MINIO_ENDPOINT}")
print(f"  Bucket: {MINIO_BUCKET}")
print(f"  Tables: {len(S3_TABLES)}")

MinIO Configuration:
  Endpoint: http://localhost:9000
  Bucket: deltalake
  Tables: 7


### 5.1 Read with DuckDB (Recommended)

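DuckDB can read the Parquet data files of a Delta Lake table directly from S3/MinIO. Note that globbing `*.parquet` bypasses the Delta transaction log (`_delta_log`), so results can include rows from data files that later commits have logically removed.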

In [91]:
import duckdb
# Configure DuckDB for MinIO/S3 access

# Create connection with S3
duck_conn = duckdb.connect()
    
# Install and load httpfs extension for S3 access
duck_conn.execute("INSTALL httpfs; LOAD httpfs;")
    
# Configure S3 settings for MinIO
duck_conn.execute(f"""
    SET s3_endpoint = 'localhost:9000';
    SET s3_access_key_id = '{MINIO_ACCESS_KEY}';
    SET s3_secret_access_key = '{MINIO_SECRET_KEY}';
    SET s3_use_ssl = false;
    SET s3_url_style = 'path';
""")
    
print("DuckDB configured for MinIO access")

DuckDB configured for MinIO access
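
The cell above hardcodes `localhost:9000` even though `MINIO_ENDPOINT` is already defined. One way to keep the two in sync is to derive the DuckDB settings from the URL. This sketch uses only the standard library. `duckdb_s3_settings` is a hypothetical helper, and the setting names are the ones used in the cell above.

```python
from urllib.parse import urlparse

def duckdb_s3_settings(endpoint_url: str) -> dict:
    """Split an endpoint URL into the host:port and SSL flag DuckDB expects."""
    parsed = urlparse(endpoint_url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"Expected an http(s) URL, got: {endpoint_url!r}")
    return {
        "s3_endpoint": parsed.netloc,            # host:port, without the scheme
        "s3_use_ssl": parsed.scheme == "https",  # plain http means no SSL
        "s3_url_style": "path",                  # same value as the cell above
    }

settings = duckdb_s3_settings("http://localhost:9000")
print(settings)
```

The returned values could then be interpolated into the `SET` statements in place of the literal endpoint.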


In [None]:
def s3_query(table_name: str, sql: str = None) -> "duckdb.DuckDBPyConnection":
    """Query a Delta Lake table from MinIO using DuckDB.
    
    Reads the table's Parquet files directly; the Delta transaction log is not consulted.
    
    Args:
        table_name: Name of the table in MinIO
        sql: Optional SQL query (uses {table} placeholder)
    
    Returns:
        The DuckDB connection holding the result (use .df() to convert to pandas)
    """
    parquet_path = f"s3://{MINIO_BUCKET}/{table_name}/*.parquet"
    
    if sql:
        # Replace {table} with actual parquet path
        full_sql = sql.replace("{table}", f"read_parquet('{parquet_path}')")
        return duck_conn.execute(full_sql)
    else:
        return duck_conn.execute(f"SELECT * FROM read_parquet('{parquet_path}')")


def s3_table_info(table_name: str):
    """Get table info (row count, columns) from MinIO."""
    try:
        result = s3_query(table_name)
        df = result.df()
        print(f"Table: {table_name}")
        print(f"  Rows: {len(df)}")
        print(f"  Columns: {list(df.columns)}")
        return df
    except Exception as e:
        print(f"Error reading {table_name}: {e}")
        return None

Query functions defined: s3_query(), s3_table_info()


In [93]:
# List all Delta Lake tables in MinIO
print("Delta Lake Tables in MinIO")
print("-" * 50)
    
for table in S3_TABLES:
    try:
        df = s3_query(table).df()
        print(f"{table}: {len(df)} rows, {len(df.columns)} columns")
    except Exception as e:
        print(f"{table}: {str(e)[:50]}")

Delta Lake Tables in MinIO
--------------------------------------------------
customers: IO Error: No files found that match the pattern "s
products: 12 rows, 8 columns
orders: 10 rows, 7 columns
order_items: IO Error: No files found that match the pattern "s
order_analytics: 4 rows, 5 columns
customer_order_summary: 4 rows, 6 columns
product_inventory: 2 rows, 6 columns


In [94]:
# Query customers table
df = s3_query("customers").df()
display(df.head(10))

Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at
0,2,Jane,Smith,jane.smith@example.com,+1-555-0102,2025-12-24 10:37:50.012000+07:00,2025-12-24 10:37:50.012000+07:00
1,1,John,Doe,john.doe@example.com,+1-555-0101,2025-12-24 10:37:50.012000+07:00,2025-12-24 10:37:50.012000+07:00
2,4,Alice,Williams,alice.williams@example.com,+1-555-0104,2025-12-24 10:37:50.012000+07:00,2025-12-24 10:37:50.012000+07:00
3,5,Charlie,Brown,charlie.brown@example.com,+1-555-0105,2025-12-24 10:37:50.012000+07:00,2025-12-24 10:37:50.012000+07:00
4,3,Bob,Johnson,bob.johnson@example.com,+1-555-0103,2025-12-24 10:37:50.012000+07:00,2025-12-24 10:37:50.012000+07:00


In [95]:
# Custom SQL query on MinIO data

result = s3_query("products", """
    SELECT id, name, category, price, stock_quantity
    FROM {table}
    ORDER BY price DESC
    LIMIT 10
""")
display(result.df())

Unnamed: 0,id,name,category,price,stock_quantity
0,6,Laptop Pro,Electronics,1299.99,50
1,1,Laptop Pro,Electronics,1299.99,50
2,1,Laptop Pro,Electronics,1299.99,50
3,4,Mechanical Keyboard,Electronics,129.99,100
4,4,Mechanical Keyboard,Electronics,129.99,100
5,3,USB-C Hub,Electronics,79.99,150
6,3,USB-C Hub,Electronics,79.99,150
7,5,Monitor Stand,Office,59.99,75
8,5,Monitor Stand,Office,59.99,75
9,2,Wireless Mouse,Electronics,49.99,200
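
One plausible explanation for the repeated rows in the output above (several product ids appear twice) is that the `*.parquet` glob reads every data file in the table directory, including files that later commits have logically removed. A Delta reader instead replays the `add` and `remove` actions in `_delta_log` to find the files that make up the current snapshot. The sketch below is a deliberately simplified replay with a hypothetical two-commit log. It ignores checkpoints, metadata, and every other action type.

```python
import json

def active_files(commits):
    """Replay Delta commits in order and return the data files still in the table."""
    files = set()
    for commit in commits:                  # one commit = newline-delimited JSON actions
        for line in commit.splitlines():
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files

# Hypothetical log: version 1 rewrites part-0 into part-1.
commits = [
    '{"add": {"path": "part-0.parquet"}}',
    '{"remove": {"path": "part-0.parquet"}}\n{"add": {"path": "part-1.parquet"}}',
]

current = sorted(active_files(commits))
print(current)
```

In this example a `*.parquet` glob would read both files until the old one is vacuumed, while a log-aware reader such as delta-rs (section 5.2) reads only `part-1.parquet`. DuckDB's `delta` extension and its `delta_scan` function may be an alternative worth checking for log-aware reads.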


In [96]:
# Query analytics materialized view from RisingWave
try:
    result = s3_query("order_analytics", """
        SELECT 
            order_day,
            status,
            order_count,
            total_revenue,
            avg_order_value
        FROM {table}
        ORDER BY order_day DESC
        LIMIT 10
        """)
    display(result.df())
except Exception as e:
    print(f"No data yet: {e}")

Unnamed: 0,order_day,status,order_count,total_revenue,avg_order_value
0,NaT,completed,2,2699.96,1349.98
1,NaT,pending,1,259.97,259.97
2,NaT,shipped,1,79.99,79.99
3,NaT,completed,1,1349.98,1349.98


### 5.2 Read with delta-rs

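delta-rs reads a table through its transaction log, so it returns the current snapshot and supports time travel by version. To reach MinIO it needs the S3 storage options defined above.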

In [None]:
# Read Delta Lake table from MinIO using delta-rs
from deltalake import DeltaTable

def load_s3_table(table_name: str, version: int = None):
    """Load a Delta Lake table from MinIO using delta-rs.
    
    Args:
        table_name: Name of the table in MinIO
        version: Optional version number for time travel
    
    Returns:
        pandas DataFrame
    """
    table_path = f"s3://{MINIO_BUCKET}/{table_name}"
    
    try:
        if version is not None:
            dt = DeltaTable(table_path, storage_options=S3_STORAGE_OPTIONS, version=version)
        else:
            dt = DeltaTable(table_path, storage_options=S3_STORAGE_OPTIONS)
        
        return dt.to_pandas()
    except Exception as e:
        print(f"Error loading {table_name}: {e}")
        return None


def s3_table_history(table_name: str):
    """Get version history of a Delta Lake table from MinIO."""
    table_path = f"s3://{MINIO_BUCKET}/{table_name}"
    
    try:
        dt = DeltaTable(table_path, storage_options=S3_STORAGE_OPTIONS)
        return dt.history()
    except Exception as e:
        print(f"Error getting history for {table_name}: {e}")
        return None

delta-rs S3 functions defined: load_s3_table(), s3_table_history()


In [98]:
# Read customers table with delta-rs
df = load_s3_table("customers")
if df is not None:
    display(df.head(10))

Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at
0,3,Bob,Johnson,bob.johnson@example.com,+1-555-0103,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
1,1,John,Doe,john.doe@example.com,+1-555-0101,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
2,4,Alice,Williams,alice.williams@example.com,+1-555-0104,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
3,5,Charlie,Brown,charlie.brown@example.com,+1-555-0105,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
4,2,Jane,Smith,jane.smith@example.com,+1-555-0102,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00


In [99]:
# View table version history
history = s3_table_history("customers")
if history:
    for h in history[:5]:  # Show last 5 versions
        print(f"Version {h.get('version')}: {h.get('timestamp')} - {h.get('operation')}")

Version 1: 1766547674147 - WRITE
Version 0: 1766547541257 - CREATE TABLE
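
The timestamps in the history output above are epoch milliseconds. A small sketch for printing them as readable UTC datetimes follows. `ms_to_utc` is a hypothetical helper, and the values are copied from the output above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def ms_to_utc(epoch_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids float rounding on the millisecond part.
    return EPOCH + timedelta(milliseconds=epoch_ms)

# Values copied from the history output above.
history_rows = [
    {"version": 1, "timestamp": 1766547674147, "operation": "WRITE"},
    {"version": 0, "timestamp": 1766547541257, "operation": "CREATE TABLE"},
]

for h in history_rows:
    print(f"Version {h['version']}: {ms_to_utc(h['timestamp']).isoformat()} - {h['operation']}")
```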


In [None]:
# Compare two versions of a table in MinIO (Time Travel)
TABLE_TO_COMPARE = "customers"
VERSION_OLD = 0
VERSION_NEW = None  # None = latest

try:
    # Load old version
    df_old = load_s3_table(TABLE_TO_COMPARE, version=VERSION_OLD)
    
    # Load latest version (or specific version)
    df_new = load_s3_table(TABLE_TO_COMPARE, version=VERSION_NEW)
    
    if df_old is not None and df_new is not None:
        print(f"Version {VERSION_OLD}: {len(df_old)} rows")
        print(f"{'Latest' if VERSION_NEW is None else f'Version {VERSION_NEW}'}: {len(df_new)} rows")
        print(f"Row difference: {len(df_new) - len(df_old):+d}\n")
        
        # Show version 0
        print(f"--- Version {VERSION_OLD} ---")
        display(df_old.head(5))
        
        # Show latest
        print(f"\n--- {'Latest' if VERSION_NEW is None else f'Version {VERSION_NEW}'} ---")
        display(df_new.head(5))
        
        # Find new records (IDs in new but not in old)
        if 'id' in df_old.columns and 'id' in df_new.columns:
            new_ids = set(df_new['id']) - set(df_old['id'])
            if new_ids:
                print(f"\n✨ New records added (IDs): {sorted(new_ids)}")
            
            deleted_ids = set(df_old['id']) - set(df_new['id'])
            if deleted_ids:
                print(f"🗑️  Records removed (IDs): {sorted(deleted_ids)}")
except Exception as e:
    print(f"Error comparing versions: {e}")

Version 0: 0 rows
Latest: 5 rows
Row difference: +5

--- Version 0 ---


Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at



--- Latest ---


Unnamed: 0,id,first_name,last_name,email,phone,created_at,updated_at
0,3,Bob,Johnson,bob.johnson@example.com,+1-555-0103,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
1,1,John,Doe,john.doe@example.com,+1-555-0101,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
2,4,Alice,Williams,alice.williams@example.com,+1-555-0104,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
3,5,Charlie,Brown,charlie.brown@example.com,+1-555-0105,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00
4,2,Jane,Smith,jane.smith@example.com,+1-555-0102,2025-12-24 03:37:50.012000+00:00,2025-12-24 03:37:50.012000+00:00



✨ New records added (IDs): [1, 2, 3, 4, 5]
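
The comparison above reports added and removed IDs but not records whose contents changed between versions. A minimal sketch of a three-way diff over lists of dictionaries follows. `diff_snapshots` is a hypothetical helper and the snapshots are made-up data.

```python
def diff_snapshots(old_rows, new_rows, key="id"):
    """Classify records as added, removed, or changed between two snapshots."""
    old = {r[key]: r for r in old_rows}
    new = {r[key]: r for r in new_rows}
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "changed": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }

# Made-up snapshots for illustration.
v0 = [{"id": 1, "email": "john.doe@example.com"}, {"id": 2, "email": "jane.smith@example.com"}]
v1 = [{"id": 1, "email": "test.user@example.com"}, {"id": 3, "email": "bob.johnson@example.com"}]

diff = diff_snapshots(v0, v1)
print(diff)
```

With pandas DataFrames, `df.to_dict("records")` produces rows in the shape this helper expects.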


In [106]:
# Track history changes using Change Data Feed (CDF)
import polars as pl
from deltalake import DeltaTable

TABLE_NAME = "customers"
RECORD_ID = 1

table_path = f"s3://{MINIO_BUCKET}/{TABLE_NAME}"
dt = DeltaTable(table_path, storage_options=S3_STORAGE_OPTIONS)

arrow_reader = dt.load_cdf(
    starting_version=0,
    ending_version=None,  # up to latest
)
cdf = pl.from_arrow(arrow_reader.read_all())

# Filter for a specific record
record_history = (
    cdf
    .filter(pl.col("id") == RECORD_ID)
    .sort("_commit_version")  # or "_commit_timestamp"
)

print(record_history)


shape: (2, 10)
┌─────┬────────────┬───────────┬────────────┬───┬────────────┬────────────┬────────────┬───────────┐
│ id  ┆ first_name ┆ last_name ┆ email      ┆ … ┆ updated_at ┆ _change_ty ┆ _commit_ve ┆ _commit_t │
│ --- ┆ ---        ┆ ---       ┆ ---        ┆   ┆ ---        ┆ pe         ┆ rsion      ┆ imestamp  │
│ i32 ┆ str        ┆ str       ┆ str        ┆   ┆ datetime[μ ┆ ---        ┆ ---        ┆ ---       │
│     ┆            ┆           ┆            ┆   ┆ s, UTC]    ┆ str        ┆ i64        ┆ datetime[ │
│     ┆            ┆           ┆            ┆   ┆            ┆            ┆            ┆ ms]       │
╞═════╪════════════╪═════

### 5.3 Write to MinIO Delta Lake

Write data to Delta Lake tables in MinIO using delta-rs.

In [None]:
from deltalake import write_deltalake
import pyarrow as pa

def write_s3_table(table_name: str, df, mode: str = "append"):
    """Write a pandas DataFrame to Delta Lake in MinIO.
    
    Args:
        table_name: Name of the table in MinIO
        df: pandas DataFrame to write
        mode: 'append' or 'overwrite'
    
    Returns:
        bool: True if successful
    """
    table_path = f"s3://{MINIO_BUCKET}/{table_name}"
    
    try:
        write_deltalake(
            table_path,
            df,
            mode=mode,
            storage_options=S3_STORAGE_OPTIONS,
        )
        print(f"Wrote {len(df)} rows to {table_name} (mode={mode})")
        return True
    except Exception as e:
        print(f"Error writing to {table_name}: {e}")
        return False

Write function defined: write_s3_table()


In [83]:
# Example: Write a new test table to MinIO

import pandas as pd
from datetime import datetime

# Create sample data
test_data = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Test A", "Test B", "Test C"],
    "value": [100.50, 200.75, 300.25],
    "created_at": [datetime.now()] * 3
})

# Write to new table
write_s3_table("test_table", test_data, mode="overwrite")
    
# Verify
df = load_s3_table("test_table")
display(df)

Wrote 3 rows to test_table (mode=overwrite)


Unnamed: 0,id,name,value,created_at
0,1,Test A,100.5,2025-12-24 09:50:44.510982
1,2,Test B,200.75,2025-12-24 09:50:44.510982
2,3,Test C,300.25,2025-12-24 09:50:44.510982


In [None]:
# Close DuckDB connection when done
duck_conn.close()
print("DuckDB connection closed")

### 5.4 Query Delta Lake with Apache Spark



In [None]:
from pyspark.sql import SparkSession

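# Create Spark session with Delta Lake support.
# Note: spark.jars.packages is only resolved when the JVM starts. If a Spark
# session already exists in this kernel, getOrCreate() reuses it and the
# hadoop-aws classes (S3AFileSystem) may not be on the classpath. In that
# case, restart the kernel before running this cell.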
spark = (SparkSession.builder
    .appName("DeltaLake-MinIO-Query")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3/MinIO configuration
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # Delta Lake packages
    .config("spark.jars.packages", 
            "io.delta:delta-spark_2.12:3.2.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4,"
            "com.amazonaws:aws-java-sdk-bundle:1.12.262")
    .getOrCreate()
)

print("Spark session created successfully")
print(f"Spark version: {spark.version}")

Spark session created successfully
Spark version: 3.5.0


In [None]:
# Query Delta tables from MinIO using Spark SQL
TABLE_NAME = "customers"
table_path = f"s3a://{MINIO_BUCKET}/{TABLE_NAME}"

try:
    # Read Delta table
    df = spark.read.format("delta").load(table_path)
    
    print(f"=== {TABLE_NAME} table via Spark ===")
    print(f"Total rows: {df.count()}")
    print(f"Schema:")
    df.printSchema()
    
    # Run SQL query
    df.createOrReplaceTempView(TABLE_NAME)
    result = spark.sql(f"""
        SELECT id, first_name, last_name, email 
        FROM {TABLE_NAME} 
        ORDER BY id 
        LIMIT 10
    """)
    result.show(truncate=False)
    
except Exception as e:
    print(f"Error querying with Spark: {e}")

Error querying with Spark: An error occurred while calling o143.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.delta.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:191)
	at org.apache.spark.sql.delta.sources.DeltaDataSource$.parsePathIdentifier(DeltaDataSource.scala:354)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1$lzyc

In [None]:
# Read Change Data Feed using Spark (if CDF is enabled)
try:
    cdf_df = (spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .load(table_path)
    )
    print(f"Total change records: {cdf_df.count()}")
    
    # Show changes with metadata columns
    cdf_df.select(
        "id", "first_name", "last_name",
        "_change_type", "_commit_version", "_commit_timestamp"
    ).orderBy("_commit_version", "id").show(20, truncate=False)
    
    # Summary by change type
    print("\nChanges by type:")
    cdf_df.groupBy("_change_type").count().show()
    
except Exception as e:
    print(f"CDF not available: {e}")

CDF not available: An error occurred while calling o150.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.delta.DeltaTableUtils$.findDeltaTableRoot(DeltaTable.scala:191)
	at org.apache.spark.sql.delta.sources.DeltaDataSource$.parsePathIdentifier(DeltaDataSource.scala:354)
	at org.apache.spark.sql.delta.catalog.DeltaTableV2.x$1$lzycompute(D

In [None]:
# Stop Spark session when done
spark.stop()
print("Spark session stopped")