# BigQuery-Lite: DuckDB Analytics Engine

This notebook demonstrates BigQuery-like functionality using DuckDB for embedded analytics.
We'll explore:
- Loading and querying Parquet files
- Query execution plans
- Custom UDFs (User Defined Functions)
- Performance analysis and optimization

In [None]:
import duckdb
import pandas as pd
import numpy as np
import time
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

# Global plot appearance: a matplotlib stylesheet first, then seaborn's default colour palette
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

## 1. Initialize DuckDB Connection and Load Data

In [None]:
# Open an in-memory DuckDB database for the demonstration
conn = duckdb.connect(':memory:')

# Switch profiling on, then select the 'detailed' profiling mode
for profiling_pragma in (
    "PRAGMA enable_profiling",
    "PRAGMA profiling_mode = 'detailed'",
):
    conn.execute(profiling_pragma)

print("DuckDB version:", duckdb.__version__)
print("Connection established successfully")

In [None]:
# Load NYC taxi data from Parquet
data_path = '../data/nyc_taxi.parquet'

# Expose the Parquet file as a view.
# CREATE OR REPLACE keeps this cell safe to re-run (a plain CREATE VIEW raises
# once the view exists), and doubling single quotes keeps the path a valid SQL
# string literal even if it ever contains an apostrophe.
escaped_path = data_path.replace("'", "''")
conn.execute(
    f"CREATE OR REPLACE VIEW nyc_taxi AS SELECT * FROM read_parquet('{escaped_path}')"
)

# Get basic info about the dataset
result = conn.execute("SELECT COUNT(*) as total_rows FROM nyc_taxi").fetchone()
print(f"Total rows in dataset: {result[0]:,}")

# Show schema as reported by DESCRIBE
schema_info = conn.execute("DESCRIBE nyc_taxi").fetchdf()
print("\nDataset Schema:")
print(schema_info)

## 2. Basic Query Exploration with Execution Plans

In [None]:
# Sample query: per-payment-type trip count and average fare, distance and
# total, restricted to trips with a positive fare and a positive distance.
# `query` is reused by the EXPLAIN ANALYZE cell that follows.
query = """
SELECT
    payment_type,
    COUNT(*) as trip_count,
    AVG(fare_amount) as avg_fare,
    AVG(trip_distance) as avg_distance,
    AVG(total_amount) as avg_total
FROM nyc_taxi
WHERE fare_amount > 0 AND trip_distance > 0
GROUP BY payment_type
ORDER BY trip_count DESC
"""

# Execute and time the query.
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# intervals; time.time() is wall-clock time, which can be coarse or adjusted
# while the query runs.
start_time = time.perf_counter()
result = conn.execute(query).fetchdf()
execution_time = time.perf_counter() - start_time

print(f"Query executed in {execution_time:.4f} seconds")
print("\nResults:")
print(result)

In [None]:
# Run the previous cell's query under EXPLAIN ANALYZE and print the plan text
explain_query = f"EXPLAIN ANALYZE {query}"
plan_result = conn.execute(explain_query).fetchdf()

print("Query Execution Plan:")
print("=" * 50)
# The plan text is in the `explain_value` column; print each row's entry in order
for plan_text in plan_result['explain_value']:
    print(plan_text)

## 3. Advanced Analytics - Time Series Analysis

In [None]:
# Trip count, average fare and passenger total for every (pickup date, pickup hour) pair
time_series_query = """
SELECT 
    DATE(tpep_pickup_datetime) as pickup_date,
    EXTRACT(hour FROM tpep_pickup_datetime) as pickup_hour,
    COUNT(*) as trip_count,
    AVG(fare_amount) as avg_fare,
    SUM(passenger_count) as total_passengers
FROM nyc_taxi 
WHERE tpep_pickup_datetime IS NOT NULL
GROUP BY pickup_date, pickup_hour
ORDER BY pickup_date, pickup_hour
"""

# Materialise the result as a DataFrame; the plotting cell below reads it
time_series_data = conn.execute(time_series_query).fetchdf()

series_shape = time_series_data.shape
print(f"Time series data shape: {series_shape}")

first_rows = time_series_data.head()
print(first_rows)

