In [None]:
import os
import subprocess
import sys
import platform

print("🐧 Linux Apache Iceberg Setup")
print("=" * 40)

# Detect Linux distribution
try:
    with open('/etc/os-release', 'r') as f:
        os_info = f.read()
        if 'ubuntu' in os_info.lower():
            distro = 'ubuntu'
        elif 'centos' in os_info.lower():
            distro = 'centos'
        elif 'debian' in os_info.lower():
            distro = 'debian'
        elif 'rhel' in os_info.lower():
            distro = 'rhel'
        else:
            distro = 'unknown'
    print(f"🖥️ Linux Distribution: {distro}")
except:
    distro = 'unknown'
    print("🖥️ Linux Distribution: unknown")

# Check for Java installation on Linux
try:
    java_version = subprocess.run(['java', '-version'], capture_output=True, text=True, stderr=subprocess.STDOUT)
    if java_version.returncode == 0:
        print("☕ Java is installed")
        
        # Try to find JAVA_HOME on Linux
        java_home_candidates = [
            '/usr/lib/jvm/java-8-openjdk-amd64',
            '/usr/lib/jvm/java-11-openjdk-amd64',
            '/usr/lib/jvm/default-java',
            '/usr/java/default',
            '/opt/java'
        ]
        
        java_home_found = False
        for candidate in java_home_candidates:
            if os.path.exists(candidate):
                os.environ["JAVA_HOME"] = candidate
                print(f"🏠 JAVA_HOME: {candidate}")
                java_home_found = True
                break
        
        if not java_home_found:
            print("⚠️ Could not determine JAVA_HOME automatically")
            print("💡 Please set JAVA_HOME manually in your environment")
            
    else:
        print("❌ Java not found!")
        if distro in ['ubuntu', 'debian']:
            print("💡 Install Java using: sudo apt-get update && sudo apt-get install openjdk-8-jdk")
        elif distro in ['centos', 'rhel']:
            print("💡 Install Java using: sudo yum install java-1.8.0-openjdk-devel")
        else:
            print("💡 Install Java 8+ for your Linux distribution")
        sys.exit(1)
        
except FileNotFoundError:
    print("❌ Java not found!")
    if distro in ['ubuntu', 'debian']:
        print("💡 Install Java using: sudo apt-get update && sudo apt-get install openjdk-8-jdk")
    elif distro in ['centos', 'rhel']:
        print("💡 Install Java using: sudo yum install java-1.8.0-openjdk-devel")
    else:
        print("💡 Install Java 8+ for your Linux distribution")
    sys.exit(1)

print("✅ Java setup completed!")


In [None]:
# Install Python packages for Linux
print("📦 Installing Python packages for Linux...")

%pip install -q pyspark==3.4.1
%pip install -q pyiceberg[s3fs]==0.5.1
%pip install -q pandas>=2.0.0
%pip install -q numpy>=1.21.0
%pip install -q matplotlib seaborn

print("✅ Package installation completed!")

# Test imports
try:
    import pandas as pd
    import numpy as np
    import pyspark
    print(f"📊 Pandas: {pd.__version__}")
    print(f"🔢 NumPy: {np.__version__}")
    print(f"⚡ PySpark: {pyspark.__version__}")
except ImportError as e:
    print(f"❌ Import error: {e}")
    
print("🚀 Linux setup completed!")


In [None]:
# Spark Configuration for Linux
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum, avg, count
import os

# Download Iceberg JAR for Linux
jar_url = "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.2/iceberg-spark-runtime-3.4_2.12-1.4.2.jar"
jar_path = "./iceberg-spark-runtime.jar"
warehouse_path = "./iceberg-warehouse"

# S3 configuration (commented out - uncomment and configure for S3)
# s3_warehouse_path = "s3a://your-iceberg-bucket/your-warehouse-path"
# s3_access_key = "YOUR_AWS_ACCESS_KEY_ID"
# s3_secret_key = "YOUR_AWS_SECRET_ACCESS_KEY"
# s3_hadoop_jar = "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar"
# s3_bundle_jar = "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar"

print("📥 Downloading Iceberg JAR for Linux...")
!wget -q {jar_url} -O {jar_path}

