# Trino Analytics Homework

## Overview

This notebook demonstrates a complete analytics workflow using Trino for federated queries across PostgreSQL and MySQL databases, with data visualization and long-term storage in Apache Iceberg.

### Architecture

- **Trino**: Distributed SQL query engine for federated queries
- **PostgreSQL**: Source database containing customer and order data
- **MySQL**: Source database containing payment data
- **Apache Iceberg**: Table format for the analytics results, stored on a MinIO backend
- **Python**: Analytics environment with pandas, matplotlib, and trino libraries

### Workflow Levels

1. **Level 1**: Connections and catalog exploration
2. **Level 2**: Data aggregation and DataFrame creation
3. **Level 3**: Visualization and Iceberg storage

---

## Setup and Imports

Import all required libraries and modules for the analytics workflow.

In [None]:
# Import required libraries
import sys
import warnings
from datetime import datetime

# Third-party imports
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Local module imports
sys.path.append('/home/jovyan/work')
from trino_connection import (
    create_trino_connection,
    test_catalog_connectivity,
    execute_sql_query,
    get_catalog_tables,
    test_data_access,
    close_connection
)
from data_aggregation import (
    aggregate_daily_orders,
    aggregate_daily_payments,
    merge_dataframes_with_fillna,
    calculate_payment_coverage,
    create_final_analytics_dataframe,
    get_analytics_summary
)
from visualization import (
    create_time_series_revenue_chart,
    create_payment_coverage_histogram,
    create_combined_analytics_dashboard,
    display_chart_summary
)
from iceberg_storage import (
    save_analytics_to_iceberg,
    query_iceberg_table,
    verify_data_persistence,
    list_iceberg_tables
)

# Configure display settings
warnings.filterwarnings('ignore')
%matplotlib inline
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

print("✅ All modules imported successfully")
print(f"📅 Notebook execution started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

---

# Level 1: Connections and Catalog Exploration

In this level, we establish connections to Trino and explore the available catalogs and data sources.

## 1.1 Establish Trino Connection

Create a connection to the Trino coordinator and verify connectivity.

In [None]:
# Establish connection to Trino
print("🔌 Establishing connection to Trino...")
conn = create_trino_connection()
print("✅ Trino connection established successfully")
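
The body of `create_trino_connection` lives in the local `trino_connection` module and is not shown in this notebook. A minimal sketch of what such a helper might look like, assuming the `trino` Python client and environment variables named `TRINO_HOST`, `TRINO_PORT`, and `TRINO_USER` (the function names, variable names, and defaults here are hypothetical):

```python
import os


def trino_settings(env=None):
    """Collect connection settings; the variable names are assumptions."""
    env = os.environ if env is None else env
    return {
        "host": env.get("TRINO_HOST", "localhost"),
        "port": int(env.get("TRINO_PORT", "8080")),
        "user": env.get("TRINO_USER", "analyst"),
    }


def connect_to_trino(settings=None):
    """Open a DB-API connection to the Trino coordinator."""
    # Imported lazily so trino_settings() works without the client installed.
    from trino.dbapi import connect

    return connect(**(settings or trino_settings()))
```

Keeping the settings in a separate function makes it possible to inspect or override them without opening a connection.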

## 1.2 Test Catalog Connectivity

Verify that all configured catalogs (PostgreSQL, MySQL, Iceberg) are accessible.

In [None]:
# Test connectivity to all catalogs
print("🔍 Testing catalog connectivity...")
catalog_status = test_catalog_connectivity(conn)

print("\n📊 Catalog Connectivity Status:")
print("=" * 35)
for catalog, status in catalog_status.items():
    status_icon = "✅" if status else "❌"
    print(f"{status_icon} {catalog:<15}: {'Connected' if status else 'Failed'}")

successful_catalogs = sum(catalog_status.values())
total_catalogs = len(catalog_status)
print(f"\n📈 Summary: {successful_catalogs}/{total_catalogs} catalogs connected successfully")
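
The loop above expects `test_catalog_connectivity` to return a mapping from catalog name to a boolean. One way such a check could be written, assuming a DB-API style connection and that `SHOW SCHEMAS FROM <catalog>` is an acceptable probe (the helper name `check_catalogs` is hypothetical and is not the module's actual implementation):

```python
def check_catalogs(conn, catalogs=("postgresql", "mysql", "iceberg")):
    """Return {catalog: True/False} depending on whether a probe query succeeds."""
    status = {}
    for catalog in catalogs:
        try:
            cursor = conn.cursor()
            cursor.execute(f"SHOW SCHEMAS FROM {catalog}")
            cursor.fetchall()  # force the query to run to completion
            status[catalog] = True
        except Exception:
            # Any failure (unknown catalog, unreachable source) counts as "not connected".
            status[catalog] = False
    return status
```

Catching the exception per catalog means one unreachable source does not hide the status of the others.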

## 1.3 Explore Available Catalogs

List all available catalogs and their schemas.

In [None]:
# Show all available catalogs
catalogs_df = execute_sql_query(conn, "SHOW CATALOGS")
print("📚 Available Catalogs:")
display(catalogs_df)

print(f"\nTotal catalogs available: {len(catalogs_df)}")

## 1.4 Explore PostgreSQL Tables

Examine the structure and content of PostgreSQL tables containing order and customer data.

In [None]:
# Explore PostgreSQL tables
print("🐘 PostgreSQL Tables:")
pg_tables = get_catalog_tables(conn, "postgresql", "public")
display(pg_tables)

# Sample data from key tables
print("\n📋 Sample Customer Data:")
customers_sample = execute_sql_query(conn, """
    SELECT customer_id, customer_name, email, created_at
    FROM postgresql.public.trn_customers
    ORDER BY customer_id
    LIMIT 5
""")
display(customers_sample)

print("\n📦 Sample Orders Data:")
orders_sample = execute_sql_query(conn, """
    SELECT order_id, customer_id, order_ts, total_amount
    FROM postgresql.public.trn_orders
    ORDER BY order_ts DESC
    LIMIT 5
""")
display(orders_sample)

## 1.5 Explore MySQL Tables

Examine the structure and content of MySQL tables containing payment data.

In [None]:
# Explore MySQL tables
print("🐬 MySQL Tables:")
mysql_tables = get_catalog_tables(conn, "mysql", "demo_db")
display(mysql_tables)

# Sample payment data
print("\n💳 Sample Payment Data:")
payments_sample = execute_sql_query(conn, """
    SELECT payment_id, order_id, amount, paid_at, payment_method
    FROM mysql.demo_db.trn_payments
    ORDER BY paid_at DESC
    LIMIT 5
""")
display(payments_sample)

## 1.6 Test Cross-Catalog Queries

Demonstrate Trino's federated query capabilities by joining data across PostgreSQL and MySQL.

In [None]:
# Test cross-catalog join
print("🔗 Cross-Catalog Join Example:")
cross_catalog_query = """
SELECT 
    o.order_id,
    o.customer_id,
    o.total_amount as order_amount,
    p.amount as paid_amount,
    p.payment_method,
    o.order_ts,
    p.paid_at,
    CASE 
        WHEN p.amount >= o.total_amount THEN 'Fully Paid'
        WHEN p.amount > 0 THEN 'Partially Paid'
        ELSE 'Unpaid'
    END as payment_status
FROM postgresql.public.trn_orders o
LEFT JOIN mysql.demo_db.trn_payments p ON o.order_id = p.order_id
ORDER BY o.order_ts DESC
LIMIT 10
"""

cross_catalog_df = execute_sql_query(conn, cross_catalog_query)
display(cross_catalog_df)

print(f"\n✅ Successfully executed cross-catalog query returning {len(cross_catalog_df)} rows")
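
If an order can have more than one payment row, the join above returns one row per payment and compares each payment to the order total on its own. A possible variation sums payments per order before joining. The sketch below reproduces that shape locally with SQLite so it can run without a Trino cluster; the tables `orders` and `payments` and their rows are made-up stand-ins for `trn_orders` and `trn_payments`:

```python
import sqlite3

# Hypothetical stand-in data: order 1 is paid in two instalments.
SETUP = """
CREATE TABLE orders (order_id INTEGER, total_amount REAL);
CREATE TABLE payments (order_id INTEGER, amount REAL);
INSERT INTO orders VALUES (1, 100.0), (2, 50.0), (3, 30.0);
INSERT INTO payments VALUES (1, 60.0), (1, 40.0), (2, 20.0);
"""

STATUS_QUERY = """
SELECT o.order_id,
       o.total_amount AS order_amount,
       COALESCE(p.paid_amount, 0.0) AS paid_amount,
       CASE
           WHEN COALESCE(p.paid_amount, 0.0) >= o.total_amount THEN 'Fully Paid'
           WHEN COALESCE(p.paid_amount, 0.0) > 0 THEN 'Partially Paid'
           ELSE 'Unpaid'
       END AS payment_status
FROM orders o
LEFT JOIN (
    SELECT order_id, SUM(amount) AS paid_amount
    FROM payments
    GROUP BY order_id
) p ON o.order_id = p.order_id
ORDER BY o.order_id
"""


def payment_status_per_order():
    """Return one (order_id, order_amount, paid_amount, status) row per order."""
    db = sqlite3.connect(":memory:")
    try:
        db.executescript(SETUP)
        return db.execute(STATUS_QUERY).fetchall()
    finally:
        db.close()
```

With the sample rows, order 1 is reported once as fully paid rather than twice as partially paid.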

## 1.7 Data Access Verification

Verify that we can access all required tables and get row counts.

In [None]:
# Test data access across all catalogs
print("📊 Data Access Verification:")
data_access_results = test_data_access(conn)

print("\n📈 Table Row Counts:")
print("=" * 50)
total_rows = 0
for table, count in data_access_results.items():
    if count >= 0:
        print(f"✅ {table:<35}: {count:,} rows")
        total_rows += count
    else:
        print(f"❌ {table:<35}: Access failed")

print(f"\n📊 Total rows across all tables: {total_rows:,}")
print("\n✅ Level 1 completed: Connections and catalog exploration successful!")

---

# Level 2: Data Aggregation and DataFrame Creation

In this level, we perform data aggregation across multiple data sources and create analytical DataFrames.

## 2.1 Daily Orders Aggregation

Aggregate order data by day to calculate daily revenue and order counts.

In [None]:
# Aggregate daily orders from PostgreSQL
print("📦 Aggregating daily orders data...")
orders_df = aggregate_daily_orders(conn)

print(f"\n📊 Daily Orders Summary:")
print(f"  • Date range: {orders_df['dt'].min()} to {orders_df['dt'].max()}")
print(f"  • Total days: {len(orders_df)}")
print(f"  • Total revenue: ${orders_df['revenue'].sum():,.2f}")
print(f"  • Total orders: {orders_df['orders_cnt'].sum():,}")
print(f"  • Average daily revenue: ${orders_df['revenue'].mean():,.2f}")
print(f"  • Average daily orders: {orders_df['orders_cnt'].mean():.1f}")

print("\n📋 Sample Daily Orders Data:")
display(orders_df.head(10))
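
The SQL behind `aggregate_daily_orders` is not shown here. Given the columns used above (`dt`, `revenue`, `orders_cnt`) and the source columns sampled in Level 1, it plausibly groups orders by calendar day. A sketch of a query builder under that assumption (the function name `daily_aggregate_sql` is hypothetical):

```python
def daily_aggregate_sql(table, ts_column, amount_column, amount_alias, count_alias):
    """Build a per-day SUM/COUNT query; all identifiers are supplied by the caller."""
    return (
        f"SELECT CAST({ts_column} AS DATE) AS dt, "
        f"SUM({amount_column}) AS {amount_alias}, "
        f"COUNT(*) AS {count_alias} "
        f"FROM {table} "
        "GROUP BY 1 ORDER BY 1"
    )


# Assumed shape of the daily orders query against the PostgreSQL catalog.
orders_sql = daily_aggregate_sql(
    "postgresql.public.trn_orders", "order_ts", "total_amount", "revenue", "orders_cnt"
)
```

The same builder would cover the payments side by passing `mysql.demo_db.trn_payments`, `paid_at`, and `amount`.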

## 2.2 Daily Payments Aggregation

Aggregate payment data by day to calculate daily payment amounts and counts.

In [None]:
# Aggregate daily payments from MySQL
print("💳 Aggregating daily payments data...")
payments_df = aggregate_daily_payments(conn)

print(f"\n📊 Daily Payments Summary:")
print(f"  • Date range: {payments_df['dt'].min()} to {payments_df['dt'].max()}")
print(f"  • Total days: {len(payments_df)}")
print(f"  • Total paid amount: ${payments_df['paid_amount'].sum():,.2f}")
print(f"  • Total payments: {payments_df['payments_cnt'].sum():,}")
print(f"  • Average daily paid amount: ${payments_df['paid_amount'].mean():,.2f}")
print(f"  • Average daily payments: {payments_df['payments_cnt'].mean():.1f}")

print("\n📋 Sample Daily Payments Data:")
display(payments_df.head(10))

## 2.3 DataFrame Merging with Missing Value Handling

Merge the daily orders and payments DataFrames, replacing missing values with `0.0`.

In [None]:
# Merge DataFrames with proper fillna handling
print("🔗 Merging orders and payments DataFrames...")
merged_df = merge_dataframes_with_fillna(orders_df, payments_df, fill_value=0.0)

print(f"\n📊 Merged DataFrame Summary:")
print(f"  • Total rows: {len(merged_df)}")
print(f"  • Date range: {merged_df['dt'].min()} to {merged_df['dt'].max()}")
print(f"  • Columns: {list(merged_df.columns)}")

# Check for missing values after merge
missing_values = merged_df.isnull().sum()
print(f"\n🔍 Missing values after merge:")
for col, count in missing_values.items():
    if count > 0:
        print(f"  • {col}: {count} missing values")
    else:
        print(f"  ✅ {col}: No missing values")

print("\n📋 Sample Merged Data:")
display(merged_df.head(10))
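
Whether `merge_dataframes_with_fillna` performs an outer or a left join is not stated in this notebook. The sketch below assumes an outer join on `dt` and shows the fill logic with plain dictionaries so it runs without pandas; the function name `merge_daily` and the sample dates are made up for illustration:

```python
def merge_daily(orders, payments, fill_value=0.0):
    """Outer-join two {date: {column: value}} mappings on date, filling gaps."""
    order_cols = ("revenue", "orders_cnt")
    payment_cols = ("paid_amount", "payments_cnt")
    merged = []
    # The union of keys keeps days that appear on only one side.
    for dt in sorted(set(orders) | set(payments)):
        row = {"dt": dt}
        for col in order_cols:
            row[col] = orders.get(dt, {}).get(col, fill_value)
        for col in payment_cols:
            row[col] = payments.get(dt, {}).get(col, fill_value)
        merged.append(row)
    return merged


sample = merge_daily(
    {"2024-01-01": {"revenue": 100.0, "orders_cnt": 2}},
    {"2024-01-02": {"paid_amount": 40.0, "payments_cnt": 1}},
)
```

In pandas the equivalent would typically be `merge(..., on="dt", how="outer")` followed by `fillna(fill_value)`.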

## 2.4 Payment Coverage Calculation

Calculate payment coverage metrics to understand the relationship between orders and payments.

In [None]:
# Calculate payment coverage metrics
print("📈 Calculating payment coverage metrics...")
coverage_df = calculate_payment_coverage(merged_df)

print(f"\n📊 Payment Coverage Analysis:")
coverage_stats = coverage_df['payment_coverage'].describe()
print(f"  • Average coverage: {coverage_stats['mean']:.2%}")
print(f"  • Median coverage: {coverage_stats['50%']:.2%}")
print(f"  • Maximum coverage: {coverage_stats['max']:.2%}")
print(f"  • Minimum coverage: {coverage_stats['min']:.2%}")
print(f"  • Standard deviation: {coverage_stats['std']:.2%}")

# Coverage distribution analysis
full_coverage_days = (coverage_df['payment_coverage'] >= 1.0).sum()
high_coverage_days = (coverage_df['payment_coverage'] >= 0.8).sum()
low_coverage_days = (coverage_df['payment_coverage'] < 0.5).sum()

print(f"\n📊 Coverage Distribution:")
print(f"  • Days with full coverage (≥100%): {full_coverage_days} ({full_coverage_days/len(coverage_df):.1%})")
print(f"  • Days with high coverage (≥80%): {high_coverage_days} ({high_coverage_days/len(coverage_df):.1%})")
print(f"  • Days with low coverage (<50%): {low_coverage_days} ({low_coverage_days/len(coverage_df):.1%})")

print("\n📋 Sample Coverage Data:")
display(coverage_df.head(10))
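
The exact formula inside `calculate_payment_coverage` is not given in this notebook. A natural reading, used here as an assumption, is the ratio of the day's paid amount to the day's revenue, with days that have no revenue mapped to `0.0` to avoid division by zero. The helper names below are hypothetical:

```python
def payment_coverage(revenue, paid_amount):
    """Assumed definition: paid_amount / revenue, or 0.0 when there is no revenue."""
    if revenue <= 0:
        return 0.0
    return paid_amount / revenue


def coverage_buckets(ratios):
    """Count days per threshold, mirroring the >=1.0, >=0.8 and <0.5 checks above."""
    ratios = list(ratios)
    return {
        "full": sum(r >= 1.0 for r in ratios),
        "high": sum(r >= 0.8 for r in ratios),  # includes the fully covered days
        "low": sum(r < 0.5 for r in ratios),
    }
```

Note that the "high" bucket overlaps with "full", as it does in the cell above, so the three counts are not expected to add up to the number of days.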

## 2.5 Complete Analytics DataFrame Creation

Create the final analytics DataFrame using the complete pipeline.

In [None]:
# Create final analytics DataFrame using complete pipeline
print("🏗️ Creating final analytics DataFrame...")
final_df = create_final_analytics_dataframe(conn)

print(f"\n📊 Final Analytics DataFrame:")
print(f"  • Shape: {final_df.shape}")
print(f"  • Columns: {list(final_df.columns)}")
print(f"  • Date range: {final_df['dt'].min()} to {final_df['dt'].max()}")

print("\n📋 Final DataFrame Sample:")
display(final_df.head(10))

print("\n📋 Final DataFrame Tail:")
display(final_df.tail(5))

## 2.6 Analytics Summary Generation

Generate comprehensive summary statistics for the analytics data.

In [None]:
# Generate comprehensive analytics summary
print("📈 Generating analytics summary...")
summary = get_analytics_summary(final_df)

print("\n📊 COMPREHENSIVE ANALYTICS SUMMARY")
print("=" * 50)

print(f"\n📅 Time Period:")
print(f"  • Total days analyzed: {summary['total_days']}")
print(f"  • Date range: {summary['date_range']['start']} to {summary['date_range']['end']}")

print(f"\n💰 Revenue Analysis:")
print(f"  • Total revenue: ${summary['revenue']['total']:,.2f}")
print(f"  • Average daily revenue: ${summary['revenue']['average_daily']:,.2f}")
print(f"  • Maximum daily revenue: ${summary['revenue']['max_daily']:,.2f}")
print(f"  • Minimum daily revenue: ${summary['revenue']['min_daily']:,.2f}")

print(f"\n📦 Orders Analysis:")
print(f"  • Total orders: {summary['orders']['total']:,}")
print(f"  • Average daily orders: {summary['orders']['average_daily']:.1f}")
print(f"  • Maximum daily orders: {summary['orders']['max_daily']:,}")
print(f"  • Minimum daily orders: {summary['orders']['min_daily']:,}")

print(f"\n💳 Payments Analysis:")
print(f"  • Total payments: {summary['payments']['total']:,}")
print(f"  • Total paid amount: ${summary['payments']['total_amount']:,.2f}")
print(f"  • Average daily payment count: {summary['payments']['average_daily_count']:.1f}")
print(f"  • Average daily paid amount: ${summary['payments']['average_daily_amount']:,.2f}")

print(f"\n📈 Payment Coverage Analysis:")
print(f"  • Average coverage: {summary['payment_coverage']['average']:.2%}")
print(f"  • Maximum coverage: {summary['payment_coverage']['max']:.2%}")
print(f"  • Minimum coverage: {summary['payment_coverage']['min']:.2%}")
print(f"  • Days with full coverage: {summary['payment_coverage']['days_with_full_coverage']}")

print("\n✅ Level 2 completed: Data aggregation and DataFrame creation successful!")

---

# Level 3: Visualization and Iceberg Storage

In this level, we chart the analytics results and save them to Apache Iceberg for long-term storage.

## 3.1 Data Visualization Preparation

Prepare the data for visualization and display summary information.

In [None]:
# Display chart data summary
print("📊 Preparing data for visualization...")
display_chart_summary(final_df)

# Set visualization style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("\n✅ Data prepared for visualization")

## 3.2 Time-Series Revenue Visualization

Create a professional time-series chart showing daily revenue trends.

In [None]:
# Create time-series revenue chart
print("📈 Creating time-series revenue chart...")
fig1 = create_time_series_revenue_chart(
    final_df,
    figsize=(14, 8),
    title="Daily Revenue Analysis - Time Series Trend"
)
plt.show()

print("✅ Time-series revenue chart created successfully")

## 3.3 Payment Coverage Distribution

Create a histogram showing the distribution of payment coverage ratios.

In [None]:
# Create payment coverage histogram
print("📊 Creating payment coverage distribution histogram...")
fig2 = create_payment_coverage_histogram(
    final_df,
    bins=25,
    figsize=(12, 8),
    title="Payment Coverage Distribution Analysis"
)
plt.show()

print("✅ Payment coverage histogram created successfully")

## 3.4 Comprehensive Analytics Dashboard

Create a comprehensive dashboard combining multiple visualizations and summary statistics.

In [None]:
# Create comprehensive analytics dashboard
print("🎛️ Creating comprehensive analytics dashboard...")
fig3 = create_combined_analytics_dashboard(
    final_df,
    figsize=(18, 12),
    title="Trino Analytics Homework - Comprehensive Dashboard"
)
plt.show()

print("✅ Comprehensive analytics dashboard created successfully")

## 3.5 Iceberg Storage - Schema and Table Creation

Save the analytics results to Apache Iceberg for long-term storage and future analysis.

In [None]:
# Save analytics data to Iceberg
print("🏔️ Saving analytics data to Apache Iceberg...")
iceberg_results = save_analytics_to_iceberg(
    conn, 
    final_df, 
    table_name="trino_homework_analytics",
    schema_name="homework"
)

print("\n📊 Iceberg Storage Results:")
print("=" * 30)
print(f"✅ Success: {iceberg_results['success']}")
print(f"📋 Table: {iceberg_results['full_table_name']}")
print(f"📊 Rows inserted: {iceberg_results['rows_inserted']:,}")
print(f"🔍 Verification passed: {iceberg_results['verification']['verification_passed']}")
print(f"⏰ Timestamp: {iceberg_results['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")

if iceberg_results['success']:
    print("\n🎉 Data successfully saved to Iceberg!")
else:
    print("\n⚠️ Iceberg storage completed with issues")
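
The statements issued by `save_analytics_to_iceberg` are not shown in this notebook. As a rough sketch, saving a DataFrame through Trino usually involves creating the schema, creating the table, and inserting rows. The column types below are assumptions inferred from how the columns are used elsewhere in this notebook, and the function name `iceberg_statements` is hypothetical; depending on the metastore, `CREATE SCHEMA` may also need a location property:

```python
# Assumed column types; the real table definition may differ.
COLUMNS = (
    ("dt", "DATE"),
    ("revenue", "DOUBLE"),
    ("orders_cnt", "BIGINT"),
    ("paid_amount", "DOUBLE"),
    ("payments_cnt", "BIGINT"),
    ("payment_coverage", "DOUBLE"),
)


def iceberg_statements(schema_name, table_name, catalog="iceberg"):
    """Return the schema DDL, table DDL and parameterized INSERT as strings."""
    full_name = f"{catalog}.{schema_name}.{table_name}"
    column_sql = ", ".join(f"{name} {sql_type}" for name, sql_type in COLUMNS)
    placeholders = ", ".join("?" for _ in COLUMNS)
    return [
        f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema_name}",
        f"CREATE TABLE IF NOT EXISTS {full_name} ({column_sql})",
        f"INSERT INTO {full_name} VALUES ({placeholders})",
    ]


statements = iceberg_statements("homework", "trino_homework_analytics")
```

The `?` placeholders follow the qmark parameter style, so row values would be passed separately to the cursor rather than formatted into the SQL text.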

## 3.6 Iceberg Data Verification

Verify that the data was correctly persisted to Iceberg by querying it back.

In [None]:
# Verify data persistence in Iceberg
print("🔍 Verifying data persistence in Iceberg...")

# Query data back from Iceberg
iceberg_df = query_iceberg_table(
    conn, 
    "trino_homework_analytics", 
    "homework",
    order_by="dt"
)

print(f"\n📊 Iceberg Query Results:")
print(f"  • Rows retrieved: {len(iceberg_df):,}")
print(f"  • Columns: {list(iceberg_df.columns)}")
print(f"  • Date range: {iceberg_df['dt'].min()} to {iceberg_df['dt'].max()}")

# Compare key metrics
print(f"\n🔍 Data Integrity Check:")
original_revenue = final_df['revenue'].sum()
iceberg_revenue = iceberg_df['revenue'].sum()
revenue_match = abs(original_revenue - iceberg_revenue) < 0.01

original_orders = final_df['orders_cnt'].sum()
iceberg_orders = iceberg_df['orders_cnt'].sum()
orders_match = original_orders == iceberg_orders

print(f"  {'✅' if revenue_match else '❌'} Revenue: Original=${original_revenue:,.2f}, Iceberg=${iceberg_revenue:,.2f}")
print(f"  {'✅' if orders_match else '❌'} Orders: Original={original_orders:,}, Iceberg={iceberg_orders:,}")
print(f"  {'✅' if len(final_df) == len(iceberg_df) else '❌'} Row count: Original={len(final_df)}, Iceberg={len(iceberg_df)}")

print("\n📋 Sample Iceberg Data:")
display(iceberg_df.head(5))

if revenue_match and orders_match and len(final_df) == len(iceberg_df):
    print("\n✅ Data integrity verification passed!")
else:
    print("\n⚠️ Data integrity verification found discrepancies")
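
The revenue comparison above tolerates a difference of up to one cent because floating-point sums can drift slightly after a round trip through storage. If more metrics were compared, the same check could be factored into a small helper. This is a sketch with a hypothetical name, `totals_match`, using `math.isclose` with an absolute tolerance (which accepts a difference exactly equal to the tolerance, unlike the strict `<` above):

```python
import math


def totals_match(original, stored, abs_tol=0.01):
    """Compare two {metric: total} mappings and return {metric: bool}."""
    return {
        key: key in stored
        and math.isclose(original[key], stored[key], abs_tol=abs_tol)
        for key in original
    }
```

A metric missing from the stored side is reported as a mismatch instead of raising a `KeyError`.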

## 3.7 Iceberg Table Management

List the tables in the `homework` schema and run an aggregate query against the stored analytics table.

In [None]:
# List all Iceberg tables in the homework schema
print("📚 Listing Iceberg tables...")
iceberg_tables = list_iceberg_tables(conn, "homework")

if not iceberg_tables.empty:
    print(f"\n📊 Found {len(iceberg_tables)} table(s) in homework schema:")
    for table in iceberg_tables['table_name']:
        print(f"  • {table}")
    display(iceberg_tables)
else:
    print("\nℹ️ No tables found in homework schema")

# Demonstrate analytical query on Iceberg data
print("\n📈 Running analytical query on Iceberg data...")
iceberg_analytics = execute_sql_query(conn, """
    SELECT 
        COUNT(*) as total_days,
        SUM(revenue) as total_revenue,
        AVG(revenue) as avg_daily_revenue,
        SUM(orders_cnt) as total_orders,
        AVG(payment_coverage) as avg_coverage,
        COUNT(CASE WHEN payment_coverage >= 1.0 THEN 1 END) as full_coverage_days
    FROM iceberg.homework.trino_homework_analytics
""")

print("\n📊 Iceberg Analytics Summary:")
display(iceberg_analytics)

print("\n✅ Iceberg table management demonstration completed")

---

# Final Summary and Cleanup

Summarize the complete workflow and clean up resources.

## Workflow Summary

This notebook successfully demonstrated a complete analytics workflow using Trino for federated queries.

In [None]:
# Final workflow summary
print("🎯 TRINO ANALYTICS HOMEWORK - FINAL SUMMARY")
print("=" * 60)

print("\n✅ LEVEL 1 - Connections and Catalog Exploration:")
print("  • Established connection to Trino coordinator")
print("  • Verified connectivity to PostgreSQL, MySQL, and Iceberg catalogs")
print("  • Explored table structures and sample data")
print("  • Demonstrated cross-catalog federated queries")

print("\n✅ LEVEL 2 - Data Aggregation and DataFrame Creation:")
print("  • Aggregated daily orders data from PostgreSQL")
print("  • Aggregated daily payments data from MySQL")
print("  • Merged DataFrames with proper missing value handling")
print("  • Calculated payment coverage metrics")
print("  • Generated comprehensive analytics summary")

print("\n✅ LEVEL 3 - Visualization and Iceberg Storage:")
print("  • Created professional time-series revenue charts")
print("  • Generated payment coverage distribution histograms")
print("  • Built comprehensive analytics dashboard")
print("  • Saved results to Apache Iceberg for long-term storage")
print("  • Verified data persistence and integrity")

print(f"\n📊 KEY METRICS:")
print(f"  • Total days analyzed: {len(final_df)}")
print(f"  • Total revenue: ${final_df['revenue'].sum():,.2f}")
print(f"  • Total orders: {final_df['orders_cnt'].sum():,}")
print(f"  • Average payment coverage: {final_df['payment_coverage'].mean():.2%}")
print(f"  • Data successfully persisted to Iceberg: {'Yes' if iceberg_results['success'] else 'No'}")

print(f"\n⏰ Execution completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\n🎉 HOMEWORK COMPLETED SUCCESSFULLY!")

## Resource Cleanup

Clean up connections and resources.

In [None]:
# Close Trino connection
print("🧹 Cleaning up resources...")
close_connection(conn)
print("✅ Trino connection closed successfully")

# Clear large variables to free memory
del final_df, orders_df, payments_df, merged_df, coverage_df, iceberg_df
print("✅ Memory cleanup completed")

print("\n🏁 All resources cleaned up successfully!")
print("\n" + "=" * 60)
print("Thank you for using Trino Analytics Homework!")
print("=" * 60)
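
The cleanup cell above only runs if every earlier cell succeeds. Outside a notebook, for example when the same steps are moved into a script, the connection could be closed even when a query fails by using `contextlib.closing`. This is a sketch assuming the connection object exposes a `close()` method, as DB-API connections do; `run_with_cleanup` is a hypothetical name:

```python
from contextlib import closing


def run_with_cleanup(connect, work):
    """Open a connection, run work(conn), and always close the connection."""
    with closing(connect()) as conn:
        # close() is called on exit from this block, including when work() raises.
        return work(conn)
```

A call such as `run_with_cleanup(create_trino_connection, create_final_analytics_dataframe)` would return the DataFrame and release the connection in one step.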