# Delta Lake: Transform pandas Prototypes into Production

## Setup and Data Preparation

Install Delta-rs and supporting libraries:

```bash
pip install deltalake pandas pyarrow duckdb polars
```

We'll use actual NYC Yellow Taxi data to demonstrate real-world scenarios. The NYC Taxi & Limousine Commission provides monthly trip records in Parquet format:

```python
import duckdb
import pandas as pd
import polars as pl
from deltalake import DeltaTable, write_deltalake

# Download NYC Yellow Taxi data (June 2024 as example)
# Full dataset available at: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
taxi_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet"

# Download the whole monthly file, then keep the first 10,000 trips for demonstration
sample_data = pd.read_parquet(taxi_url).head(10000)

print(f"Loaded {len(sample_data)} taxi trips from NYC TLC")
print(f"Data shape: {sample_data.shape}")
print(f"Date range: {sample_data['tpep_pickup_datetime'].min()} to {sample_data['tpep_pickup_datetime'].max()}")

sample_data.head()
```

## Creating Your First Delta Table

Create your first Delta table in the `data` directory:

```python
write_deltalake("data/taxi_delta_table", sample_data, mode="overwrite")
print("Created Delta table")

# Read back from Delta table
dt = DeltaTable("data/taxi_delta_table")
df_from_delta = dt.to_pandas()

print(f"Delta table contains {len(df_from_delta)} records")
```

View the Delta table structure:

```python
# Inspect Delta table metadata
print("Delta table schema:")
print(dt.schema().to_arrow())
```

View the current version of the Delta table:

```python
print(f"Current version: {dt.version()}")
```

## Incremental Updates and CRUD Operations

Production pipelines require efficient handling of late-arriving data and targeted updates. Delta-rs enables surgical data operations without full dataset rewrites.

To demonstrate this, we'll simulate late-arriving data:

```python
# Simulate late-arriving data
late_data = pd.read_parquet(taxi_url).iloc[10000:10050]
print(f"New data to add: {len(late_data)} records")
```

### Traditional Approach: Process Everything

The pandas workflow requires loading both existing and new data, combining them, and rewriting the entire output file:

```python
# Pandas approach - reload existing data and merge
existing_df = pd.read_parquet(taxi_url).head(10000)
complete_df = pd.concat([existing_df, late_data])
complete_df.to_parquet("data/taxi_complete.parquet")
print(f"Processed {len(complete_df)} total records")
```

### Delta-rs Approach: Process Only New Data

Delta-rs performs surgical operations, appending only the new records without touching existing data:

```python
# Delta-rs - append only what's new
write_deltalake("data/taxi_delta_table", late_data, mode="append")

dt = DeltaTable("data/taxi_delta_table")
print(f"Added {len(late_data)} new records")
print(f"Table version: {dt.version()}")
```

## Time Travel and Data Versioning

Production systems need reliable access to historical data for audits and rollbacks. Delta-rs versions every write automatically. Because each commit only records which files were added or removed, it avoids the storage overhead and complexity of full-copy backups.

### Traditional Approach: Manual Backup Strategy

Traditional file-based workflows rely on timestamped copies and manual versioning:

```python
# Traditional approach - manual timestamped backups
import os

# Create the data directory if it doesn't exist
os.makedirs("data", exist_ok=True)

# Example of manual backup workflow (commented out to avoid overwriting)
# import datetime
# timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# sample_data.to_parquet(f"data/taxi_backup_{timestamp}.parquet")  # Create manual backup
# sample_data.to_parquet("data/taxi_data.parquet")  # Overwrite original

print("Traditional approach requires manual backup creation and file management")
print("To recover data, you would need to manually identify and reload backup files")
```

### Delta-rs Approach: Built-in Time Travel

Delta-rs automatically tracks every change with instant access to any version:

```python
# Access any historical version instantly
dt_v0 = DeltaTable("data/taxi_delta_table", version=0)
current_dt = DeltaTable("data/taxi_delta_table")

print(f"Version 0: {len(dt_v0.to_pandas())} records")
print(f"Current version: {len(current_dt.to_pandas())} records")
print(f"Available versions: {current_dt.version() + 1}")
```

## Schema Evolution in Action

Data structures evolve as business requirements change. Delta-rs enforces the table schema by default. A write with `schema_mode="merge"` adds new columns to the schema without breaking existing pipelines or requiring migration scripts.

To demonstrate this, imagine NYC's taxi authority introduces weather tracking and surge pricing features, requiring your pipeline to handle new `weather_condition` and `surge_multiplier` columns alongside existing fare data.

```python
# Take a fresh slice of trips (rows 20000-20099) as the incoming batch
enhanced_data = pd.read_parquet(taxi_url).iloc[20000:20100].copy()

# Simulate new data with additional business columns
weather_options = ["clear", "rain", "snow", "cloudy"]
surge_options = [1.0, 1.2, 1.5, 2.0]
enhanced_data["weather_condition"] = [weather_options[i % 4] for i in range(len(enhanced_data))]
enhanced_data["surge_multiplier"] = [surge_options[i % 4] for i in range(len(enhanced_data))]

print(f"Enhanced data: {len(enhanced_data)} records with {len(enhanced_data.columns)} columns")
print(f"New columns: {[col for col in enhanced_data.columns if col not in sample_data.columns]}")
```

### Traditional Approach: No Schema History

Traditional formats provide no tracking of schema changes or evolution history:

```python
# Traditional approach - no schema versioning or history
df_v1 = pd.read_parquet("taxi_v1.parquet")  # Original schema
df_v2 = pd.read_parquet("taxi_v2.parquet")  # Enhanced schema
```

### Delta-rs Approach: Schema Versioning and History

With `schema_mode="merge"`, Delta-rs adds the new columns to the table schema and records the change as a new table version:

```python
# Schema evolution with automatic versioning
write_deltalake(
    "data/taxi_delta_table",
    enhanced_data,
    mode="append",
    schema_mode="merge"
)

dt = DeltaTable("data/taxi_delta_table")
print(f"Schema evolved: {len(dt.to_pandas().columns)} columns | Version: {dt.version()}")
```

Explore the complete schema evolution history and access any previous version:

```python
# View schema change history; history() lists the newest commit first
history = dt.history()
for entry in history[:2]:
    print(f"Version {entry['version']}: {entry['operation']} at {entry['timestamp']}")

# Access different schema versions
original_schema = DeltaTable("data/taxi_delta_table", version=0)
print(f"\nOriginal schema (v0): {len(original_schema.to_pandas().columns)} columns")
print(f"Current schema (v{dt.version()}): {len(dt.to_pandas().columns)} columns")
```

## Multi-Engine Integration

Cross-functional teams require unified data access across different analytical tools. Delta-rs eliminates format conversion overhead with native multi-engine support.

### Traditional Approach: Engine-Specific Optimization Requirements

Each engine needs different file optimizations that don't transfer between tools.

Start with the original dataset:

```python
# Traditional approach - Each engine needs different optimizations
data = {"payment_type": [1, 1, 2, 1, 2], "fare_amount": [15.5, 20.0, 18.3, 12.5, 25.0]}
df = pd.DataFrame(data)
```

The pandas team writes the DataFrame index into the file so index-based lookups survive the round trip:

```python
# pandas team persists the index alongside the data for index-based lookups
df.to_parquet("data/pandas_optimized.parquet", index=True)
pandas_result = pd.read_parquet("data/pandas_optimized.parquet")
print(f"Pandas: {len(pandas_result)} trips, avg ${pandas_result['fare_amount'].mean():.2f}")
```

The Polars team needs sorted data for predicate pushdown optimization:

```python
# Polars team needs sorted columns for predicate pushdown
df.sort_values("payment_type").to_parquet("data/polars_optimized.parquet")
polars_result = pl.read_parquet("data/polars_optimized.parquet").select([
    pl.len().alias("trips"), pl.col("fare_amount").mean().alias("avg_fare")
])
print(f"Polars: {polars_result}")
```

The DuckDB team requires specific compression for query performance:

```python
# DuckDB needs specific compression/statistics for query planning
df.to_parquet("data/duckdb_optimized.parquet", compression="zstd")
duckdb_result = duckdb.execute("""
    SELECT COUNT(*) as trips, ROUND(AVG(fare_amount), 2) as avg_fare
    FROM 'data/duckdb_optimized.parquet'
""").fetchone()
print(f"DuckDB: {duckdb_result[0]} trips, ${duckdb_result[1]} avg")
```

### Delta-rs Approach: Universal Optimizations

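Delta-rs records per-file statistics (row counts plus column minimums, maximums, and null counts) in the transaction log at write time. Engines that read Delta can use these statistics, so one table serves all of them.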

Create one optimized Delta table that serves all engines:

```python
# Delta-rs approach - one table shared by all engines
import duckdb
import pandas as pd
import polars as pl
from deltalake import DeltaTable, write_deltalake

# Write the data once as a Delta table; per-file statistics are recorded automatically
data = {"payment_type": [1, 1, 2, 1, 2], "fare_amount": [15.5, 20.0, 18.3, 12.5, 25.0]}
write_deltalake("data/universal_demo", pd.DataFrame(data), mode="overwrite")
```

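pandas reads the table through `DeltaTable.to_pandas()`. This call takes the current file list from the transaction log and, when you pass `filters`, can skip files using their recorded statistics:

```python
# pandas reads the current table version through DeltaTable
dt = DeltaTable("data/universal_demo")
pandas_result = dt.to_pandas()
print(f"Pandas: {len(pandas_result)} trips, avg ${pandas_result['fare_amount'].mean():.2f}")
```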

Polars reads the same table with `pl.read_delta`. Its lazy counterpart, `pl.scan_delta`, pushes filters down to the Delta reader:

```python
# Polars reads the Delta table directly (eager); pl.scan_delta is the lazy variant
polars_result = pl.read_delta("data/universal_demo").select([
    pl.len().alias("trips"),
    pl.col("fare_amount").mean().alias("avg_fare")
])
print(f"Polars: {polars_result}")
```

DuckDB queries the table in place with `delta_scan`, which can use the same file statistics when planning filtered queries:

```python
# delta_scan comes from DuckDB's delta extension, which recent DuckDB versions load on first use
duckdb_result = duckdb.execute("""
    SELECT COUNT(*) as trips, ROUND(AVG(fare_amount), 2) as avg_fare
    FROM delta_scan('data/universal_demo')
""").fetchone()
print(f"DuckDB: {duckdb_result[0]} trips, ${duckdb_result[1]} avg")
```

## Performance Optimization

Production systems require efficient storage management and query performance. Delta-rs provides built-in optimization without manual cleanup scripts or downtime.

### Traditional Approach: Manual Cleanup Scripts

Traditional workflows require custom scripts to manage file cleanup:

```python
# Traditional approach - manual file management
import os
import glob
from datetime import datetime, timedelta

# Find old backup files manually
old_files = []
cutoff_date = datetime.now() - timedelta(days=7)
for file in glob.glob("data/taxi_backup_*.parquet"):
    file_time = datetime.fromtimestamp(os.path.getmtime(file))
    if file_time < cutoff_date:
        old_files.append(file)
        os.remove(file)  # Manual cleanup with risk
```

### Delta-rs Approach: Built-in Vacuum Operation

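Delta-rs provides safe, automated cleanup through its `vacuum()` operation. It deletes data files that the table no longer references once they have been unreferenced for longer than a retention period (7 days by default). Note that `vacuum()` is a dry run unless you pass `dry_run=False`. Files become unused when: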

• **UPDATE operations** create new versions, leaving old data files unreferenced
• **DELETE operations** remove data, making those files obsolete  
• **Failed transactions** leave temporary files that were never committed
• **Table optimization** consolidates small files, making originals unnecessary

```python
# Delta-rs vacuum deletes data files the table no longer references
import os

from deltalake import DeltaTable

dt = DeltaTable("data/taxi_delta_table")

def get_size(path):
    """Return the total size of all files under path, in MB."""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            total_size += os.path.getsize(os.path.join(dirpath, filename))
    return total_size / (1024 * 1024)

before_size = get_size("data/taxi_delta_table")

# vacuum() only lists candidates unless dry_run=False is passed.
# Files are deleted only if they were unreferenced more than retention_hours ago
# (168 hours = the 7-day default), which keeps files needed by recent versions in place.
deleted_files = dt.vacuum(retention_hours=168, dry_run=False)

after_size = get_size("data/taxi_delta_table")

print(f"Delta vacuum removed {len(deleted_files)} files")
print(f"Storage before: {before_size:.1f} MB")
print(f"Storage after: {after_size:.1f} MB")
print(f"Space reclaimed: {before_size - after_size:.1f} MB")
```