# Download S3 related JARs if using S3 (uncomment below if using S3)
# if 's3_hadoop_jar' in locals() and not os.path.exists("./hadoop-aws.jar"):
#     print("📥 Downloading Hadoop AWS JAR for Linux...")
#     !wget -q {s3_hadoop_jar} -O ./hadoop-aws.jar
# if 's3_bundle_jar' in locals() and not os.path.exists("./aws-java-sdk-bundle.jar"):
#     print("📥 Downloading AWS SDK Bundle JAR for Linux...")
#     !wget -q {s3_bundle_jar} -O ./aws-java-sdk-bundle.jar

# Configure Spark with Iceberg for Linux
print("⚡ Initializing Spark with Iceberg on Linux...")

spark_builder = SparkSession.builder \
    .appName("Iceberg Telecom Demo - Linux") \
    .config("spark.jars", jar_path) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g")

# Configure for local warehouse
spark_builder = spark_builder.config("spark.sql.catalog.local.warehouse", warehouse_path) \
                             .config("spark.sql.warehouse.dir", warehouse_path)

# Configure for S3 warehouse (commented out - uncomment and configure for S3)
# spark_builder = spark_builder.config("spark.sql.catalog.local.warehouse", s3_warehouse_path) \
#                              .config("spark.sql.warehouse.dir", s3_warehouse_path) \
#                              .config("spark.jars", f"{jar_path},./hadoop-aws.jar,./aws-java-sdk-bundle.jar") \
#                              .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
#                              .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
#                              .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

spark = spark_builder.getOrCreate()

# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")

print(f"✅ Spark {spark.version} with Iceberg initialized on Linux!")
print(f"📁 Warehouse: {warehouse_path}")
print(f"☕ Java Home: {os.environ.get('JAVA_HOME', 'Not set')}")

# Generate Telecom Data for Linux
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

print("\n📡 Generating Telecom Data for Linux...")

# Linux configuration (full-scale demo)
chunk_size_seconds = 60
site_count = 200  # Larger dataset for Linux
start_time = datetime(2023, 1, 1)
demo_chunks = 100  # More data for Linux

regions = ["North", "South", "East", "West", "Central"]
cities = {
    "North": ["Dendam", "Rondburg", "Nordville"],
    "South": ["Schieveste", "Southpark"],
    "East": ["Schipstad", "Dort", "Eastport"],
    "West": ["Damstad", "Westfield"],
    "Central": ["Centrum", "Midtown"]
}
technologies = ["6G", "7G", "8G"]  # More technologies for Linux
vendors = ["Ericia", "Noson", "Weihu", "Samsong"]  # More vendors

# Generate site metadata
sites = []
for i in range(site_count):
    region = random.choice(regions)
    city = random.choice(cities[region])
    tech = random.choices(technologies, weights=[0.3, 0.5, 0.2])[0]
    vendor = random.choices(vendors, weights=[0.3, 0.3, 0.2, 0.2])[0]
    site_id = f"SITE_{i:05d}"
    sites.append({
        "site_id": site_id,
        "region": region,
        "city": city,
        "technology": tech,
        "vendor": vendor,
    })