In [None]:
# Collapse the per-day rows into one total per hour of day
hourly_trips = time_series_data.groupby('pickup_hour', as_index=False)['trip_count'].sum()

# Bar chart of total trips for each hour of the day
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(hourly_trips['pickup_hour'], hourly_trips['trip_count'])
ax.set_title('NYC Taxi Trips by Hour of Day')
ax.set_xlabel('Hour of Day')
ax.set_ylabel('Total Trip Count')
ax.set_xticks(range(24))
ax.grid(True, alpha=0.3)
plt.show()

## 4. Custom User Defined Functions (UDFs)

In [None]:
# Define Python UDFs

def calculate_tip_percentage(fare_amount, tip_amount):
    """Return the tip as a percentage of the fare.

    A fare of zero or less yields 0.0 instead of dividing by it.
    """
    return 0.0 if fare_amount <= 0 else (tip_amount / fare_amount) * 100

def categorize_trip_distance(distance):
    """Map a trip distance to a label: 'Short', 'Medium', 'Long' or 'Very Long'.

    Each bound is inclusive: up to 1.0 is 'Short', up to 5.0 is 'Medium',
    up to 15.0 is 'Long', and anything else is 'Very Long'.
    """
    inclusive_upper_bounds = (
        (1.0, 'Short'),
        (5.0, 'Medium'),
        (15.0, 'Long'),
    )
    for upper_bound, label in inclusive_upper_bounds:
        if distance <= upper_bound:
            return label
    return 'Very Long'

def calculate_speed_mph(distance, pickup_time, dropoff_time):
    """Return distance divided by trip duration in hours.

    Yields 0.0 when either timestamp is None or when the drop-off is not
    strictly later than the pickup.
    """
    missing_endpoint = pickup_time is None or dropoff_time is None
    if missing_endpoint:
        return 0.0

    trip_seconds = (dropoff_time - pickup_time).total_seconds()
    if trip_seconds <= 0:
        return 0.0

    trip_hours = trip_seconds / 3600
    if trip_hours > 0:
        return distance / trip_hours
    return 0.0

# Register UDFs with DuckDB.
# The Python functions carry no type annotations, so DuckDB cannot infer their
# SQL signatures; parameter and return types are passed explicitly instead.
double_type = conn.sqltype('DOUBLE')
varchar_type = conn.sqltype('VARCHAR')
timestamp_type = conn.sqltype('TIMESTAMP')

# (SQL name, Python callable, parameter types, return type)
udf_specs = [
    ('tip_percentage', calculate_tip_percentage, [double_type, double_type], double_type),
    ('trip_category', categorize_trip_distance, [double_type], varchar_type),
    ('avg_speed_mph', calculate_speed_mph,
     [double_type, timestamp_type, timestamp_type], double_type),
]

for udf_name, udf_callable, parameter_types, return_type in udf_specs:
    # Drop any registration left over from an earlier run of this cell so that
    # re-running it does not fail because the name is already taken.
    try:
        conn.remove_function(udf_name)
    except duckdb.Error:
        pass  # nothing registered under this name yet (first run)
    conn.create_function(udf_name, udf_callable, parameter_types, return_type)

print("Custom UDFs registered successfully!")

In [None]:
# Per distance category: trip count, average tip percentage and average speed,
# all computed through the registered Python UDFs
udf_query = """
SELECT 
    trip_category(trip_distance) as distance_category,
    COUNT(*) as trip_count,
    AVG(tip_percentage(fare_amount, tip_amount)) as avg_tip_pct,
    AVG(avg_speed_mph(trip_distance, tpep_pickup_datetime, tpep_dropoff_datetime)) as avg_speed
FROM nyc_taxi 
WHERE 
    fare_amount > 0 
    AND tip_amount >= 0 
    AND trip_distance > 0
    AND tpep_pickup_datetime IS NOT NULL 
    AND tpep_dropoff_datetime IS NOT NULL
GROUP BY distance_category
ORDER BY trip_count DESC
"""

# Fetch the aggregated rows as a DataFrame, then print them under a heading
udf_results = conn.execute(udf_query).fetchdf()

