# Iceberg Table Benchmarking System

This notebook provides a complete interface for creating and benchmarking Iceberg tables in Databricks.

## Features
- **Dataset Generation**: Create tables with configurable size and schema
- **Write Performance**: Measure table creation and insert throughput
- **Read Benchmarking**: Run comprehensive query performance tests
- **Configuration Management**: Easily modify parameters through config variables

## Quick Start
1. Modify configuration variables in the "Configuration" section
2. Run "Create Dataset" to generate a new table
3. Run the "Run Read Performance Benchmarks" section to test read performance
4. View detailed performance metrics and results

## Setup and Imports

In [None]:
import sys
import os
import json
from datetime import datetime

# Add databricks-utils to path
sys.path.append(os.path.join(os.getcwd(), '..', '..', 'databricks-utils'))

# Import our modules with the new schema-driven functions
from dataset_generator import (
    create_table_by_rows, create_table_by_size, 
    create_table_with_custom_schema, DEFAULT_SCHEMAS,
    SUPPORTED_TYPES
)
from benchmark_queries import run_standard_benchmark
from cluster_execution import get_e2_demo_client, get_cluster_info

print("✓ All modules imported successfully!")
print(f"Working directory: {os.getcwd()}")

# Display available schema types and data types
print("\n📋 Available Schema Types:")
for name, config in DEFAULT_SCHEMAS.items():
    col_summary = ', '.join([f'{col["name"]}:{col["type"]}' for col in config])
    print(f"  {name}: {col_summary}")

print(f"\n🔧 Supported Data Types:")
type_names = list(SUPPORTED_TYPES.keys())
for i in range(0, len(type_names), 4):  # Print 4 types per line
    line_types = type_names[i:i+4]
    print(f"  {', '.join(line_types)}")

print(f"\nTotal: {len(type_names)} data types supported, including VARIANT for JSON-like data")
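
A schema is a list of column dictionaries. Each dictionary has a `name` and a `type`, and may also carry optional keys such as `length`, `min_value`/`max_value`, `precision`/`scale`, or `primary_key`. This notebook doesn't show whether `dataset_generator` validates these dictionaries. The sketch below is a hypothetical pre-flight check, `validate_schema`, which is not part of `dataset_generator`. It flags unknown types, duplicate column names, and inverted ranges before a long table-creation run. It assumes the keys of `SUPPORTED_TYPES` are the same type names used in the `type` field, compared case-insensitively.

```python
from typing import Any, Dict, Iterable, List


def validate_schema(schema: List[Dict[str, Any]], supported_types: Iterable[str]) -> List[str]:
    """Return human-readable problems; an empty list means the schema looks valid.

    Hypothetical helper, not part of dataset_generator.
    """
    known = {t.lower() for t in supported_types}
    problems: List[str] = []
    seen_names = set()
    for i, col in enumerate(schema, 1):
        name = col.get("name")
        col_type = str(col.get("type", "")).lower()
        # Every column needs a unique, non-empty name
        if not name:
            problems.append(f"column {i}: missing 'name'")
        elif name in seen_names:
            problems.append(f"column {i}: duplicate name '{name}'")
        else:
            seen_names.add(name)
        # The type must be one the generator is assumed to support
        if col_type not in known:
            problems.append(f"column {i} ({name}): unsupported type '{col.get('type')}'")
        # Numeric ranges must not be inverted
        lo, hi = col.get("min_value"), col.get("max_value")
        if lo is not None and hi is not None and lo > hi:
            problems.append(f"column {i} ({name}): min_value {lo} > max_value {hi}")
        # String lengths must be positive when given
        if col_type in ("string", "varchar") and col.get("length", 1) <= 0:
            problems.append(f"column {i} ({name}): length must be positive")
    return problems
```

After the configuration cell has run, a call such as `validate_schema(SELECTED_SCHEMA_CONFIG, SUPPORTED_TYPES)` should return an empty list for a well-formed schema. Iterating a dict yields its keys, so the dict can be passed directly.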

## Configuration

Modify these variables to control your benchmarking setup:

In [None]:
# ========================================
# CONFIGURATION VARIABLES
# ========================================

# Experiment Configuration
RUN_NAME = "schema_test"  # Name for this benchmarking run (used for organizing results)

# Database Configuration
SCHEMA = "users.ashwin_srikant"  # Target schema for all tables
TABLE_NAME = "iceberg_schema_benchmark_test"  # Table name for this session

# Dataset Generation Method (choose one)
USE_ROW_COUNT = True  # Set to True to use row count, False to use target size

# Row-based configuration
NUM_ROWS = 1000  # Number of rows to generate

# Size-based configuration (used if USE_ROW_COUNT = False)
TARGET_SIZE_GB = 1.0  # Target table size in GB

# Schema Configuration - Choose a predefined schema or define custom
SCHEMA_TYPE = "mixed_types"  # Options: 'simple', 'mixed_types', 'with_variant', 'custom'

# Custom Schema Configuration (used when SCHEMA_TYPE = 'custom')
# Define your own table schema as a list of column dictionaries
CUSTOM_SCHEMA = [
    {'name': 'id', 'type': 'bigint', 'primary_key': True},
    {'name': 'user_name', 'type': 'string', 'length': 50},
    {'name': 'email', 'type': 'string', 'length': 100},
    {'name': 'age', 'type': 'int', 'min_value': 18, 'max_value': 100},
    {'name': 'balance', 'type': 'decimal', 'precision': 10, 'scale': 2, 'min_value': 0, 'max_value': 100000},
    {'name': 'is_premium', 'type': 'boolean'},
    {'name': 'signup_date', 'type': 'date'},
    {'name': 'last_activity', 'type': 'timestamp'},
    {'name': 'profile_data', 'type': 'variant'},  # JSON-like data
    {'name': 'notes', 'type': 'string', 'length': 500}
]

# Cluster Configuration
CLUSTER_ID = "0819-033442-njp866rg"  # Use your cluster ID here

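# DEFAULT_SCHEMAS was imported in the setup cell; re-importing keeps this cell
# runnable on its own, and the membership check avoids duplicate sys.path entries
import os
import sys
utils_path = os.path.join(os.getcwd(), '..', '..', 'databricks-utils')
if utils_path not in sys.path:
    sys.path.append(utils_path)
from dataset_generator import DEFAULT_SCHEMAS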

# Display current configuration
print("📋 CURRENT CONFIGURATION")
print("="*50)
print(f"Run Name: {RUN_NAME}")
print(f"Schema: {SCHEMA}")
print(f"Table: {TABLE_NAME}")
print(f"Cluster ID: {CLUSTER_ID}")
print()

# Display selected schema configuration
if SCHEMA_TYPE == 'custom':
    selected_schema = CUSTOM_SCHEMA
    print(f"Schema Type: Custom ({len(selected_schema)} columns)")
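else:
    # Fall back to 'simple' for unknown names, and say so instead of mislabeling the schema
    schema_key = SCHEMA_TYPE if SCHEMA_TYPE in DEFAULT_SCHEMAS else 'simple'
    if schema_key != SCHEMA_TYPE:
        print(f"⚠️  Unknown SCHEMA_TYPE '{SCHEMA_TYPE}', falling back to 'simple'")
    selected_schema = DEFAULT_SCHEMAS[schema_key]
    print(f"Schema Type: {schema_key} ({len(selected_schema)} columns)")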

print()
print("📊 TABLE SCHEMA:")
for i, col in enumerate(selected_schema, 1):
    extra_info = []
    if 'length' in col:
        extra_info.append(f"length={col['length']}")
    if 'min_value' in col or 'max_value' in col:
        if 'min_value' in col and 'max_value' in col:
            extra_info.append(f"range={col['min_value']}-{col['max_value']}")
        elif 'min_value' in col:
            extra_info.append(f"min={col['min_value']}")
        elif 'max_value' in col:
            extra_info.append(f"max={col['max_value']}")
    if col.get('primary_key'):
        extra_info.append("PRIMARY KEY")
    
    extra_str = f" ({', '.join(extra_info)})" if extra_info else ""
    print(f"  {i:2}. {col['name']:<15} {col['type'].upper():<10} {extra_str}")

print()
if USE_ROW_COUNT:
    print(f"Mode: Row count based")
    print(f"Rows: {NUM_ROWS:,}")
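    # Rough estimate from the schema: declared length for strings (default 100 bytes),
    # 200 bytes for VARIANT, 50 bytes for any other type. Actual on-disk size will
    # differ because of columnar encoding and compression.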
    estimated_row_size = sum(
        col.get('length', 100) if col['type'] in ['string', 'varchar'] 
        else 200 if col['type'] == 'variant'
        else 50 
        for col in selected_schema
    )
    estimated_size_mb = (NUM_ROWS * estimated_row_size) / (1024 * 1024)
    print(f"Estimated row size: {estimated_row_size} bytes")
    print(f"Estimated total size: {estimated_size_mb:.2f} MB")
else:
    print(f"Mode: Size based")
    print(f"Target size: {TARGET_SIZE_GB} GB")

print("="*50)

# Store the selected schema for use in other cells
SELECTED_SCHEMA_CONFIG = selected_schema
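
The size estimate printed above is a heuristic. The sketch below reproduces it as a function so it can be checked by hand. It also adds a hypothetical inverse, `rows_for_target_size`, which shows roughly how many rows a given `TARGET_SIZE_GB` corresponds to under the same assumptions. It does not describe how `create_table_by_size` actually sizes its output. The example schema is a copy of `CUSTOM_SCHEMA`. Its 1,150-byte row is 650 bytes of strings, plus 200 for VARIANT, plus 6 × 50 for the remaining columns.

```python
from typing import Any, Dict, List

BYTES_PER_MB = 1024 * 1024
BYTES_PER_GB = 1024 ** 3


def estimate_row_size_bytes(schema: List[Dict[str, Any]]) -> int:
    """Same heuristic as the configuration cell: declared length (default 100)
    for strings, 200 bytes for VARIANT, 50 bytes for every other type."""
    total = 0
    for col in schema:
        if col["type"] in ("string", "varchar"):
            total += col.get("length", 100)
        elif col["type"] == "variant":
            total += 200
        else:
            total += 50
    return total


def rows_for_target_size(schema: List[Dict[str, Any]], target_size_gb: float) -> int:
    """Hypothetical inverse: rows needed to reach target_size_gb under the same heuristic."""
    return int(target_size_gb * BYTES_PER_GB) // estimate_row_size_bytes(schema)


example_schema = [
    {'name': 'id', 'type': 'bigint', 'primary_key': True},
    {'name': 'user_name', 'type': 'string', 'length': 50},
    {'name': 'email', 'type': 'string', 'length': 100},
    {'name': 'age', 'type': 'int', 'min_value': 18, 'max_value': 100},
    {'name': 'balance', 'type': 'decimal', 'precision': 10, 'scale': 2},
    {'name': 'is_premium', 'type': 'boolean'},
    {'name': 'signup_date', 'type': 'date'},
    {'name': 'last_activity', 'type': 'timestamp'},
    {'name': 'profile_data', 'type': 'variant'},
    {'name': 'notes', 'type': 'string', 'length': 500},
]

row_bytes = estimate_row_size_bytes(example_schema)
print(row_bytes)                                  # 1150
print(f"{1000 * row_bytes / BYTES_PER_MB:.2f} MB")  # 1.10 MB for NUM_ROWS = 1000
print(rows_for_target_size(example_schema, 1.0))  # 933688
```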

## Initialize Databricks Connection

In [None]:
# Test Databricks connection
try:
    client = get_e2_demo_client()
    print("✓ Databricks connection established successfully!")
    print(f"Connected to: {os.getenv('DATABRICKS_HOST')}")
except Exception as e:
    client = None  # keep `client` defined so later cells report a clear error
    print(f"❌ Failed to connect to Databricks: {e}")
    print("Please check your E2_DEMO_FIELD_ENG_PAT environment variable")
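
The connection cell depends on `E2_DEMO_FIELD_ENG_PAT` and prints `DATABRICKS_HOST`. If either is unset, you get a failed connection or a `Connected to: None` line. The helper below, `missing_env_vars`, is hypothetical. It reports unset or empty variables before you connect. The exact set of variables `get_e2_demo_client` reads isn't shown in this notebook, so the list passed in is an assumption.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_env_vars(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names that are unset or empty in env (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


# Assumed variable list; adjust to whatever get_e2_demo_client actually reads
missing = missing_env_vars(["E2_DEMO_FIELD_ENG_PAT", "DATABRICKS_HOST"])
if missing:
    print(f"⚠️  Missing environment variables: {', '.join(missing)}")
```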

## Cluster Configuration

Display detailed information about the compute cluster being used for benchmarking:

In [None]:
# Get and display cluster configuration
try:
    print("🖥️  Retrieving cluster configuration...")
    cluster_info = get_cluster_info(client, CLUSTER_ID)
    
    if 'error' not in cluster_info:
        print("\n📊 CLUSTER CONFIGURATION")
        print("="*60)
        
        # Display key cluster information in a formatted way
        print(f"Cluster Name:        {cluster_info['cluster_name']}")
        print(f"Cluster ID:          {cluster_info['cluster_id']}")
        print(f"State:               {cluster_info['state']}")
        print()
        
        print("💻 Compute Configuration:")
        print(f"  Node Type:         {cluster_info['node_type_id']}")
        print(f"  Driver Node:       {cluster_info['driver_node_type_id']}")
        print(f"  Workers:           {cluster_info['num_workers']}")
        print()
        
        print("⚙️  Runtime Configuration:")
        print(f"  Spark Version:     {cluster_info['spark_version']}")
        print(f"  Runtime Engine:    {cluster_info['runtime_engine']}")
        print(f"  Security Mode:     {cluster_info['data_security_mode']}")
        print(f"  Auto-termination:  {cluster_info['autotermination_minutes']} minutes")
        
        # Display AWS-specific info if available
        if 'aws_zone_id' in cluster_info:
            print()
            print("☁️  AWS Configuration:")
            print(f"  Availability Zone: {cluster_info['aws_zone_id']}")
            if cluster_info.get('aws_instance_profile_arn'):
                print(f"  Instance Profile:  {cluster_info['aws_instance_profile_arn']}")
        
        # Display autoscaling info if available
        if 'autoscale_min_workers' in cluster_info:
            print()
            print("📈 Autoscaling Configuration:")
            print(f"  Min Workers:       {cluster_info['autoscale_min_workers']}")
            print(f"  Max Workers:       {cluster_info['autoscale_max_workers']}")
        
        print("="*60)
        
        # Store cluster info for session summary
        cluster_configuration = cluster_info
        
    else:
        print(f"❌ Failed to retrieve cluster info: {cluster_info['error']}")
        cluster_configuration = None

except Exception as e:
    print(f"❌ Error getting cluster configuration: {e}")
    cluster_configuration = None
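
When comparing runs, worker capacity is usually the most important cluster detail. On an autoscaling cluster, a single `num_workers` value is only a snapshot, and the worker count can change while the benchmark is running. The hypothetical helper below turns a cluster-info dict into a short label. It reports the autoscaling range when one is present and falls back to `num_workers` otherwise. It assumes the key names printed by the cell above.

```python
from typing import Any, Dict


def describe_workers(info: Dict[str, Any]) -> str:
    """Summarize worker capacity from a cluster-info dict.

    Assumes the key names used in the cell above (num_workers,
    autoscale_min_workers, autoscale_max_workers); missing keys become '?'.
    """
    if "autoscale_min_workers" in info:
        lo = info.get("autoscale_min_workers", "?")
        hi = info.get("autoscale_max_workers", "?")
        return f"{lo}-{hi} (autoscaling)"
    return str(info.get("num_workers", "?"))


# e.g. describe_workers(cluster_configuration) when cluster_configuration is not None
```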

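## Create Dataset

Generate the benchmark table using the selected schema and record write performance:

In [None]: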
# Record start time for session tracking
session_start = datetime.now()
print(f"🚀 Starting dataset creation at {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Using schema: {SCHEMA_TYPE}")
print()

# Create dataset based on configuration using the new schema-driven approach
try:
    if USE_ROW_COUNT:
        print(f"Creating table with {NUM_ROWS:,} rows using '{SCHEMA_TYPE}' schema...")
        write_performance = create_table_with_custom_schema(
            table_name=TABLE_NAME,
            schema_config=SELECTED_SCHEMA_CONFIG,
            num_rows=NUM_ROWS,
            database_schema=SCHEMA
        )
    else:
        print(f"Creating table with target size {TARGET_SIZE_GB} GB using '{SCHEMA_TYPE}' schema...")
        write_performance = create_table_by_size(
            table_name=TABLE_NAME,
            target_size_gb=TARGET_SIZE_GB,
            schema_config=SELECTED_SCHEMA_CONFIG,
            database_schema=SCHEMA
        )
    
    print("\n✅ Dataset creation completed successfully!")
    
    # Store results for later reference
    write_results = write_performance
    
except Exception as e:
    print(f"❌ Failed to create dataset: {e}")
    import traceback
    traceback.print_exc()
    write_results = None
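
`dataset_generator` reports its own `total_time_seconds`, but this notebook doesn't show which steps that figure includes. A simple cross-check is to measure end-to-end wall-clock time around the call yourself, which also captures client-side and network overhead. The `timed_call` wrapper below is a hypothetical sketch built on `time.perf_counter`.

```python
import time
from typing import Any, Callable, Tuple


def timed_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Run fn(*args, **kwargs) and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Example usage in the dataset creation cell (row-count mode):
# write_performance, wall_seconds = timed_call(
#     create_table_with_custom_schema,
#     table_name=TABLE_NAME,
#     schema_config=SELECTED_SCHEMA_CONFIG,
#     num_rows=NUM_ROWS,
#     database_schema=SCHEMA,
# )
# print(f"Reported: {write_performance['total_time_seconds']:.3f}s, wall clock: {wall_seconds:.3f}s")
```

A large gap between the two numbers would suggest that overhead outside `dataset_generator`'s own timers is significant for small tables.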

## Write Performance Analysis

Detailed analysis of table creation and insert performance:

In [None]:
if write_results:
    print("📊 WRITE PERFORMANCE ANALYSIS")
    print("="*60)
    
    # Key metrics
    total_time = write_results['total_time_seconds']
    insert_time = write_results['total_insert_time_seconds']
    text_gen_time = write_results['text_generation_time_seconds']
    
    print(f"Table: {write_results['table_name']}")
    print(f"Rows: {write_results['num_rows']:,}")
    print(f"Data size: {write_results['total_data_size_mb']:.2f} MB")
    print()
    print("⏱️  Performance Breakdown:")
    print(f"  Total time:        {total_time:8.3f}s (100.0%)")
    print(f"  Insert operations: {insert_time:8.3f}s ({(insert_time/total_time)*100:5.1f}%)")
    print(f"  Text generation:   {text_gen_time:8.3f}s ({(text_gen_time/total_time)*100:5.1f}%)")
    print()
    print("🚀 Throughput Metrics:")
    print(f"  Overall:     {write_results['overall_rows_per_second']:8.1f} rows/s | {write_results['overall_mb_per_second']:8.2f} MB/s")
    print(f"  Insert only: {write_results['insert_rows_per_second']:8.1f} rows/s | {write_results['insert_mb_per_second']:8.2f} MB/s")
    
    # Table statistics
    if 'table_stats' in write_results:
        stats = write_results['table_stats']
        print()
        print("📈 Final Table Statistics:")
        print(f"  {stats}")
    
    print("="*60)
else:
    print("❌ No write performance data available. Please run dataset creation first.")
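
The breakdown above reports insert and text-generation time as shares of the total, but it does not account for the rest. The hypothetical helper below makes the remainder explicit as `other`. What that remainder contains (table creation, statistics collection, client round-trips) is an assumption, because `dataset_generator` defines how `total_time_seconds` is measured. The helper also returns an empty dict instead of dividing by zero when the total is 0.

```python
from typing import Dict


def write_time_breakdown(results: Dict[str, float]) -> Dict[str, float]:
    """Percent of total_time_seconds spent in inserts, text generation, and everything else."""
    total = results["total_time_seconds"]
    if total <= 0:
        return {}
    insert = results["total_insert_time_seconds"]
    text_gen = results["text_generation_time_seconds"]
    # Whatever the two measured phases do not cover; clamped at 0 in case timers overlap
    other = max(total - insert - text_gen, 0.0)
    return {
        "insert_pct": 100.0 * insert / total,
        "text_generation_pct": 100.0 * text_gen / total,
        "other_pct": 100.0 * other / total,
    }


# e.g. write_time_breakdown(write_results) after a successful dataset creation
```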

## Run Read Performance Benchmarks

Execute a comprehensive suite of read queries to measure query performance:

In [None]:
# Run comprehensive read benchmarks
try:
    print(f"🔍 Starting read performance benchmarks on {SCHEMA}.{TABLE_NAME}")
    print()
    
    benchmark_results = run_standard_benchmark(TABLE_NAME, SCHEMA)
    
    print("\n✅ Read benchmarks completed successfully!")
    
except Exception as e:
    print(f"❌ Failed to run benchmarks: {e}")
    benchmark_results = []
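
Each benchmark result carries a single `execution_time_seconds`. A single cold run is sensitive to caching and to cluster warm-up, and this notebook doesn't show whether `run_standard_benchmark` already repeats queries. A common remedy is to discard a few warm-up runs and then report the median of several timed runs. The `time_repeated` helper below is a hypothetical sketch of that approach. `run_query` in the usage comment is a placeholder and is not defined in this notebook.

```python
import statistics
import time
from typing import Callable, Dict, List


def time_repeated(run: Callable[[], object], repeats: int = 5, warmup: int = 1) -> Dict[str, float]:
    """Call run() `warmup` times without timing, then `repeats` times with timing.

    Returns min / median / max wall-clock seconds over the timed runs.
    """
    if repeats < 1:
        raise ValueError("repeats must be >= 1")
    for _ in range(warmup):
        run()  # warm caches and cluster resources; result discarded
    samples: List[float] = []
    for _ in range(repeats):
        start = time.perf_counter()
        run()
        samples.append(time.perf_counter() - start)
    return {
        "runs": float(repeats),
        "min": min(samples),
        "median": statistics.median(samples),
        "max": max(samples),
    }


# Hypothetical usage, where run_query stands for whatever submits one SQL
# statement to the cluster:
# time_repeated(lambda: run_query(f"SELECT COUNT(*) FROM {SCHEMA}.{TABLE_NAME}"))
```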

## Read Performance Analysis

Detailed analysis of query performance results:

In [None]:
if benchmark_results:
    print("📊 READ PERFORMANCE ANALYSIS")
    print("="*80)
    
    # Separate successful and failed queries
    successful = [r for r in benchmark_results if r["status"] == "success"]
    failed = [r for r in benchmark_results if r["status"] == "error"]
    
    print(f"📈 Summary:")
    print(f"  Total queries: {len(benchmark_results)}")
    print(f"  Successful: {len(successful)}")
    print(f"  Failed: {len(failed)}")
    
    if successful:
        total_time = sum(r["execution_time_seconds"] for r in successful)
        avg_time = total_time / len(successful)
        min_time = min(r["execution_time_seconds"] for r in successful)
        max_time = max(r["execution_time_seconds"] for r in successful)
        
        print(f"  Total execution time: {total_time:.3f}s")
        print(f"  Average query time: {avg_time:.3f}s")
        print(f"  Fastest query: {min_time:.3f}s")
        print(f"  Slowest query: {max_time:.3f}s")
        
        print()
        print("⚡ Query Performance Breakdown:")
        print(f"{'Query Name':<35} {'Time (s)':<10} {'Status':<10}")
        print("-" * 60)
        
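        for result in benchmark_results:
            status_icon = "✓" if result['status'] == 'success' else "✗"
            # Failed queries may not carry a timing, so avoid formatting None as a float
            exec_time = result.get('execution_time_seconds')
            time_str = f"{exec_time:.3f}" if exec_time is not None else "n/a"
            print(f"{result['query_name']:<35} {time_str:<10} {status_icon} {result['status']}")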
    
    if failed:
        print()
        print("❌ Failed Queries:")
        for result in failed:
            print(f"  {result['query_name']}: {result.get('error', 'Unknown error')}")
    
    print("="*80)
else:
    print("❌ No benchmark results available. Please run read benchmarks first.")
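
With only a handful of queries, one slow query can skew the average. The median and the 95th percentile describe the distribution better. The hypothetical helper below computes both over successful queries, using the nearest-rank definition of the percentile. It expects the result shape used above (`status`, `execution_time_seconds`).

```python
import math
import statistics
from typing import Any, Dict, List


def latency_summary(results: List[Dict[str, Any]]) -> Dict[str, float]:
    """Median and nearest-rank p95 of execution_time_seconds over successful queries."""
    times = sorted(
        r["execution_time_seconds"] for r in results
        if r.get("status") == "success" and r.get("execution_time_seconds") is not None
    )
    if not times:
        return {}
    rank = math.ceil(0.95 * len(times))  # nearest-rank percentile, 1-based
    return {
        "count": float(len(times)),
        "median": statistics.median(times),
        "p95": times[rank - 1],
    }


# e.g. latency_summary(benchmark_results)
```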

## Session Summary

Complete overview of this benchmarking session:

In [None]:
session_end = datetime.now()
session_duration = session_end - session_start

print("🎯 BENCHMARKING SESSION SUMMARY")
print("="*70)
print(f"Run Name: {RUN_NAME}")
print(f"Session started: {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Session ended:   {session_end.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Total duration:  {session_duration}")
print()

print(f"📋 Configuration:")
print(f"  Table: {SCHEMA}.{TABLE_NAME}")
print(f"  Cluster: {CLUSTER_ID}")
print(f"  Schema type: {SCHEMA_TYPE} ({len(SELECTED_SCHEMA_CONFIG)} columns)")
if USE_ROW_COUNT:
    print(f"  Rows: {NUM_ROWS:,}")
else:
    print(f"  Target size: {TARGET_SIZE_GB} GB")

# Include cluster configuration in summary
if 'cluster_configuration' in locals() and cluster_configuration and 'error' not in cluster_configuration:
    print()
    print(f"🖥️  Cluster Details:")
    print(f"  Node Type: {cluster_configuration['node_type_id']}")
    print(f"  Workers: {cluster_configuration['num_workers']}")
    print(f"  Runtime: {cluster_configuration['runtime_engine']}")
    print(f"  Spark Version: {cluster_configuration['spark_version']}")

print()
if 'write_results' in locals() and write_results:
    print(f"📝 Write Performance:")
    print(f"  Created {write_results['num_rows']:,} rows in {write_results['total_time_seconds']:.1f}s")
    print(f"  Throughput: {write_results['overall_rows_per_second']:.1f} rows/s")
    print(f"  Data size: {write_results['total_data_size_mb']:.2f} MB")

print()
if 'benchmark_results' in locals() and benchmark_results:
    successful_reads = [r for r in benchmark_results if r["status"] == "success"]
    print(f"📖 Read Performance:")
    print(f"  Executed {len(benchmark_results)} queries ({len(successful_reads)} successful)")
    if successful_reads:
        avg_read_time = sum(r["execution_time_seconds"] for r in successful_reads) / len(successful_reads)
        print(f"  Average query time: {avg_read_time:.3f}s")

print()
print("🏁 Session summary complete.")
print("="*70)

## Export Results (Optional)

Save detailed results to JSON files for further analysis:

In [None]:
# Export results to JSON files
export_results = input("Export results to JSON files? (y/N): ").lower().strip() == 'y'

if export_results:
    # Create organized directory structure
    timestamp = session_start.strftime('%Y%m%d_%H%M%S')
    results_dir = f"results/{RUN_NAME}_{timestamp}"
    
    # Create directory if it doesn't exist
    os.makedirs(results_dir, exist_ok=True)
    
    print(f"📁 Creating results directory: {results_dir}")
    
    # Export write performance
    if 'write_results' in locals() and write_results:
        write_filename = os.path.join(results_dir, f"write_performance.json")
        with open(write_filename, 'w') as f:
            json.dump(write_results, f, indent=2, default=str)
        print(f"✓ Write performance saved to: {write_filename}")
    
    # Export read benchmarks
    if 'benchmark_results' in locals() and benchmark_results:
        read_filename = os.path.join(results_dir, f"read_benchmarks.json")
        with open(read_filename, 'w') as f:
            json.dump(benchmark_results, f, indent=2, default=str)
        print(f"✓ Read benchmarks saved to: {read_filename}")
    
    # Export session summary with cluster configuration and run metadata
    has_write = 'write_results' in locals() and bool(write_results)
    has_reads = 'benchmark_results' in locals() and bool(benchmark_results)
    successful_read_times = [
        r["execution_time_seconds"] for r in benchmark_results if r["status"] == "success"
    ] if has_reads else []
    summary = {
        "run_metadata": {
            "run_name": RUN_NAME,
            "timestamp": timestamp,
            "session_start": session_start.isoformat(),
            "session_end": session_end.isoformat(),
            "duration_seconds": session_duration.total_seconds()
        },
        "configuration": {
            "schema": SCHEMA,
            "table_name": TABLE_NAME,
            "cluster_id": CLUSTER_ID,
            "use_row_count": USE_ROW_COUNT,
            "num_rows": NUM_ROWS if USE_ROW_COUNT else None,
            "target_size_gb": TARGET_SIZE_GB if not USE_ROW_COUNT else None,
            "schema_type": SCHEMA_TYPE,
            "table_schema": SELECTED_SCHEMA_CONFIG
        },
        "cluster_configuration": cluster_configuration if 'cluster_configuration' in locals() else None,
        "performance_summary": {
            "write_throughput_rows_per_sec": write_results.get('overall_rows_per_second') if has_write else None,
            "write_throughput_mb_per_sec": write_results.get('overall_mb_per_second') if has_write else None,
            # Guard against division by zero when every read query failed
            "avg_read_query_time_sec": sum(successful_read_times) / len(successful_read_times) if successful_read_times else None,
            "successful_read_queries": len(successful_read_times),
            "total_read_queries": len(benchmark_results) if has_reads else 0
        }
    }
    
    summary_filename = os.path.join(results_dir, f"session_summary.json")
    with open(summary_filename, 'w') as f:
        json.dump(summary, f, indent=2, default=str)
    print(f"✓ Session summary saved to: {summary_filename}")
    
    # Create a simple README for the run
    readme_content = f"""# Benchmarking Run: {RUN_NAME}

## Run Information
- **Run Name**: {RUN_NAME}
- **Timestamp**: {timestamp}
- **Date**: {session_start.strftime('%Y-%m-%d %H:%M:%S')}
- **Duration**: {session_duration}

## Configuration
- **Table**: {SCHEMA}.{TABLE_NAME}
- **Cluster**: {CLUSTER_ID}
{"- **Rows**: {:,}".format(NUM_ROWS) if USE_ROW_COUNT else "- **Target Size**: {} GB".format(TARGET_SIZE_GB)}
- **Schema Type**: {SCHEMA_TYPE} ({len(SELECTED_SCHEMA_CONFIG)} columns)

## Files in this directory
- `write_performance.json` - Detailed write performance metrics
- `read_benchmarks.json` - Read query benchmark results  
- `session_summary.json` - Complete session summary with cluster info
- `README.md` - This file

## Quick Results Summary
"""
    
    if 'write_results' in locals() and write_results:
        readme_content += f"- **Write Throughput**: {write_results['overall_rows_per_second']:.1f} rows/s\n"
        readme_content += f"- **Data Size**: {write_results['total_data_size_mb']:.2f} MB\n"
    
    if 'benchmark_results' in locals() and benchmark_results:
        successful_reads = [r for r in benchmark_results if r["status"] == "success"]
        if successful_reads:
            avg_read_time = sum(r["execution_time_seconds"] for r in successful_reads) / len(successful_reads)
            readme_content += f"- **Average Read Query Time**: {avg_read_time:.3f}s\n"
            readme_content += f"- **Successful Queries**: {len(successful_reads)}/{len(benchmark_results)}\n"
    
    readme_filename = os.path.join(results_dir, "README.md")
    with open(readme_filename, 'w') as f:
        f.write(readme_content)
    print(f"✓ Run documentation saved to: {readme_filename}")
    
    print(f"\n📊 All results exported to: {results_dir}")
    print(f"🏷️  Run identifier: {RUN_NAME}_{timestamp}")
else:
    print("Results not exported.")

## Next Steps

### To run different configurations:
1. Modify the configuration variables in the "Configuration" section
2. Re-run the "Create Dataset" and "Run Read Performance Benchmarks" sections, followed by their analysis cells

### To compare results:
- Export results from different runs using different configurations
- Analyze the exported JSON files to compare performance across table sizes, schema types, and cluster configurations
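
Each export writes `results/<RUN_NAME>_<timestamp>/session_summary.json`. A few lines are enough to put runs side by side. The sketch below is hypothetical, not an existing utility. It reads every summary under `results/` and flattens the `run_metadata` and `performance_summary` fields written by the export cell into one row per run, with the highest write throughput first.

```python
import glob
import json
import os
from typing import Any, Dict, List


def load_run_summaries(results_root: str = "results") -> List[Dict[str, Any]]:
    """Load every <run>/session_summary.json written by the export cell."""
    summaries = []
    for path in sorted(glob.glob(os.path.join(results_root, "*", "session_summary.json"))):
        with open(path) as f:
            summaries.append(json.load(f))
    return summaries


def compare_runs(summaries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten summaries into one row per run, highest write throughput first."""
    rows = []
    for s in summaries:
        meta = s["run_metadata"]
        perf = s.get("performance_summary", {})
        rows.append({
            "run": f'{meta["run_name"]}_{meta["timestamp"]}',
            "write_rows_per_sec": perf.get("write_throughput_rows_per_sec"),
            "avg_read_sec": perf.get("avg_read_query_time_sec"),
        })
    # Runs without write metrics (None) sort last
    rows.sort(key=lambda r: (r["write_rows_per_sec"] is None, -(r["write_rows_per_sec"] or 0)))
    return rows


for row in compare_runs(load_run_summaries()):
    print(row)
```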

### Recommended experiments:
- Test different row counts (1K, 10K, 100K, 1M)
- Test different string column lengths via the `length` field in `CUSTOM_SCHEMA` (1KB, 10KB, 100KB, 1MB)
- Compare performance with different cluster configurations
- Analyze query patterns that perform best/worst on your data

### Performance optimization:
- Monitor cluster utilization during tests
- Consider partitioning strategies for larger datasets
- Test with different Spark configurations
- Analyze query execution plans for optimization opportunities