def generate_telecom_data_chunk(second_offset):
    timestamp = start_time + timedelta(seconds=second_offset)
    records = []
    for site in sites:
        region = site["region"]
        city = site["city"]
        tech = site["technology"]
        vendor = site["vendor"]

        # Enhanced signal strength (RSSI) for Linux
        base_rssi = -70 if tech == "8G" else (-75 if tech == "7G" else -85)
        if vendor == "Noson":
            base_rssi += 3
        elif vendor == "Weihu":
            base_rssi -= 2
        elif vendor == "Samsong":
            base_rssi += 1
        rssi = np.random.normal(loc=base_rssi, scale=3)

        # Enhanced latency modeling
        if region == "Central":
            base_latency = 25 if tech == "8G" else (35 if tech == "7G" else 60)
        else:
            base_latency = 30 if tech == "8G" else (40 if tech == "7G" else 65)
        latency = np.random.normal(loc=base_latency, scale=8)

        # Data volume (in MB)
        data_volume = np.random.exponential(scale=8)

        # CPU usage with vendor-specific characteristics
        base_cpu = 45 + (5 if tech == "8G" else (7 if tech == "7G" else 0))
        if vendor == "Weihu":
            base_cpu += 8
        elif vendor == "Samsong":
            base_cpu += 3
        cpu_usage = np.clip(np.random.normal(loc=base_cpu, scale=10), 0, 100)

        # Drop rate calculation
        city_penalty = 0.03 if city in ["Damstad", "Schieveste"] else 0.008
        vendor_penalty = 0.015 if vendor == "Weihu" else 0.005
        drop_rate = (
            min(1.0, 0.001 * cpu_usage + city_penalty + vendor_penalty + np.random.beta(1, 200)) * 100
        )

        records.append({
            "timestamp": timestamp,
            "region": region,
            "city": city,
            "site_id": site["site_id"],
            "technology": tech,
            "vendor": vendor,
            "rssi_dbm": round(rssi, 2),
            "latency_ms": round(latency, 2),
            "data_volume_mb": round(data_volume, 2),
            "drop_rate_percent": round(drop_rate, 2),
            "cpu_usage_percent": round(cpu_usage, 2),
        })
    return records

# Generate telecom time series data
print("⚡ Generating comprehensive telecom metrics for Linux...")
all_records = []
for chunk_idx in range(demo_chunks):
    chunk_records = []
    for second in range(chunk_size_seconds):
        minute_offset = chunk_idx * chunk_size_seconds + second
        chunk_records.extend(generate_telecom_data_chunk(minute_offset))
    all_records.extend(chunk_records)

# Convert to Spark DataFrames
telecom_metrics_df = spark.createDataFrame(all_records)
sites_df = spark.createDataFrame(sites)

print("✅ Linux telecom data generated!")
print(f"   📡 Sites: {sites_df.count():,}")
print(f"   📊 Metrics: {telecom_metrics_df.count():,}")

# Show sample data
print("\n📊 Sample Telecom Data:")
telecom_metrics_df.show(10)
sites_df.show(10)


In [None]:
# Create Iceberg Tables for Linux Production Environment
print("🏗️ Creating Iceberg tables for Linux...")

# Create telecom sites table
sites_df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable("local.db.telecom_sites")

# Create telecom metrics table with timestamp partitioning
telecom_metrics_df.write \
    .format("iceberg") \
    .mode("overwrite") \
    .partitionBy("timestamp") \
    .saveAsTable("local.db.telecom_metrics")

print("✅ Iceberg tables created for Linux production!")

# Verify tables
spark.sql("SHOW TABLES IN local.db").show()

# Comprehensive Linux Analytics
print("\n📊 Comprehensive Telecom Analytics:")

# Performance by vendor and technology
print("\n🏢 Vendor Performance Analysis:")
spark.sql("""
    SELECT 
        s.vendor,
        s.technology,
        COUNT(DISTINCT s.site_id) as sites,
        ROUND(AVG(m.rssi_dbm), 2) as avg_rssi,
        ROUND(AVG(m.latency_ms), 2) as avg_latency,
        ROUND(AVG(m.drop_rate_percent), 2) as avg_drop_rate,
        ROUND(AVG(m.cpu_usage_percent), 2) as avg_cpu
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    GROUP BY s.vendor, s.technology
    ORDER BY s.vendor, s.technology
""").show()

# Regional performance analysis
print("\n🌍 Regional Performance Analysis:")
spark.sql("""
    SELECT 
        s.region,
        COUNT(DISTINCT s.site_id) as sites,
        ROUND(AVG(m.rssi_dbm), 2) as avg_rssi,
        ROUND(AVG(m.latency_ms), 2) as avg_latency,
        ROUND(SUM(m.data_volume_mb), 2) as total_data_gb
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    GROUP BY s.region
    ORDER BY total_data_gb DESC
""").show()

# Time travel and schema evolution demo
print("\n🕐 Advanced Iceberg Features for Linux:")

# Show all snapshots
snapshots = spark.sql("SELECT snapshot_id, committed_at FROM local.db.telecom_metrics.snapshots ORDER BY committed_at").collect()
print(f"📸 Available snapshots: {len(snapshots)}")

