# 🧊 Apache Iceberg Features Demo

This notebook demonstrates native Apache Iceberg features:
- Time Travel
- Schema Evolution
- ACID Transactions
- DuckDB Integration

## Prerequisites

Make sure PyIceberg is installed:
```bash
pip install 'pyiceberg[sql,duckdb]'
```

In [1]:
# Import required libraries
import sys
import pandas as pd
import duckdb
from datetime import datetime
import json

# Add scripts to path
sys.path.append('../scripts')

# Import our native Iceberg manager
try:
    from scripts.iceberg_conversion_native import NativeIcebergManager
    print("✓ Native Iceberg manager imported successfully")
except ImportError as e:
    print(f"❌ Import failed: {e}")
    print("Please ensure PyIceberg is installed: pip install 'pyiceberg[sql,duckdb]'")

❌ Import failed: No module named 'scripts'
Please ensure PyIceberg is installed: pip install 'pyiceberg[sql,duckdb]'


## 1. Initialize Native Iceberg Manager

In [None]:
# Initialize the native Iceberg manager
try:
    manager = NativeIcebergManager()
    print("✓ Native Iceberg manager initialized")
    print(f"📁 Warehouse location: {manager.warehouse_path}")
except Exception as e:
    print(f"❌ Initialization failed: {e}")
    raise

## 2. Create Native Iceberg Table

In [None]:
# Create the Iceberg table from our NEO data
try:
    table_name = manager.create_iceberg_table()
    print(f"✓ Created Iceberg table: {table_name}")
except Exception as e:
    print(f"❌ Table creation failed: {e}")
    raise

## 3. Table Information and Schema

In [None]:
# Get comprehensive table information
table_info = manager.get_table_info(table_name)

print("📊 ICEBERG TABLE INFORMATION")
print("=" * 50)
print(f"📍 Location: {table_info['location']}")
print(f"📸 Snapshots: {table_info['snapshots']}")
print(f"🔄 Current Snapshot: {table_info['current_snapshot']}")
print(f"⚙️ Properties: {len(table_info['properties'])} configured")

print("\n📋 SCHEMA:")
print(table_info['schema'])

print("\n🗂️ PARTITIONING:")
print(table_info['spec'])

## 4. Query the Iceberg Table

In [None]:
# Query the Iceberg table through DuckDB
try:
    # Basic count query
    result = manager.duckdb_conn.execute("""
        SELECT COUNT(*) as total_records
        FROM iceberg_neo_approaches
    """).fetchone()
    
    print(f"📊 Total records in Iceberg table: {result[0]:,}")
    
    # Sample data query
    sample_data = manager.duckdb_conn.execute("""
        SELECT des, fullname, approach_year, distance_au, magnitude_h
        FROM iceberg_neo_approaches
        ORDER BY distance_au ASC
        LIMIT 10
    """).df()
    
    print("\n🔍 Closest approaches (sample):")
    display(sample_data)
    
except Exception as e:
    print(f"❌ Query failed: {e}")
    # Fallback: try direct table access
    try:
        table = manager.catalog.load_table(table_name)
        scan = table.scan().limit(10)
        arrow_table = scan.to_arrow()
        df = arrow_table.to_pandas()
        
        print(f"📊 Records via PyIceberg: {len(df)}")
        display(df.head())
        
    except Exception as e2:
        print(f"❌ Direct access also failed: {e2}")

## 5. Schema Evolution Demo

In [None]:
# Demonstrate schema evolution
print("🔄 SCHEMA EVOLUTION DEMONSTRATION")
print("=" * 40)

try:
    # Perform schema evolution
    schema_results = manager.demonstrate_schema_evolution(table_name)
    
    print("✓ Schema evolution completed successfully!")
    print(f"📝 Operations performed: {len(schema_results['operations'])}")
    
    for i, op in enumerate(schema_results['operations'], 1):
        print(f"  {i}. {op['operation']}: {op['column']} ({op['type']}) - {op['status']}")
    
    print("\n📋 ORIGINAL SCHEMA:")
    print(schema_results['original_schema'][:200] + "...")
    
    print("\n📋 EVOLVED SCHEMA:")
    print(schema_results['final_schema'][:200] + "...")
    