udf_heading = "Results using custom UDFs:"
print(udf_heading)
print(udf_results)

## 5. Performance Testing and Resource Monitoring

In [None]:
# Performance comparison of different query approaches
import psutil
import os

def monitor_query_performance(query, description, connection=None):
    """Run a query and print its elapsed time, RSS change and row count.

    Args:
        query: SQL text to execute.
        description: Label printed above the measurements.
        connection: DuckDB connection to execute on. When omitted, the
            notebook-level ``conn`` is used, as before.

    Returns:
        Tuple ``(result, execution_time)``: the fetched DataFrame and the
        elapsed time in seconds.
    """
    active_conn = conn if connection is None else connection
    process = psutil.Process(os.getpid())

    # Resident set size of this Python process before the query, in MB
    mem_before = process.memory_info().rss / 1024 / 1024

    # perf_counter() is monotonic and high-resolution, so short queries do not
    # collapse to 0.0 s the way they can with time.time()
    start_time = time.perf_counter()
    result = active_conn.execute(query).fetchdf()
    execution_time = time.perf_counter() - start_time

    # RSS after the query; the difference covers the whole process, so it can
    # be negative if memory was released in the meantime
    mem_after = process.memory_info().rss / 1024 / 1024

    print(f"\n{description}:")
    print(f"  Execution time: {execution_time:.4f} seconds")
    print(f"  Memory usage: {mem_after - mem_before:.2f} MB increase")
    print(f"  Result rows: {len(result):,}")

    return result, execution_time

# Test different aggregation approaches: a single global aggregate versus a
# grouped aggregate that also calls the trip_category UDF and computes a median
simple_agg = "SELECT COUNT(*), AVG(fare_amount) FROM nyc_taxi"
complex_agg = """
SELECT
    DATE(tpep_pickup_datetime) as date,
    payment_type,
    trip_category(trip_distance) as distance_cat,
    COUNT(*) as trips,
    AVG(fare_amount) as avg_fare,
    STDDEV(fare_amount) as stddev_fare,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY fare_amount) as median_fare
FROM nyc_taxi
WHERE fare_amount > 0
GROUP BY date, payment_type, distance_cat
ORDER BY date, trips DESC
"""

# Run performance tests
simple_result, simple_time = monitor_query_performance(simple_agg, "Simple Aggregation")
complex_result, complex_time = monitor_query_performance(complex_agg, "Complex Aggregation with UDF")

print("\nPerformance Summary:")
# The simple query can finish below the timer's resolution and report 0.0 s;
# guard the ratio so the cell does not die with ZeroDivisionError.
if simple_time > 0:
    print(f"Complex query is {complex_time/simple_time:.1f}x slower than simple query")
else:
    print("Simple query finished too quickly to measure; slowdown ratio unavailable")

## 6. BigQuery-like Features: Window Functions and Analytics

In [None]:
# Daily aggregates in a CTE, then window functions over them: previous-day
# comparison, a 7-row trailing average, and a rank by revenue
window_query = """
WITH daily_stats AS (
    SELECT 
        DATE(tpep_pickup_datetime) as pickup_date,
        COUNT(*) as daily_trips,
        SUM(fare_amount) as daily_revenue,
        AVG(fare_amount) as avg_fare
    FROM nyc_taxi 
    WHERE tpep_pickup_datetime IS NOT NULL AND fare_amount > 0
    GROUP BY pickup_date
)
SELECT 
    pickup_date,
    daily_trips,
    daily_revenue,
    avg_fare,
    -- Window functions for trend analysis
    LAG(daily_trips, 1) OVER (ORDER BY pickup_date) as prev_day_trips,
    daily_trips - LAG(daily_trips, 1) OVER (ORDER BY pickup_date) as trip_change,
    AVG(daily_trips) OVER (ORDER BY pickup_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as weekly_avg_trips,
    ROW_NUMBER() OVER (ORDER BY daily_revenue DESC) as revenue_rank
FROM daily_stats
ORDER BY pickup_date
"""

window_results = conn.execute(window_query).fetchdf()

# Only the first ten days are printed
leading_days = window_results.head(n=10)
print("Window function analysis:")
print(leading_days)