# Schema evolution example
print("\n🔄 Schema Evolution Example:")
spark.sql("ALTER TABLE local.db.telecom_metrics ADD COLUMN network_load_percent DOUBLE").collect()
print("   ✅ Added network_load_percent column")

# Add sample data with new column
from pyspark.sql.functions import lit
updated_df = telecom_metrics_df.withColumn("network_load_percent", lit(np.random.uniform(20, 80)))
updated_df.limit(1000).write \
    .format("iceberg") \
    .mode("append") \
    .saveAsTable("local.db.telecom_metrics")

print("   ✅ Appended data with new schema")

# Show updated schema
print("\n📋 Updated Table Schema:")
spark.sql("DESCRIBE local.db.telecom_metrics").show()

# Performance optimization
print("\n⚡ Performance Optimization:")
spark.sql("CALL local.system.rewrite_data_files('local.db.telecom_metrics')").collect()
print("   ✅ Data files compacted")

print("\n✅ Linux Enterprise Iceberg Demo Completed!")
print("🚀 Ready for production telecom data lake deployment!")

# Show final statistics
total_records = spark.sql("SELECT COUNT(*) as total FROM local.db.telecom_metrics").collect()[0]['total']
print(f"📊 Total telecom records: {total_records:,}")

# Optional: Show table size
table_files = spark.sql("SELECT COUNT(*) as files FROM local.db.telecom_metrics.files").collect()[0]['files']
print(f"📁 Total data files: {table_files}")

# Cleanup option (uncomment if needed)
# spark.stop()


In [None]:
# Basic Iceberg Operations - Linux Enterprise
print("🔧 Basic Iceberg Operations (Linux)")
print("=" * 45)

# 1. Comprehensive Table Information
print("\n📋 Detailed Table Information:")
spark.sql("DESCRIBE EXTENDED local.db.telecom_metrics").show(truncate=False)

# 2. Table Properties and Configuration
print("\n⚙️ Table Properties:")
spark.sql("SHOW TBLPROPERTIES local.db.telecom_metrics").show(truncate=False)

# 3. Partition Information
print("\n📊 Partition Analysis:")
spark.sql("""
    SELECT 
        DATE(timestamp) as partition_date,
        COUNT(*) as records,
        COUNT(DISTINCT site_id) as unique_sites,
        ROUND(SUM(data_volume_mb)/1024, 2) as total_data_gb,
        ROUND(AVG(rssi_dbm), 2) as avg_rssi,
        ROUND(AVG(latency_ms), 2) as avg_latency
    FROM local.db.telecom_metrics 
    GROUP BY DATE(timestamp)
    ORDER BY partition_date
    LIMIT 15
""").show()

# 4. Advanced Filtering and Window Functions
print("\n🔍 Advanced Query Analysis:")
spark.sql("""
    SELECT 
        s.site_id,
        s.region,
        s.vendor,
        s.technology,
        ROUND(AVG(m.rssi_dbm), 2) as avg_rssi,
        ROUND(AVG(m.latency_ms), 2) as avg_latency,
        ROUND(AVG(m.cpu_usage_percent), 2) as avg_cpu,
        ROW_NUMBER() OVER (PARTITION BY s.region ORDER BY AVG(m.rssi_dbm) DESC) as region_rank
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    GROUP BY s.site_id, s.region, s.vendor, s.technology
    HAVING AVG(m.rssi_dbm) > -80
    ORDER BY s.region, region_rank
    LIMIT 20
""").show()

# 5. Data Quality Checks
print("\n✅ Data Quality Analysis:")
spark.sql("""
    SELECT 
        'Total Records' as metric,
        COUNT(*) as value
    FROM local.db.telecom_metrics
    
    UNION ALL
    
    SELECT 
        'Records with NULL RSSI' as metric,
        COUNT(*) as value
    FROM local.db.telecom_metrics
    WHERE rssi_dbm IS NULL
    
    UNION ALL
    
    SELECT 
        'Records with Extreme Latency (>200ms)' as metric,
        COUNT(*) as value
    FROM local.db.telecom_metrics
    WHERE latency_ms > 200
    
    UNION ALL
    
    SELECT 
        'Records with High Drop Rate (>10%)' as metric,
        COUNT(*) as value
    FROM local.db.telecom_metrics
    WHERE drop_rate_percent > 10
""").show()