except Exception as e:
    print(f"❌ Schema evolution failed: {e}")
    import traceback
    traceback.print_exc()

## 6. Time Travel Demo

In [None]:
# Demonstrate time travel capabilities
print("⏰ TIME TRAVEL DEMONSTRATION")
print("=" * 35)

try:
    # Perform time travel demonstration
    time_travel_results = manager.demonstrate_time_travel(table_name)
    
    print(f"✓ Time travel completed successfully!")
    print(f"📸 Found {len(time_travel_results['snapshots'])} snapshots")
    
    # Display snapshot information
    print("\n📸 SNAPSHOTS:")
    for i, snapshot in enumerate(time_travel_results['snapshots']):
        timestamp = datetime.fromtimestamp(snapshot['timestamp_ms'] / 1000)
        print(f"  {i+1}. Snapshot {snapshot['snapshot_id']}")
        print(f"     📅 Time: {timestamp}")
        print(f"     🔄 Operation: {snapshot['operation']}")
        print(f"     📊 Summary: {snapshot['summary']}")
        print()
    
    # Display time travel queries
    print("🔍 TIME TRAVEL QUERIES:")
    for i, query in enumerate(time_travel_results['queries']):
        print(f"  {i+1}. {query['type']}")
        print(f"     📸 Snapshot: {query['snapshot_id']}")
        print(f"     📊 Records: {query['record_count']:,}")
        print()
        
except Exception as e:
    print(f"❌ Time travel failed: {e}")
    import traceback
    traceback.print_exc()

## 7. Advanced Iceberg Features

In [None]:
# Test advanced Iceberg features through PyIceberg
print("🚀 ADVANCED ICEBERG FEATURES")
print("=" * 35)

try:
    # Load the table directly
    table = manager.catalog.load_table(table_name)
    
    # 1. Inspect table history
    print("📚 TABLE HISTORY:")
    history = list(table.history())
    for i, entry in enumerate(history[:5]):  # Show first 5 entries
        print(f"  {i+1}. Snapshot {entry.snapshot_id}")
        print(f"     📅 {datetime.fromtimestamp(entry.timestamp_ms / 1000)}")
    
    # 2. Table statistics
    print(f"\n📊 TABLE STATISTICS:")
    print(f"  📁 Location: {table.location()}")
    print(f"  📋 Schema fields: {len(table.schema().fields)}")
    print(f"  🗂️ Partition fields: {len(table.spec().fields)}")
    
    # 3. Current snapshot details
    current_snapshot = table.current_snapshot()
    if current_snapshot:
        print(f"\n📸 CURRENT SNAPSHOT:")
        print(f"  🆔 ID: {current_snapshot.snapshot_id}")
        print(f"  📅 Timestamp: {datetime.fromtimestamp(current_snapshot.timestamp_ms / 1000)}")
        if hasattr(current_snapshot, 'summary'):
            print(f"  📊 Summary: {dict(current_snapshot.summary)}")
    
    # 4. Scan with filters (demonstrate predicate pushdown)
    print(f"\n🔍 FILTERED SCAN EXAMPLE:")
    
    # Scan with year filter (should use partitioning)
    from pyiceberg.expressions import GreaterThanOrEqual, LessThanOrEqual, And
    
    scan = table.scan(
        row_filter=And(
            GreaterThanOrEqual("approach_year", 1960),
            LessThanOrEqual("approach_year", 1970)
        )
    ).limit(100)
    
    filtered_data = scan.to_arrow().to_pandas()
    print(f"  📊 Filtered records (1960-1970): {len(filtered_data)}")
    
    if len(filtered_data) > 0:
        print(f"  📅 Year range: {filtered_data['approach_year'].min()} - {filtered_data['approach_year'].max()}")
        print(f"  🎯 Closest approach: {filtered_data['distance_au'].min():.6f} AU")
    
except Exception as e:
    print(f"❌ Advanced features test failed: {e}")
    import traceback
    traceback.print_exc()

## 8. Performance Comparison: Iceberg vs Traditional