## 7. Query Optimization and Partitioning Simulation

In [None]:
# Simulate BigQuery-style date partitioning with a view that adds
# date, year and month columns derived from the pickup timestamp
partition_setup = """
CREATE OR REPLACE VIEW nyc_taxi_partitioned AS 
SELECT 
    *,
    DATE(tpep_pickup_datetime) as partition_date,
    EXTRACT(year FROM tpep_pickup_datetime) as partition_year,
    EXTRACT(month FROM tpep_pickup_datetime) as partition_month
FROM nyc_taxi
WHERE tpep_pickup_datetime IS NOT NULL
"""

conn.execute(partition_setup)

# Query one week through the view, filtering on the derived date column
partition_query = """
SELECT 
    partition_date,
    COUNT(*) as trips,
    AVG(fare_amount) as avg_fare
FROM nyc_taxi_partitioned 
WHERE partition_date BETWEEN '2023-01-01' AND '2023-01-07'
AND fare_amount > 0
GROUP BY partition_date
ORDER BY partition_date
"""

# Run it through the shared monitor so timing and memory are reported as well
partition_result, partition_time = monitor_query_performance(
    query=partition_query,
    description="Partitioned Query (7 days)",
)

print("\nPartitioned query results:")
print(partition_result)

## 8. Data Export and Integration Patterns

In [None]:
# Export results to different formats (BigQuery-like EXPORT)
# Summary per payment type and distance category, using the registered UDFs
export_query = """
SELECT
    payment_type,
    trip_category(trip_distance) as distance_category,
    COUNT(*) as trip_count,
    AVG(fare_amount) as avg_fare,
    AVG(tip_percentage(fare_amount, tip_amount)) as avg_tip_pct
FROM nyc_taxi
WHERE fare_amount > 0 AND tip_amount >= 0
GROUP BY payment_type, distance_category
ORDER BY payment_type, trip_count DESC
"""

export_data = conn.execute(export_query).fetchdf()

# Export to CSV.
# Reuse export_query instead of a second hand-copied SELECT, so the file holds
# exactly the rows printed below, in the same order.
conn.execute(f"""
COPY (
{export_query}
) TO '../data/trip_summary.csv' WITH (HEADER 1, DELIMITER ',')
""")

# Export to Parquet: up to 10,000 positive-fare trips.
# There is no ORDER BY, so which rows are picked is not guaranteed.
conn.execute("""
COPY (
    SELECT * FROM nyc_taxi WHERE fare_amount > 0 LIMIT 10000
) TO '../data/sample_trips.parquet' (FORMAT PARQUET)
""")

print("Data exported successfully:")
print("- ../data/trip_summary.csv")
print("- ../data/sample_trips.parquet")
print(f"\nSummary data shape: {export_data.shape}")
print(export_data)

## 9. Resource Usage Summary

In [None]:
# Get DuckDB memory usage statistics.
# current_setting() is the documented way to read a configuration value.
memory_stats = conn.execute("SELECT current_setting('memory_limit')").fetchone()

# PRAGMA database_size returns one row with several columns, and the first one
# is the database name, so fields are read by column name, not by position.
database_size = conn.execute("PRAGMA database_size").fetchdf().iloc[0]

print("DuckDB Performance Summary:")
print("=" * 40)
print(f"Memory limit: {memory_stats[0]}")
print(f"Database size: {database_size['database_size']}")

# Show system resource usage for this Python process
# (psutil and os are imported in the performance-testing cell above)
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
# interval=1 blocks for one second to sample CPU utilisation
cpu_percent = process.cpu_percent(interval=1)

print("\nSystem Resource Usage:")
print(f"Memory (RSS): {memory_info.rss / 1024 / 1024:.2f} MB")
print(f"Memory (VMS): {memory_info.vms / 1024 / 1024:.2f} MB")
print(f"CPU Usage: {cpu_percent:.1f}%")

print("\n🎉 DuckDB exploration completed successfully!")
print("Ready to integrate with ClickHouse for distributed processing.")

In [None]:
# Close the DuckDB connection now that the walkthrough is finished
conn.close()

closing_message = "DuckDB connection closed."
print(closing_message)