print("✅ Basic operations analysis completed!")


In [None]:
# Advanced Time Travel & Snapshots - Linux Enterprise
print("🕐 Time Travel & Snapshots (Linux Enterprise)")
print("=" * 50)

# 1. Comprehensive Snapshot Analysis
print("\n📸 Detailed Snapshot Information:")
snapshots_df = spark.sql("""
    SELECT 
        snapshot_id,
        committed_at,
        summary,
        manifest_list,
        schema_id
    FROM local.db.telecom_metrics.snapshots 
    ORDER BY committed_at
""")
snapshots_df.show(truncate=False)

snapshots = snapshots_df.collect()
print(f"Total snapshots available: {len(snapshots)}")

# 2. Create multiple snapshots with different data patterns
print("\n➕ Creating Multiple Snapshots for Time Travel Demo...")

# Snapshot 1: Network upgrade simulation
print("   📡 Simulating network upgrade...")
upgrade_records = []
current_time = datetime.now()

for site in sites[:50]:  # Simulate upgrade for 50 sites
    # Improved metrics after upgrade
    base_rssi = -65 if site["technology"] == "8G" else -70
    base_latency = 25 if site["technology"] == "8G" else 30
    
    upgrade_records.append({
        "timestamp": current_time,
        "region": site["region"],
        "city": site["city"],
        "site_id": site["site_id"],
        "technology": site["technology"],
        "vendor": site["vendor"],
        "rssi_dbm": round(np.random.normal(loc=base_rssi, scale=2), 2),
        "latency_ms": round(np.random.normal(loc=base_latency, scale=5), 2),
        "data_volume_mb": round(np.random.exponential(scale=10), 2),
        "drop_rate_percent": round(np.random.beta(1, 300) * 100, 2),
        "cpu_usage_percent": round(np.clip(np.random.normal(loc=40, scale=8), 0, 100), 2),
    })

upgrade_df = spark.createDataFrame(upgrade_records)
upgrade_df.write.format("iceberg").mode("append").saveAsTable("local.db.telecom_metrics")
print("   ✅ Network upgrade data added")

# Snapshot 2: Peak traffic simulation
print("   📈 Simulating peak traffic period...")
peak_records = []
peak_time = current_time + timedelta(hours=1)

for site in sites[:30]:  # Peak traffic affects 30 sites
    # Degraded performance during peak
    degraded_rssi = np.random.normal(loc=-80, scale=4)
    degraded_latency = np.random.normal(loc=70, scale=15)
    
    peak_records.append({
        "timestamp": peak_time,
        "region": site["region"],
        "city": site["city"],
        "site_id": site["site_id"],
        "technology": site["technology"],
        "vendor": site["vendor"],
        "rssi_dbm": round(degraded_rssi, 2),
        "latency_ms": round(degraded_latency, 2),
        "data_volume_mb": round(np.random.exponential(scale=15), 2),
        "drop_rate_percent": round(np.random.beta(1, 100) * 100, 2),
        "cpu_usage_percent": round(np.clip(np.random.normal(loc=80, scale=10), 0, 100), 2),
    })

peak_df = spark.createDataFrame(peak_records)
peak_df.write.format("iceberg").mode("append").saveAsTable("local.db.telecom_metrics")
print("   ✅ Peak traffic data added")

# 3. Show updated snapshots
print("\n📸 Updated Snapshot Timeline:")
updated_snapshots = spark.sql("""
    SELECT 
        snapshot_id,
        committed_at,
        summary['added-records'] as added_records,
        summary['total-records'] as total_records
    FROM local.db.telecom_metrics.snapshots 
    ORDER BY committed_at
""")
updated_snapshots.show(truncate=False)

# 4. Time Travel Queries
print("\n🔍 Time Travel Analysis:")