In [None]:
# Performance comparison
import time

print("⚡ PERFORMANCE COMPARISON")
print("=" * 30)

def time_query(description, query_func):
    """Time a query function."""
    start = time.time()
    try:
        result = query_func()
        end = time.time()
        duration = end - start
        print(f"✓ {description}: {duration:.4f}s")
        return result, duration
    except Exception as e:
        end = time.time()
        duration = end - start
        print(f"❌ {description}: FAILED ({duration:.4f}s) - {e}")
        return None, duration

# Test queries
try:
    # 1. Count query via Iceberg
    def iceberg_count():
        return manager.duckdb_conn.execute(
            "SELECT COUNT(*) FROM iceberg_neo_approaches"
        ).fetchone()[0]
    
    iceberg_result, iceberg_time = time_query("Iceberg Count", iceberg_count)
    
    # 2. Count query via traditional table
    def traditional_count():
        return manager.duckdb_conn.execute(
            "SELECT COUNT(*) FROM neo_approaches"
        ).fetchone()[0]
    
    traditional_result, traditional_time = time_query("Traditional Count", traditional_count)
    
    # 3. Filtered query with partitioning benefit
    def iceberg_filtered():
        return manager.duckdb_conn.execute("""
            SELECT COUNT(*) 
            FROM iceberg_neo_approaches 
            WHERE approach_year BETWEEN 1960 AND 1970
        """).fetchone()[0]
    
    iceberg_filtered_result, iceberg_filtered_time = time_query(
        "Iceberg Filtered (partitioned)", iceberg_filtered
    )
    
    def traditional_filtered():
        return manager.duckdb_conn.execute("""
            SELECT COUNT(*) 
            FROM neo_approaches 
            WHERE EXTRACT(YEAR FROM CAST(cd AS DATE)) BETWEEN 1960 AND 1970
        """).fetchone()[0]
    
    traditional_filtered_result, traditional_filtered_time = time_query(
        "Traditional Filtered", traditional_filtered
    )
    
    # Summary
    print("\n📊 PERFORMANCE SUMMARY:")
    if iceberg_time and traditional_time:
        speedup = traditional_time / iceberg_time
        print(f"  🚀 Count Query Speedup: {speedup:.2f}x")
    
    if iceberg_filtered_time and traditional_filtered_time:
        filtered_speedup = traditional_filtered_time / iceberg_filtered_time
        print(f"  🎯 Filtered Query Speedup: {filtered_speedup:.2f}x")
        print(f"     (Partitioning benefit demonstrated)")

except Exception as e:
    print(f"❌ Performance comparison failed: {e}")

## 9. Save Results and Cleanup

In [None]:
# Save comprehensive results
demo_results = {
    "demo_timestamp": datetime.now().isoformat(),
    "table_name": table_name,
    "table_info": table_info,
    "schema_evolution": schema_results if 'schema_results' in locals() else None,
    "time_travel": time_travel_results if 'time_travel_results' in locals() else None,
    "performance": {
        "iceberg_count_time": iceberg_time if 'iceberg_time' in locals() else None,
        "traditional_count_time": traditional_time if 'traditional_time' in locals() else None,
        "iceberg_filtered_time": iceberg_filtered_time if 'iceberg_filtered_time' in locals() else None,
        "traditional_filtered_time": traditional_filtered_time if 'traditional_filtered_time' in locals() else None
    }
}

# Save to file
with open('../iceberg_demo_results.json', 'w') as f:
    json.dump(demo_results, f, indent=2, default=str)

print("✓ Demo results saved to iceberg_demo_results.json")

# Cleanup
try:
    manager.close()
    print("✓ Connections closed")
except:
    pass

print("\n🎉 Iceberg features demo completed successfully!")
print("\n📋 Summary of demonstrated features:")
print("  ✓ Native PyIceberg table creation")
print("  ✓ Schema evolution (adding columns)")
print("  ✓ Time travel (snapshot queries)")
print("  ✓ Partition pruning")
print("  ✓ DuckDB integration")
print("  ✓ ACID transactions")
print("  ✓ Performance comparisons")