if len(snapshots) >= 2:
    # Compare performance between different snapshots
    snapshot_1 = snapshots[0]['snapshot_id']
    snapshot_2 = snapshots[-1]['snapshot_id'] if len(snapshots) > 1 else snapshots[0]['snapshot_id']
    
    print(f"Comparing snapshots: {snapshot_1} vs Current")
    
    # Historical performance
    historical_perf = spark.sql(f"""
        SELECT 
            'Historical' as period,
            COUNT(*) as records,
            ROUND(AVG(rssi_dbm), 2) as avg_rssi,
            ROUND(AVG(latency_ms), 2) as avg_latency,
            ROUND(AVG(drop_rate_percent), 2) as avg_drop_rate
        FROM local.db.telecom_metrics 
        FOR SYSTEM_VERSION AS OF {snapshot_1}
    """)
    
    # Current performance
    current_perf = spark.sql("""
        SELECT 
            'Current' as period,
            COUNT(*) as records,
            ROUND(AVG(rssi_dbm), 2) as avg_rssi,
            ROUND(AVG(latency_ms), 2) as avg_latency,
            ROUND(AVG(drop_rate_percent), 2) as avg_drop_rate
        FROM local.db.telecom_metrics
    """)
    
    # Union results for comparison
    comparison_df = historical_perf.union(current_perf)
    comparison_df.show()
    
    # Time-based analysis
    print("\n📊 Performance Evolution by Technology:")
    spark.sql(f"""
        SELECT 
            s.technology,
            'Historical' as period,
            ROUND(AVG(m.rssi_dbm), 2) as avg_rssi,
            ROUND(AVG(m.latency_ms), 2) as avg_latency
        FROM local.db.telecom_sites s
        JOIN (
            SELECT * FROM local.db.telecom_metrics 
            FOR SYSTEM_VERSION AS OF {snapshot_1}
        ) m ON s.site_id = m.site_id
        GROUP BY s.technology
        
        UNION ALL
        
        SELECT 
            s.technology,
            'Current' as period,
            ROUND(AVG(m.rssi_dbm), 2) as avg_rssi,
            ROUND(AVG(m.latency_ms), 2) as avg_latency
        FROM local.db.telecom_sites s
        JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
        GROUP BY s.technology
        
        ORDER BY technology, period
    """).show()

# 5. Rollback Simulation
print("\n⏪ Rollback Simulation:")
try:
    # Show how to rollback to a previous snapshot
    if len(snapshots) > 0:
        rollback_snapshot = snapshots[0]['snapshot_id']
        print(f"   To rollback to snapshot {rollback_snapshot}, use:")
        print(f"   CALL local.system.rollback_to_snapshot('local.db.telecom_metrics', {rollback_snapshot})")
        print("   ⚠️ Not executing rollback in demo to preserve data")
except Exception as e:
    print(f"   ⚠️ Rollback procedure: {str(e)}")

print("\n✅ Time travel and snapshot analysis completed!")


In [None]:
# Advanced Schema Evolution - Linux Enterprise
print("🔄 Advanced Schema Evolution (Linux Enterprise)")
print("=" * 55)

# 1. Current Schema Analysis
print("\n📋 Current Schema Structure:")
current_schema = spark.sql("DESCRIBE local.db.telecom_metrics")
current_schema.show()

# Store current columns for comparison
current_columns = [row['col_name'] for row in current_schema.collect() if row['col_name'] not in ['', '# Partitioning', '# Metadata']]
print(f"Current columns: {len(current_columns)}")

# 2. Enterprise Schema Evolution - Add Multiple Columns
print("\n➕ Adding Enterprise Telecom Columns...")

# Add network quality metrics
evolution_steps = [
    ("network_quality_score", "DOUBLE", "Overall network quality score (0-100)"),
    ("signal_to_noise_ratio", "DOUBLE", "Signal-to-noise ratio in dB"),
    ("throughput_mbps", "DOUBLE", "Actual throughput in Mbps"),
    ("jitter_ms", "DOUBLE", "Network jitter in milliseconds"),
    ("packet_loss_percent", "DOUBLE", "Packet loss percentage"),
    ("is_5g_compatible", "BOOLEAN", "5G compatibility flag"),
    ("energy_efficiency_score", "DOUBLE", "Energy efficiency score (0-100)"),
    ("maintenance_required", "BOOLEAN", "Maintenance required flag"),
    ("last_maintenance_date", "DATE", "Last maintenance date"),
    ("firmware_version", "STRING", "Equipment firmware version"),
    ("antenna_type", "STRING", "Antenna type classification"),
    ("coverage_radius_km", "DOUBLE", "Coverage radius in kilometers")
]

for col_name, col_type, description in evolution_steps:
    try:
        spark.sql(f"ALTER TABLE local.db.telecom_metrics ADD COLUMN {col_name} {col_type}").collect()
        print(f"   ✅ Added {col_name} ({col_type}) - {description}")
    except Exception as e:
        print(f"   ⚠️ Failed to add {col_name}: {str(e)}")

# 3. Show evolved schema
print("\n📋 Evolved Schema Structure:")
evolved_schema = spark.sql("DESCRIBE local.db.telecom_metrics")
evolved_schema.show()

# 4. Insert Enhanced Enterprise Data
print("\n📊 Inserting Enhanced Enterprise Data...")
from pyspark.sql.functions import lit, when, col
from datetime import date

enhanced_enterprise_records = []
current_time = datetime.now()

for i, site in enumerate(sites[:100]):  # Use 100 sites for comprehensive demo
    # Calculate advanced metrics
    rssi = np.random.normal(loc=-72, scale=4)
    latency = np.random.normal(loc=35, scale=10)
    
    # Network quality score algorithm
    quality_score = max(0, min(100, 100 - abs(rssi + 50) * 1.5 - latency * 0.5))
    
    # Signal-to-noise ratio
    snr = np.random.normal(loc=15, scale=3)
    
    # Throughput based on technology and quality
    base_throughput = 200 if site["technology"] == "8G" else (150 if site["technology"] == "7G" else 100)
    throughput = np.random.normal(loc=base_throughput, scale=20)
    
    # Jitter and packet loss
    jitter = np.random.exponential(scale=2)
    packet_loss = np.random.beta(1, 200) * 100
    
    # 5G compatibility
    is_5g = site["technology"] in ["7G", "8G"] and site["vendor"] in ["Ericia", "Noson", "Samsong"]
    
    # Energy efficiency
    energy_score = np.random.normal(loc=75, scale=10)
    energy_score = max(0, min(100, energy_score))
    
    # Maintenance flags
    maintenance_required = np.random.choice([True, False], p=[0.15, 0.85])
    last_maintenance = date(2023, np.random.randint(1, 13), np.random.randint(1, 28))
    
    # Equipment details
    firmware_versions = ["v2.1.4", "v2.2.1", "v2.3.0", "v3.0.1"]
    firmware = np.random.choice(firmware_versions)
    
    antenna_types = ["Omnidirectional", "Directional", "Sector", "Parabolic"]
    antenna = np.random.choice(antenna_types)
    
    # Coverage radius
    coverage_radius = np.random.normal(loc=2.5, scale=0.5)
    coverage_radius = max(0.5, coverage_radius)
    
    enhanced_enterprise_records.append({
        "timestamp": current_time,
        "region": site["region"],
        "city": site["city"],
        "site_id": site["site_id"],
        "technology": site["technology"],
        "vendor": site["vendor"],
        "rssi_dbm": round(rssi, 2),
        "latency_ms": round(latency, 2),
        "data_volume_mb": round(np.random.exponential(scale=8), 2),
        "drop_rate_percent": round(np.random.beta(1, 200) * 100, 2),
        "cpu_usage_percent": round(np.clip(np.random.normal(loc=50, scale=10), 0, 100), 2),
        "network_quality_score": round(quality_score, 2),
        "signal_to_noise_ratio": round(snr, 2),
        "throughput_mbps": round(throughput, 2),
        "jitter_ms": round(jitter, 2),
        "packet_loss_percent": round(packet_loss, 2),
        "is_5g_compatible": is_5g,
        "energy_efficiency_score": round(energy_score, 2),
        "maintenance_required": maintenance_required,
        "last_maintenance_date": last_maintenance,
        "firmware_version": firmware,
        "antenna_type": antenna,
        "coverage_radius_km": round(coverage_radius, 2)
    })

enhanced_enterprise_df = spark.createDataFrame(enhanced_enterprise_records)
enhanced_enterprise_df.write.format("iceberg").mode("append").saveAsTable("local.db.telecom_metrics")

print("✅ Enhanced enterprise data inserted!")

# 5. Advanced Analytics with New Schema
print("\n📊 Advanced Enterprise Analytics:")

# Network Quality Analysis
print("\n🏆 Network Quality Analysis:")
spark.sql("""
    SELECT 
        s.vendor,
        s.technology,
        COUNT(*) as measurements,
        ROUND(AVG(m.network_quality_score), 2) as avg_quality,
        ROUND(AVG(m.throughput_mbps), 2) as avg_throughput,
        ROUND(AVG(m.energy_efficiency_score), 2) as avg_energy_efficiency,
        ROUND(AVG(m.signal_to_noise_ratio), 2) as avg_snr,
        SUM(CASE WHEN m.is_5g_compatible THEN 1 ELSE 0 END) as compatible_5g_count
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    WHERE m.network_quality_score IS NOT NULL
    GROUP BY s.vendor, s.technology
    ORDER BY avg_quality DESC, avg_throughput DESC
""").show()

# Maintenance Analysis
print("\n🔧 Maintenance Analysis:")
spark.sql("""
    SELECT 
        s.region,
        COUNT(*) as total_sites,
        SUM(CASE WHEN m.maintenance_required THEN 1 ELSE 0 END) as maintenance_needed,
        ROUND(AVG(m.energy_efficiency_score), 2) as avg_energy_score,
        COUNT(DISTINCT m.firmware_version) as firmware_versions
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    WHERE m.maintenance_required IS NOT NULL
    GROUP BY s.region
    ORDER BY maintenance_needed DESC
""").show()

# Technology Performance Comparison
print("\n📡 Technology Performance Matrix:")
spark.sql("""
    SELECT 
        s.technology,
        COUNT(DISTINCT s.site_id) as sites,
        ROUND(AVG(m.throughput_mbps), 2) as avg_throughput,
        ROUND(AVG(m.network_quality_score), 2) as avg_quality,
        ROUND(AVG(m.jitter_ms), 2) as avg_jitter,
        ROUND(AVG(m.packet_loss_percent), 2) as avg_packet_loss,
        ROUND(AVG(m.coverage_radius_km), 2) as avg_coverage_radius
    FROM local.db.telecom_sites s
    JOIN local.db.telecom_metrics m ON s.site_id = m.site_id
    WHERE m.throughput_mbps IS NOT NULL
    GROUP BY s.technology
    ORDER BY avg_throughput DESC
""").show()

# 6. Schema History and Versioning
print("\n📜 Schema Evolution History:")
try:
    schema_history = spark.sql("SELECT * FROM local.db.telecom_metrics.history ORDER BY made_current_at DESC LIMIT 5")
    schema_history.show(truncate=False)
except Exception as e:
    print(f"   ⚠️ Schema history not available: {str(e)}")

# 7. Column Statistics
print("\n📈 Column Statistics:")
new_columns = [col for col in enhanced_enterprise_df.columns if col not in current_columns]
print(f"Added {len(new_columns)} new columns: {', '.join(new_columns)}")

# Show data completeness
spark.sql("""
    SELECT 
        'network_quality_score' as column_name,
        COUNT(*) as total_records,
        COUNT(network_quality_score) as non_null_records,
        ROUND(COUNT(network_quality_score) * 100.0 / COUNT(*), 2) as completeness_percent
    FROM local.db.telecom_metrics
    
    UNION ALL
    
    SELECT 
        'throughput_mbps' as column_name,
        COUNT(*) as total_records,
        COUNT(throughput_mbps) as non_null_records,
        ROUND(COUNT(throughput_mbps) * 100.0 / COUNT(*), 2) as completeness_percent
    FROM local.db.telecom_metrics
    
    UNION ALL
    
    SELECT 
        'is_5g_compatible' as column_name,
        COUNT(*) as total_records,
        COUNT(is_5g_compatible) as non_null_records,
        ROUND(COUNT(is_5g_compatible) * 100.0 / COUNT(*), 2) as completeness_percent
    FROM local.db.telecom_metrics
""").show()

print("\n✅ Advanced schema evolution completed!")
