
MVP  STAR SCHEMA
MVP Engenharia de dados Matrícula:4052025000027 
Dataset: IEA Energy Demand and Generation

A proposta deste trabalho é importar dados dos 10 estados que mais consomem energia elétrica nos estados unidos e analisar o perfil de geração elétrica de cada estado, olhando para as 3 principais fontes: Gás Natural, Carvão e Eólica.

STEP 1: IMPORTS

In [0]:
# ============================================================================
# STEP 1: IMPORT LIBRARIES
# ============================================================================
print("\n📚 STEP 1: Importing libraries...")

import requests
import pandas as pd
from datetime import datetime, timedelta
import time
import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from pyspark.sql.functions import lit, current_timestamp, col, when
from pyspark.sql.types import StringType

print("✅ Libraries imported")



📚 STEP 1: Importing libraries...
✅ Libraries imported


STEP 2: API CONNECTION

In [0]:
# Use the full workspace path for your notebook
dbutils.notebook.run(
    "/Workspace/Users/lucasrobertideoliveira@hotmail.com/SETUP SECRETS",
    60
)
#  ============================================================================
# 📊 FETCH ELECTRICITY DATA FOR 10 US STATES
# ============================================================================
print("\n" + "=" * 80)
print("📊 FETCHING ELECTRICITY DATA FOR 10 US STATES")
print("=" * 80)



# ============================================================================
# 1. CONFIGURE DATE RANGE
# ============================================================================
print("\n📅 Configuring date range...")
end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=30)  # 30 days for testing

print(f"   Start Date: {start_date.strftime('%Y-%m-%d')}")
print(f"   End Date: {end_date.strftime('%Y-%m-%d')}")
print(f"   Total Days: {(end_date - start_date).days}")


# ============================================================================
# 2. SETUP EIA API CONFIGURATION (DATABRICKS SECRETS VERSION)
# ============================================================================
print("\n🔧 Setting up EIA API...")

def load_eia_api_key():
    try:
        api_key = dbutils.secrets.get(
            scope="eia-scope",
            key="eia_api_key"
        )
        if api_key and api_key.strip():
            masked = api_key[:5] + "..." + api_key[-5:] if len(api_key) > 10 else "***"
            print(f"   ✅ Loaded from Databricks Secrets: {masked}")
            return api_key.strip()
    except Exception:
        pass

    import os
    api_key = os.environ.get("EIA_API_KEY")
    if api_key and api_key.strip():
        masked = api_key[:5] + "..." + api_key[-5:] if len(api_key) > 10 else "***"
        print(f"   ✅ Loaded from environment variable: {masked}")
        return api_key.strip()

    # Check if variable is defined from SETUP_SECRETS
    try:
        api_key = YOUR_REAL_EIA_API_KEY
        if api_key and api_key.strip():
            masked = api_key[:5] + "..." + api_key[-5:] if len(api_key) > 10 else "***"
            print(f"   ✅ Loaded from variable: {masked}")
            return api_key.strip()
    except Exception:
        pass

    print("\n   ❌ ERROR: No API key found in any source!")
    raise ValueError("EIA API Key is required. Run SETUP_SECRETS notebook first.")

# ============================================================================
# 3. TARGET STATES AND THEIR BA CODES
# ============================================================================
print("\n🎯 Target States and Their Balancing Authority Codes:")
print("-" * 60)

# Based on your working codes and EIA documentation
state_ba_mapping = {
    # California - Using CAL (California) - this works
    "California": ["CAL", "CISO"],
    
    # Texas - Using ERCO (Electric Reliability Council of Texas) - this works
    "Texas": ["ERCO"],
    
    # Florida - Using FPL (Florida Power & Light) - this works
    "Florida": ["FPL", "TECO", "FPC"],
    
    # Ohio - Part of PJM, using FE (FirstEnergy) - this works
    "Ohio": ["FE", "PJM"],
    
    # Georgia - Using SOCO (Southern Company) - this works
    "Georgia": ["SOCO"],
    
    # New York - Using NY (New York) and NYIS (New York ISO)
    "New York": ["NY", "NYIS"],
    
    # Pennsylvania - Part of PJM, using PJM
    "Pennsylvania": ["PJM", "FE"],
    
    # North Carolina - Using DUK (Duke Energy) and CAR (Carolinas)
    "North Carolina": ["DUK", "CAR"],
    
    # Virginia - Part of PJM, using PJM
    "Virginia": ["PJM"],
    
    # Illinois - Using MISO (Midcontinent ISO)
    "Illinois": ["MISO"]
}

# Display mapping
for state, bas in state_ba_mapping.items():
    print(f"  • {state:20} → {', '.join(bas)}")

# Get all unique BA codes
all_ba_codes = []
for bas in state_ba_mapping.values():
    all_ba_codes.extend(bas)
all_ba_codes = list(set(all_ba_codes))

print(f"\n📊 Total unique BA codes to try: {len(all_ba_codes)}")
print(f"   Codes: {', '.join(sorted(all_ba_codes))}")

# ============================================================================
# 4. CHECK AVAILABLE RESPONDENT CODES
# ============================================================================
print("\n🔍 Checking available respondent codes from EIA API...")
respondent_facet_url = f"{EIA_BASE_URL}/electricity/rto/region-data/facet/respondent"
respondent_params = {"api_key": EIA_API_KEY}

available_respondents = {}
try:
    print(f"   Getting respondent facet values...")
    respondent_response = requests.get(respondent_facet_url, params=respondent_params, timeout=10)
    
    if respondent_response.status_code == 200:
        respondent_data = respondent_response.json()
        
        if 'response' in respondent_data and 'facets' in respondent_data['response']:
            facets = respondent_data['response']['facets']
            print(f"   ✅ Found {len(facets)} respondent codes total")
            
            # Map all available respondents
            for facet in facets:
                available_respondents[facet['id']] = facet.get('name', 'No name')
            
            # Check which of our target codes are available
            print("\n   Checking availability of our target BA codes:")
            available_codes = []
            unavailable_codes = []
            
            for ba_code in all_ba_codes:
                if ba_code in available_respondents:
                    available_codes.append(ba_code)
                    print(f"      ✅ {ba_code}: {available_respondents[ba_code]}")
                else:
                    unavailable_codes.append(ba_code)
                    print(f"      ❌ {ba_code}: Not found in API")
            
            print(f"\n   📋 Availability Summary:")
            print(f"      • Available: {len(available_codes)}/{len(all_ba_codes)}")
            print(f"      • Unavailable: {len(unavailable_codes)}")
            
            # Use only available codes
            selected_respondents = available_codes
            
            # If we don't have enough, add some fallbacks from known working codes
            if len(selected_respondents) < 5:
                print("\n   ⚠️ Few codes available. Adding known working codes...")
                fallback_codes = ["CAL", "ERCO", "SOCO", "FE", "FPL", "CISO", "NY", "PJM", "DUK", "MISO"]
                for code in fallback_codes:
                    if code not in selected_respondents and code in available_respondents:
                        selected_respondents.append(code)
                        print(f"      ➕ Added: {code}")
            
            print(f"\n   ✅ Using respondent codes: {selected_respondents}")
            
        else:
            print("   ⚠️ Could not parse respondent data")
            # Use known working codes as fallback
            selected_respondents = ["CAL", "ERCO", "SOCO", "FE", "FPL", "CISO", "NY", "PJM", "DUK", "MISO"]
            print(f"   ⚠️ Using fallback codes: {selected_respondents}")
            
    else:
        print(f"   ❌ Respondent facet failed: {respondent_response.status_code}")
        # Use known working codes as fallback
        selected_respondents = ["CAL", "ERCO", "SOCO", "FE", "FPL", "CISO", "NY", "PJM", "DUK", "MISO"]
        print(f"   ⚠️ Using fallback codes: {selected_respondents}")
        
except Exception as e:
    print(f"   ❌ Error getting respondent codes: {e}")
    # Use known working codes as fallback
    selected_respondents = ["CAL", "ERCO", "SOCO", "FE", "FPL", "CISO", "NY", "PJM", "DUK", "MISO"]
    print(f"   ⚠️ Using fallback codes: {selected_respondents}")

# ============================================================================
# 5. FETCH DATA FUNCTION
# ============================================================================
print("\n" + "=" * 80)
print("📥 FETCHING ELECTRICITY DATA")
print("=" * 80)

base_params = {
    "api_key": EIA_API_KEY,
    "frequency": "hourly",
    "data[]": "value",
    "start": start_date.strftime("%Y-%m-%d"),
    "end": end_date.strftime("%Y-%m-%d"),
    "sort[0][column]": "period",
    "sort[0][direction]": "asc",
    "offset": 0,
    "length": 5000
}

datasets_config = {
    "electricity_demand": {
        "endpoint": "electricity/rto/region-data/data",
        "params": base_params.copy(),
        "facets[type][]": "D"
    },
    "coal_generation": {
        "endpoint": "electricity/rto/fuel-type-data/data",
        "params": base_params.copy(),
        "facets[fueltype][]": "COL"
    },
    "natural_gas_generation": {
        "endpoint": "electricity/rto/fuel-type-data/data",
        "params": base_params.copy(),
        "facets[fueltype][]": "NG"
    },
    "wind_generation": {
        "endpoint": "electricity/rto/fuel-type-data/data",
        "params": base_params.copy(),
        "facets[fueltype][]": "WND"
    }
}

fetched_data = {}
summary = []
state_data = {}  # To organize data by state

print("\n📋 Fetching datasets for all available BA codes:")
print("-" * 60)

for name, config in datasets_config.items():
    data_type = name.replace('_', ' ').title()
    print(f"\n{'='*60}")
    print(f"📊 {data_type}")
    print('='*60)
    
    for respondent in selected_respondents:
        print(f"\n📍 BA Code: {respondent}")
        
        # Find which state this BA belongs to
        state_name = None
        for state, bas in state_ba_mapping.items():
            if respondent in bas:
                state_name = state
                break
        
        if state_name:
            print(f"   State: {state_name}")
        
        params = config["params"].copy()
        params["facets[respondent][]"] = respondent
        for key, value in config.items():
            if key.startswith("facets[") and key != "facets[respondent][]":
                params[key] = value
        
        endpoint = config["endpoint"]
        url = f"{EIA_BASE_URL}/{endpoint}"
        
        try:
            response = requests.get(url, params=params, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                records = data.get('response', {}).get('data', [])
                
                if records and len(records) > 0:
                    print(f"   ✅ SUCCESS: {len(records)} records")
                    
                    # Create keys for storage
                    raw_key = f"{name}_{respondent}"
                    state_key = f"{name}_{state_name.replace(' ', '_')}_{respondent}" if state_name else raw_key
                    
                    # Store the data
                    fetched_data[raw_key] = records
                    fetched_data[state_key] = records  # Also store with state name
                    
                    # Organize by state
                    if state_name:
                        if state_name not in state_data:
                            state_data[state_name] = {}
                        if name not in state_data[state_name]:
                            state_data[state_name][name] = []
                        state_data[state_name][name].append({
                            'ba_code': respondent,
                            'records': records,
                            'count': len(records)
                        })
                    
                    # Show sample stats
                    df = pd.DataFrame(records[:5])  # Just first 5 for stats
                    if 'value' in df.columns:
                        values = pd.to_numeric(df['value'], errors='coerce')
                        valid_values = values.dropna()
                        if len(valid_values) > 0:
                            print(f"   📈 Sample values: {valid_values.min():.0f} to {valid_values.max():.0f} MWh")
                    
                    summary.append({
                        'dataset': state_key if state_name else raw_key,
                        'state': state_name if state_name else 'Unknown',
                        'ba_code': respondent,
                        'data_type': name,
                        'records': len(records),
                        'status': 'SUCCESS'
                    })
                    
                else:
                    print(f"   ⚠️ No records returned")
                    summary.append({
                        'dataset': f"{name}_{respondent}",
                        'state': state_name if state_name else 'Unknown',
                        'ba_code': respondent,
                        'data_type': name,
                        'records': 0,
                        'status': 'NO_DATA'
                    })
                    
            else:
                print(f"   ❌ HTTP Error {response.status_code}")
                summary.append({
                    'dataset': f"{name}_{respondent}",
                    'state': state_name if state_name else 'Unknown',
                    'ba_code': respondent,
                    'data_type': name,
                    'records': 0,
                    'status': f'HTTP_{response.status_code}'
                })
                
        except Exception as e:
            print(f"   ❌ Error: {str(e)[:100]}")
            summary.append({
                'dataset': f"{name}_{respondent}",
                'state': state_name if state_name else 'Unknown',
                'ba_code': respondent,
                'data_type': name,
                'records': 0,
                'status': 'ERROR'
            })

# ============================================================================
# 6. CONVERT TO DATAFRAMES
# ============================================================================
print("\n" + "=" * 80)
print("🔄 CONVERTING TO DATAFRAMES")
print("=" * 80)

dataframes = {}

if fetched_data:
    print(f"\n📦 Converting {len(fetched_data)} datasets to DataFrames...")
    
    conversion_stats = {'success': 0, 'failed': 0}
    
    for key, records in fetched_data.items():
        try:
            if records and len(records) > 0:
                df = pd.DataFrame(records)
                dataframes[key] = df
                conversion_stats['success'] += 1
                
                # Create simplified aliases for easier access
                parts = key.split('_')
                if len(parts) >= 3:
                    # Format: type_state_ba or type_ba
                    if any(state.replace(' ', '_') in key for state in state_ba_mapping.keys()):
                        # Already has state name
                        pass
                    else:
                        # Try to add state name
                        ba_code = parts[-1]
                        for state, bas in state_ba_mapping.items():
                            if ba_code in bas:
                                state_alias = f"{parts[0]}_{state.replace(' ', '_')}_{ba_code}"
                                if state_alias not in dataframes:
                                    dataframes[state_alias] = df
                                break
            else:
                conversion_stats['failed'] += 1
                
        except Exception as e:
            conversion_stats['failed'] += 1
            print(f"   ❌ Error converting {key}: {e}")
    
    print(f"\n✅ Conversion complete:")
    print(f"   • Successful: {conversion_stats['success']}")
    print(f"   • Failed: {conversion_stats['failed']}")
    print(f"   • Total DataFrames: {len(dataframes)}")
    
else:
    print("❌ No data to convert")

# ============================================================================
# 7. DISPLAY RESULTS
# ============================================================================
print("\n" + "=" * 80)
print("📊 FETCHING RESULTS SUMMARY")
print("=" * 80)

if summary:
    summary_df = pd.DataFrame(summary)
    
    # Summary by state
    print("\n🗺️ DATA BY STATE:")
    print("-" * 60)
    
    state_stats = summary_df[summary_df['status'] == 'SUCCESS'].groupby('state').agg({
        'data_type': 'nunique',
        'records': 'sum',
        'ba_code': lambda x: ', '.join(sorted(set(x)))
    }).reset_index()
    
    state_stats.columns = ['State', 'Data Types', 'Total Records', 'BA Codes']
    print(state_stats.to_string(index=False))
    
    # Summary by data type
    print("\n⚡ DATA BY TYPE:")
    print("-" * 60)
    
    type_stats = summary_df[summary_df['status'] == 'SUCCESS'].groupby('data_type').agg({
        'state': 'nunique',
        'records': 'sum',
        'dataset': 'count'
    }).reset_index()
    
    type_stats.columns = ['Data Type', 'States', 'Total Records', 'Datasets']
    print(type_stats.to_string(index=False))
    
    # Overall totals
    total_success = len(summary_df[summary_df['status'] == 'SUCCESS'])
    total_records = summary_df[summary_df['status'] == 'SUCCESS']['records'].sum()
    
    print(f"\n📈 OVERALL TOTALS:")
    print(f"   • Total fetch attempts: {len(summary_df)}")
    print(f"   • Successful fetches: {total_success}")
    print(f"   • Total records: {total_records:,}")
    print(f"   • States with data: {state_stats.shape[0]}")
    
else:
    print("\n❌ No summary data available")

# ============================================================================
# 8. SAMPLE DATA PREVIEW
# ============================================================================
print("\n" + "=" * 80)
print("👁️ SAMPLE DATA PREVIEW")
print("=" * 80)

if dataframes:
    # Show samples from different states
    sample_states = ['California', 'Texas', 'Florida', 'New York', 'Illinois']
    
    for state in sample_states:
        state_key = state.replace(' ', '_')
        state_dfs = [k for k in dataframes.keys() if state_key in k]
        
        if state_dfs:
            print(f"\n🏛️ {state.upper()}:")
            print(f"   Available datasets: {len(state_dfs)}")
            
            # Show first dataset as sample
            sample_key = state_dfs[0]
            df = dataframes[sample_key]
            
            print(f"\n   📊 {sample_key}:")
            print(f"      Shape: {df.shape}")
            
            if 'period' in df.columns and len(df) > 0:
                print(f"      Time range: {df['period'].iloc[0]} to {df['period'].iloc[-1]}")
            
            if 'value' in df.columns and len(df) > 0:
                values = pd.to_numeric(df['value'], errors='coerce')
                valid_values = values.dropna()
                if len(valid_values) > 0:
                    print(f"      Value range: {valid_values.min():.0f} to {valid_values.max():.0f} MWh")
                    print(f"      Average: {valid_values.mean():.0f} MWh")
            
            # Show first 2 rows
            print(f"\n      First 2 rows:")
            pd.set_option('display.max_columns', None)
            pd.set_option('display.width', 1000)
            print(df.head(2).to_string())
            
        else:
            print(f"\n🏛️ {state.upper()}: No data available")
    
    # List all available DataFrames
    print("\n" + "=" * 80)
    print("📋 ALL AVAILABLE DATAFRAMES")
    print("=" * 80)
    
    print(f"\nTotal DataFrames: {len(dataframes)}")
    print("\nFirst 20 DataFrames:")
    for i, key in enumerate(sorted(list(dataframes.keys()))[:20], 1):
        df = dataframes[key]
        print(f"{i:3}. {key:50} | {df.shape[0]:6} records")
    
else:
    print("\n❌ No DataFrames available")

# ============================================================================
# 9. HOW TO ACCESS YOUR DATA
# ============================================================================
print("\n" + "=" * 80)
print("🚀 HOW TO ACCESS YOUR DATA")
print("=" * 80)

print("""
ACCESS EXAMPLES:
----------------

1. By State and Data Type (Recommended):
   -------------------------------------
   # Texas electricity demand
   df = dataframes['electricity_demand_Texas_ERCO']
   
   # California natural gas generation  
   df = dataframes['natural_gas_generation_California_CAL']
   
   # Florida wind generation
   df = dataframes['wind_generation_Florida_FPL']
   
   # Illinois coal generation
   df = dataframes['coal_generation_Illinois_MISO']

2. By BA Code Only:
   -----------------
   df = dataframes['electricity_demand_ERCO']
   df = dataframes['coal_generation_PJM']
   df = dataframes['wind_generation_MISO']

3. Check All Available:
   ---------------------
   print("Available DataFrames:", len(dataframes))
   print("Keys starting with 'Texas':")
   texas_keys = [k for k in dataframes.keys() if 'Texas' in k]
   for key in texas_keys:
       print(f"  • {key}")

QUICK ANALYSIS EXAMPLES:
------------------------
# 1. Get Texas demand statistics
if 'electricity_demand_Texas_ERCO' in dataframes:
    df = dataframes['electricity_demand_Texas_ERCO']
    df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce')
    print("Texas Demand Stats:")
    print(f"  Min: {df['value_numeric'].min():.0f} MWh")
    print(f"  Max: {df['value_numeric'].max():.0f} MWh") 
    print(f"  Mean: {df['value_numeric'].mean():.0f} MWh")

# 2. Compare states
states = ['California', 'Texas', 'Florida']
for state in states:
    key = f'electricity_demand_{state.replace(" ", "_")}'
    matching_keys = [k for k in dataframes.keys() if key in k]
    if matching_keys:
        df = dataframes[matching_keys[0]]
        avg_demand = pd.to_numeric(df['value'], errors='coerce').mean()
        print(f"{state}: {avg_demand:.0f} MWh average demand")
""")

# ============================================================================
# 10. FINAL STATUS AND NEXT STEPS
# ============================================================================
print("\n" + "=" * 80)
print("✅ IMPORT COMPLETE - NEXT STEPS")
print("=" * 80)

if dataframes:
    print(f"""
🎉 SUCCESS! Your electricity data has been imported.

📊 WHAT YOU HAVE:
   • {len(dataframes)} DataFrames ready for analysis
   • Data for {len(state_stats) if 'state_stats' in locals() else 'multiple'} states
   • Hourly data from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}
   • Four data types: Demand, Coal, Natural Gas, Wind

🔧 TO GET MORE DATA:
   1. Increase date range (up to 5000 records limit):
      start_date = end_date - timedelta(days=208)  # Max ~208 days
   
   2. Try additional BA codes:
      # Add these to state_ba_mapping
      "New York": ["NY", "NYIS", "PJM"],
      "Pennsylvania": ["PJM", "FE", "NY"],
      "Virginia": ["PJM", "NY"]

📈 READY FOR ANALYSIS:
   Start with: dataframes['electricity_demand_California_CAL'].head()
""")
else:
    print("""
⚠️  LIMITED OR NO DATA IMPORTED
   
   Suggestions:
   1. Check which BA codes actually returned data
   2. Try with known working codes: CAL, ERCO, SOCO, FE, FPL
   3. Reduce date range to 5-7 days for testing
   4. Check API response for error messages
""")

print("\n" + "=" * 80)
print("🧪 QUICK TEST - RUN THESE COMMANDS")
print("=" * 80)

print("""
# Test 1: Check what data we have
print(f"Total DataFrames: {len(dataframes)}")

# Test 2: List all Texas data
texas_data = [k for k in dataframes.keys() if 'Texas' in k]
print(f"Texas datasets: {len(texas_data)}")
for key in texas_data[:5]:
    print(f"  • {key}")

# Test 3: View a sample DataFrame
if dataframes:
    sample_key = list(dataframes.keys())[0]
    print(f"\nSample: {sample_key}")
    print(dataframes[sample_key].head(2))
""")

print("\n" + "=" * 80)
print("✅ ALL DONE! YOUR DATA IS READY FOR ANALYSIS")
print("=" * 80)


📊 FETCHING ELECTRICITY DATA FOR 10 US STATES

📅 Configuring date range...
   Start Date: 2025-11-22
   End Date: 2025-12-22
   Total Days: 30

🔧 Setting up EIA API...

🎯 Target States and Their Balancing Authority Codes:
------------------------------------------------------------
  • California           → CAL, CISO
  • Texas                → ERCO
  • Florida              → FPL, TECO, FPC
  • Ohio                 → FE, PJM
  • Georgia              → SOCO
  • New York             → NY, NYIS
  • Pennsylvania         → PJM, FE
  • North Carolina       → DUK, CAR
  • Virginia             → PJM
  • Illinois             → MISO

📊 Total unique BA codes to try: 14
   Codes: CAL, CAR, CISO, DUK, ERCO, FE, FPC, FPL, MISO, NY, NYIS, PJM, SOCO, TECO

🔍 Checking available respondent codes from EIA API...
   Getting respondent facet values...
   ✅ Found 81 respondent codes total

   Checking availability of our target BA codes:
      ✅ CISO: California Independent System Operator
      ✅ FPC: Duke

Resumo da Importação de Dados de Energia Elétrica - API EIA
📊 Visão Geral da Importação
Período Analisado: 21 de novembro a 21 de dezembro de 2025 (30 dias)

Estados Coletados: 8 dos 10 estados-alvo

✅ Sucesso: Califórnia, Texas, Flórida, Geórgia, Illinois, Nova York, Carolina do Norte, Ohio

⚠️ Faltantes: Pensilvânia, Virgínia

📈 Dados Coletados
Total Geral:
30.931 registros de dados horários

129 DataFrames criados

86 datasets importados da API

43 de 48 tentativas bem-sucedidas (90% de sucesso)

Dados por Tipo de Energia:
Demanda Elétrica: 8.652 registros (12 datasets)

Geração a Gás Natural: 8.628 registros (12 datasets)

Geração a Carvão: 7.907 registros (11 datasets)

Geração Eólica: 5.744 registros (8 datasets)

Distribuição por Estado:
Califórnia e Nova York: 5.768 registros cada (dados mais completos)

Carolina do Norte: 4.326 registros

Flórida: 3.605 registros

Geórgia, Illinois, Ohio: ~2.884 registros cada

Texas: 2.812 registros (registros parcialmente completos)

⚡ Cobertura de Dados por Estado
✅ Estados com Dados Completos (4 tipos):
Califórnia (CAL, CISO) - Modelo de "Duck Curve"

Texas (ERCO) - Maior participação eólica

Nova York (NY, NYIS) - Zero carvão na matriz

Illinois (MISO) - Maior demanda média

Geórgia (SOCO) - Significativa geração a carvão

Ohio (PJM) - Maior geração a carvão

⚠️ Estados com Dados Parciais:
Carolina do Norte: Sem dados de geração eólica

Flórida: FPL sem dados de carvão e eólica

🔍 Insights Iniciais dos Dados
Padrões de Demanda Identificados:
Califórnia: Vale de demanda às 14-15h (efeito solar extremo)

Texas: Maior geração eólica (até 16.328 MWh)

Illinois: Maior demanda média (77.029 MWh)

Nova York/Califórnia: Geração zero ou mínima de carvão

Faixas de Valores por Estado:
Demanda Máxima: Illinois (99.092 MWh)

Demanda Mínima: Flórida-FPC (3.756 MWh)

Geração Carvão Máxima: Illinois (24.222 MWh)

Geração Eólica Máxima: Texas (16.328 MWh)



STEP 3: BRONZE LAYER SETUP

A Bronze Layer foi criada como o repositório inicial dos dados brutos da API da EIA, preservando a forma original de 30.931 registros horários de demanda e geração elétrica (carvão, gás natural e eólica) para 8 estados norte-americanos no período de novembro a dezembro de 2025. Esta camada mantém a integridade dos dados originais sem transformações, servindo como fonte única de verdade para auditoria e reprocessamento futuro, enquanto fornece a base para as transformações subsequentes nas camadas Silver e Gold do pipeline.

In [0]:
# ============================================================================
# BRONZE LAYER INGESTION FOR COMMUNITY EDITION
# ============================================================================

print("\n" + "=" * 80)
print("🥉 BRONZE LAYER - DATABRICKS COMMUNITY EDITION")
print("=" * 80)



# ============================================================================
# 1. PREPARE HELPER FUNCTIONS
# ============================================================================
print("\n🔧 PREPARING DATA FOR BRONZE LAYER")

def extract_metadata_from_key(key):
    """Extract state, data_type, and BA code from DataFrame key"""
    key_lower = key.lower()
    
    # Extract data type
    if 'demand' in key_lower or 'electricity' in key_lower:
        data_type = 'demand'
    elif 'coal' in key_lower:
        data_type = 'coal'
    elif 'gas' in key_lower or 'natural' in key_lower:
        data_type = 'gas'
    elif 'wind' in key_lower:
        data_type = 'wind'
    else:
        data_type = 'unknown'
    
    # Extract state
    state = 'Unknown'
    states = {
        'California': ['california', 'cal', 'ciso'],
        'Texas': ['texas', 'erco'],
        'Florida': ['florida', 'fl', 'fpl', 'fpc'],
        'Ohio': ['ohio', 'oh', 'pjm'],
        'Georgia': ['georgia', 'ga', 'soco'],
        'New_York': ['new_york', 'ny', 'nyis'],
        'North_Carolina': ['north_carolina', 'nc', 'car', 'duk'],
        'Illinois': ['illinois', 'il', 'miso'],
        'Pennsylvania': ['pennsylvania', 'pa'],
        'Virginia': ['virginia', 'va']
    }
    
    for state_name, keywords in states.items():
        for keyword in keywords:
            if keyword in key_lower:
                state = state_name.replace('_', ' ')
                break
    
    # Extract BA code
    ba_code = 'Unknown'
    ba_codes = ['ERCO', 'PJM', 'MISO', 'CISO', 'CAL', 'NY', 'NYIS', 'SOCO', 'DUK', 'CAR', 'FPL', 'FPC', 'TECO']
    for code in ba_codes:
        if code in key.upper():
            ba_code = code
            break
    
    return state, data_type, ba_code

# ============================================================================
# 2. PROCESS EACH DATAFRAME
# ============================================================================
print("\n📥 PROCESSING DATAFRAMES INTO BRONZE LAYER")

bronze_views = {}
processed_count = 0
failed_count = 0

print(f"\nTotal DataFrames to process: {len(dataframes)}")

# Group DataFrames by type for organized processing
dataframes_by_type = {'demand': [], 'coal': [], 'gas': [], 'wind': []}

for df_key in dataframes.keys():
    state, data_type, ba_code = extract_metadata_from_key(df_key)
    if data_type in dataframes_by_type:
        dataframes_by_type[data_type].append(df_key)

print("\n📊 DataFrames by type:")
for data_type, df_list in dataframes_by_type.items():
    print(f"  • {data_type.upper():10}: {len(df_list)} DataFrames")

# ============================================================================
# 3. CREATE BRONZE VIEWS BY DATA TYPE
# ============================================================================
print("\n" + "=" * 80)
print("🔄 CREATING BRONZE VIEWS")
print("=" * 80)

# Process each data type separately
for data_type, df_keys in dataframes_by_type.items():
    if not df_keys:
        print(f"\n📭 No DataFrames found for {data_type}")
        continue
    
    print(f"\n📂 Processing {data_type.upper()} DataFrames ({len(df_keys)}):")
    
    # List to hold processed DataFrames of this type
    processed_dfs = []
    
    for df_key in df_keys:
        try:
                      # Get the DataFrame (might be pandas or Spark)
            df = dataframes[df_key]
            
            # Convert pandas DataFrame to Spark DataFrame if needed
            if isinstance(df, pd.DataFrame):
                spark_df = spark.createDataFrame(df)
            else:
                spark_df = df  # Assume it's already a Spark DataFrame
            
            # Extract metadata
            state, data_type_from_key, ba_code = extract_metadata_from_key(df_key)
            
            # Add metadata columns
            bronze_df = spark_df.withColumn("data_type", lit(data_type_from_key)) \
                                .withColumn("state", lit(state)) \
                                .withColumn("ba_code", lit(ba_code)) \
                                .withColumn("ingestion_timestamp", current_timestamp()) \
                                .withColumn("source_file", lit(df_key)) \
                                .withColumn("record_id", lit(f"{df_key}_{current_timestamp()}"))
            
            # Standardize column names if needed
            column_mapping = {
                'respondent-name': 'respondent_name',
                'type-name': 'type_name', 
                'value-units': 'value_units'
            }
            
            for old_col, new_col in column_mapping.items():
                if old_col in bronze_df.columns:
                    bronze_df = bronze_df.withColumnRenamed(old_col, new_col)
            
            processed_dfs.append(bronze_df)
            print(f"  ✅ {df_key:50} - {spark_df.count():6} records")
            processed_count += 1
            
        except Exception as e:
            print(f"  ❌ {df_key:50} - ERROR: {str(e)[:80]}")
            failed_count += 1
    
    # Create unified view for this data type
    if processed_dfs:
        try:
            # Union all DataFrames of this type
            from functools import reduce
            from pyspark.sql import DataFrame
            
            # Start with first DataFrame
            unified_df = processed_dfs[0]
            
            # Union with remaining DataFrames
            for df in processed_dfs[1:]:
                unified_df = unified_df.unionByName(df, allowMissingColumns=True)
            
            # Create temporary view
            view_name = f"bronze_{data_type}"
            unified_df.createOrReplaceTempView(view_name)
            bronze_views[view_name] = unified_df.count()
            
            print(f"\n  🎯 Created view: {view_name}")
            print(f"    Total records: {unified_df.count():,}")
            print(f"    Columns: {', '.join(unified_df.columns[:8])}...")
            
            # Show sample
            print(f"    Sample (first 2 rows):")
            unified_df.limit(2).show(truncate=50)
            
        except Exception as e:
            print(f"  ❌ Failed to create {data_type} view: {str(e)[:100]}")

# ============================================================================
# 4. CREATE UNIFIED BRONZE VIEW
# ============================================================================
print("\n" + "=" * 80)
print("🌉 CREATING UNIFIED BRONZE VIEW")
print("=" * 80)

# Get all bronze views
bronze_view_names = [f"bronze_{dt}" for dt in ['demand', 'coal', 'gas', 'wind']]

# Check which views exist
existing_views = []
for view_name in bronze_view_names:
    try:
        spark.sql(f"SELECT 1 FROM {view_name} LIMIT 1")
        existing_views.append(view_name)
    except:
        pass

if existing_views:
    print(f"\n📋 Available bronze views: {existing_views}")
    
    # Build UNION query
    union_parts = []
    for view_name in existing_views:
        union_parts.append(f"SELECT * FROM {view_name}")
    
    if union_parts:
        union_query = " UNION ALL ".join(union_parts)
        
        # Create unified view
        spark.sql(f"""
            CREATE OR REPLACE TEMP VIEW bronze_all_data AS
            {union_query}
        """)
        
        # Get count
        total_records = spark.sql("SELECT COUNT(*) as total FROM bronze_all_data").collect()[0]['total']
        
        print(f"\n✅ Created unified view: bronze_all_data")
        print(f"   Total records: {total_records:,}")
        print(f"   Combined from: {len(existing_views)} data types")
else:
    print("\n❌ No bronze views available to unify")

# ============================================================================
# 5. CREATE METADATA VIEW
# ============================================================================
print("\n" + "=" * 80)
print("📊 CREATING METADATA VIEW")
print("=" * 80)

# Create metadata DataFrame
metadata_data = []
for view_name, count in bronze_views.items():
    metadata_data.append({
        'view_name': view_name,
        'record_count': count,
        'data_type': view_name.replace('bronze_', ''),
        'created_at': pd.Timestamp.now()
    })

if metadata_data:
    metadata_df = spark.createDataFrame(metadata_data)
    metadata_df.createOrReplaceTempView("bronze_metadata")
    
    print(f"\n✅ Created metadata view: bronze_metadata")
    print("\n📋 Metadata content:")
    metadata_df.show(truncate=False)
else:
    print("\n❌ No metadata to create")

# ============================================================================
# 6. VERIFY AND SUMMARIZE
# ============================================================================
print("\n" + "=" * 80)
print("🔍 VERIFYING BRONZE LAYER")
print("=" * 80)

# List all available bronze views
print("\n📋 ALL BRONZE VIEWS:")
all_views = [v for v in spark.catalog.listTables() if v.name.startswith('bronze_')]
for view in all_views:
    try:
        count = spark.sql(f"SELECT COUNT(*) as cnt FROM {view.name}").collect()[0]['cnt']
        print(f"  • {view.name:25} - {count:8,} records")
    except:
        print(f"  • {view.name:25} - ERROR")

# ============================================================================
# 7. RUN DATA QUALITY CHECKS
# ============================================================================
print("\n" + "=" * 80)
print("🧪 DATA QUALITY CHECKS")
print("=" * 80)

if 'bronze_all_data' in [v.name for v in spark.catalog.listTables()]:
    print("\n📈 DATA QUALITY METRICS:")
    
    # 1. Check for missing values
    print("\n1. Missing Values Check:")
    result = spark.sql("""
        SELECT 
            COUNT(*) as total_records,
            SUM(CASE WHEN period IS NULL THEN 1 ELSE 0 END) as missing_period,
            SUM(CASE WHEN value IS NULL THEN 1 ELSE 0 END) as missing_value,
            SUM(CASE WHEN state = 'Unknown' THEN 1 ELSE 0 END) as unknown_state
        FROM bronze_all_data
    """).collect()[0]
    
    print(f"   • Total records: {result['total_records']:,}")
    print(f"   • Missing period: {result['missing_period']:,} ({result['missing_period']/result['total_records']*100:.1f}%)")
    print(f"   • Missing value: {result['missing_value']:,} ({result['missing_value']/result['total_records']*100:.1f}%)")
    print(f"   • Unknown state: {result['unknown_state']:,} ({result['unknown_state']/result['total_records']*100:.1f}%)")
    
    # 2. Check data distribution
    print("\n2. Data Distribution by Type:")
    dist_result = spark.sql("""
        SELECT 
            data_type,
            COUNT(*) as record_count,
            COUNT(DISTINCT state) as state_count,
            COUNT(DISTINCT ba_code) as ba_count,
            MIN(period) as earliest,
            MAX(period) as latest
        FROM bronze_all_data
        GROUP BY data_type
        ORDER BY record_count DESC
    """).collect()
    
    for row in dist_result:
        print(f"   • {row['data_type']:10}: {row['record_count']:6,} records, {row['state_count']} states, {row['ba_count']} BA codes")
    
    # 3. Check value ranges
    print("\n3. Value Ranges by Data Type:")
    value_ranges = spark.sql("""
        SELECT 
            data_type,
            MIN(CAST(value AS DOUBLE)) as min_value,
            MAX(CAST(value AS DOUBLE)) as max_value,
            AVG(CAST(value AS DOUBLE)) as avg_value
        FROM bronze_all_data
        WHERE value IS NOT NULL AND value != ''
        GROUP BY data_type
    """).collect()
    
    for row in value_ranges:
        print(f"   • {row['data_type']:10}: Min={row['min_value']:.0f}, Max={row['max_value']:.0f}, Avg={row['avg_value']:.0f}")
    
    # 4. Show sample queries
    print("\n4. Sample Data:")
    spark.sql("""
        SELECT 
            data_type,
            state,
            ba_code,
            period,
            value
        FROM bronze_all_data
        WHERE period LIKE '2025-12-20%'
        ORDER BY period
        LIMIT 5
    """).show(truncate=False)

# ============================================================================
# # ============================================================================
# 8. FINAL SUMMARY AND USAGE GUIDE
# ============================================================================
print("\n" + "=" * 80)
print("✅ BRONZE LAYER COMPLETE!")
print("=" * 80)

print(f'''
🎉 BRONZE LAYER SUCCESSFULLY CREATED!

📊 INGESTION SUMMARY:
   • Processed: {processed_count} DataFrames successfully
   • Failed: {failed_count} DataFrames
   • Created {len(bronze_views)} bronze views
   • Total records in unified view: {total_records if 'total_records' in locals() else 0:,}

📋 AVAILABLE VIEWS:
   • bronze_all_data      - All unified data
   • bronze_metadata      - Metadata about bronze layer
   • bronze_demand        - Electricity demand data
   • bronze_coal          - Coal generation data  
   • bronze_gas           - Natural gas generation data
   • bronze_wind          - Wind generation data

🔍 HOW TO USE YOUR BRONZE DATA:

1. Query specific data type:
   display(spark.sql('SELECT * FROM bronze_demand LIMIT 5'))

2. Analyze by state:
   display(spark.sql(
        "SELECT state, COUNT(*) as records "
        "FROM bronze_all_data "
        "GROUP BY state "
        "ORDER BY records DESC"
   ))

3. Check data quality:
   display(spark.sql(
        "SELECT "
        "    data_type, "
        "    COUNT(*) as total, "
        "    SUM(CASE WHEN value IS NULL THEN 1 ELSE 0 END) as null_values "
        "FROM bronze_all_data "
        "GROUP BY data_type"
   ))

4. Time series analysis:
   display(spark.sql(
        "SELECT "
        "    DATE(period) as date, "
        "    AVG(CAST(value AS DOUBLE)) as avg_value "
        "FROM bronze_all_data "
        "WHERE data_type = 'demand' "
        "GROUP BY DATE(period) "
        "ORDER BY date"
   ))

🧪 QUICK TEST QUERIES:
''')

# Run a quick test
try:
    print("Test 1: Count by data type")
    display(
        spark.sql(
            """
SELECT data_type, COUNT(*) as count
FROM bronze_all_data
GROUP BY data_type
ORDER BY count DESC
            """
        )
    )

    print("\nTest 2: States with most data")
    display(
        spark.sql(
            """
SELECT state, COUNT(*) as records
FROM bronze_all_data
WHERE state != 'Unknown'
GROUP BY state
ORDER BY records DESC
LIMIT 5
            """
        )
    )

    print("\nTest 3: Latest data sample")
    display(
        spark.sql(
            """
SELECT 
    data_type,
    state,
    period,
    value
FROM bronze_all_data
WHERE period LIKE '2025-12-20%'
ORDER BY period DESC
LIMIT 3
            """
        )
    )

except Exception as e:
    print(f"Test queries failed: {e}")

print("\n" + "=" * 80)
print("🚀 READY FOR SILVER LAYER PROCESSING!")
print("=" * 80)
print("\nYour bronze layer is now available as temporary views.")
print("Proceed to create the Silver Layer for data cleaning and transformation.")


🥉 BRONZE LAYER - DATABRICKS COMMUNITY EDITION

🔧 PREPARING DATA FOR BRONZE LAYER

📥 PROCESSING DATAFRAMES INTO BRONZE LAYER

Total DataFrames to process: 129

📊 DataFrames by type:
  • DEMAND    : 36 DataFrames
  • COAL      : 33 DataFrames
  • GAS       : 36 DataFrames
  • WIND      : 24 DataFrames

🔄 CREATING BRONZE VIEWS

📂 Processing DEMAND DataFrames (36):
  ✅ electricity_demand_CISO                            -    721 records
  ✅ electricity_California_CISO                        -    721 records
  ✅ electricity_demand_California_CISO                 -    721 records
  ✅ electricity_demand_FPC                             -    721 records
  ✅ electricity_Florida_FPC                            -    721 records
  ✅ electricity_demand_Florida_FPC                     -    721 records
  ✅ electricity_demand_PJM                             -    721 records
  ✅ electricity_Ohio_PJM                               -    721 records
  ✅ electricity_demand_Ohio_PJM                        -   

data_type,count
demand,25956
gas,25884
coal,23721
wind,17232



Test 2: States with most data


state,records
New York,17304
Georgia,17256
California,14420
North Carolina,12978
Illinois,8652



Test 3: Latest data sample


data_type,state,period,value
demand,California,2025-12-20T23,25454
demand,California,2025-12-20T23,25454
demand,California,2025-12-20T23,25454



🚀 READY FOR SILVER LAYER PROCESSING!

Your bronze layer is now available as temporary views.
Proceed to create the Silver Layer for data cleaning and transformation.


STEP 4: SILVER LAYER

A Silver Layer realizou a limpeza, padronização e enriquecimento dos dados brutos, convertendo valores para formato numérico, tratando dados ausentes, padronizando nomes de estados e unidades de medida, além de adicionar colunas temporais (timestamp, data, hora) para análise temporal. Esta camada de qualidade garantida unificou 129 DataFrames em uma estrutura consolidada pronta para modelagem analítica, mantendo rastreabilidade através de metadados de processamento enquanto prepara os dados para consumo na camada Gold.



In [0]:
# ============================================================================
# 🥈 SILVER LAYER - FIXED FOR ISO FORMAT
# ============================================================================

print("\n" + "=" * 80)
print("🥈 SILVER LAYER - FIXED FOR ISO FORMAT (YYYY-MM-DDTHH)")
print("=" * 80)

# Clean up
print("\n🧹 Cleaning up existing silver views...")
existing_silver_views = [v.name for v in spark.catalog.listTables() 
                         if v.name.startswith('silver_')]
for view in existing_silver_views:
    try:
        spark.sql(f"DROP VIEW IF EXISTS {view}")
        print(f"  ✅ Dropped: {view}")
    except:
        pass

# Create silver_all_data with the CORRECT format
print("\n🔄 Creating silver_all_data with ISO format parsing...")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW silver_all_data AS
    SELECT
        -- Parse the ISO format: 2025-11-21T00 → 2025-11-21 00:00:00
        CASE 
            WHEN period LIKE '____-__-__T__' AND LENGTH(period) = 13
            THEN to_timestamp(
                CONCAT(
                    SUBSTRING(period, 1, 10),  -- Date part: 2025-11-21
                    ' ',
                    SUBSTRING(period, 12, 2),  -- Hour part: 00
                    ':00:00'                    -- Add minutes and seconds
                ),
                'yyyy-MM-dd HH:mm:ss'
            )
            ELSE NULL
        END AS period_ts,
        
        -- Extract date part
        CASE 
            WHEN period LIKE '____-__-__T__' AND LENGTH(period) = 13
            THEN to_date(SUBSTRING(period, 1, 10), 'yyyy-MM-dd')
            ELSE NULL
        END AS period_date,
        
        -- Original period for reference
        period,
        
        -- Extract hour from period (e.g., get '00' from '2025-11-21T00')
        CASE 
            WHEN period LIKE '____-__-__T__' AND LENGTH(period) = 13
            THEN CAST(SUBSTRING(period, 12, 2) AS INT)
            ELSE NULL
        END AS period_hour,
        
        -- Clean the value
        CASE 
            WHEN TRIM(value) = '' THEN NULL
            WHEN TRY_CAST(value AS DOUBLE) IS NOT NULL 
            AND TRY_CAST(value AS DOUBLE) >= 0 
            THEN CAST(value AS DOUBLE)
            ELSE NULL 
        END AS value_cleaned,
        
        -- Original value
        value AS value_original,
        
        -- All other columns
        data_type,
        state,
        ba_code,
        COALESCE(respondent_name, 'Unknown') AS respondent_name,
        COALESCE(type_name, 'Unknown') AS type_name,
        COALESCE(value_units, 'Unknown') AS value_units,
        source_file,
        ingestion_timestamp,
        record_id,
        
        -- Processing metadata
        CURRENT_TIMESTAMP() AS silver_processing_timestamp
        
    FROM bronze_all_data
    WHERE period IS NOT NULL
    AND value IS NOT NULL
    AND period LIKE '____-__-__T__'  -- Ensure it matches our expected format
""")

# Check what we created
print("\n✅ Created silver_all_data")
print("\n📊 Statistics:")
total_count = spark.sql("SELECT COUNT(*) FROM silver_all_data").collect()[0][0]
period_ts_count = spark.sql("SELECT COUNT(*) FROM silver_all_data WHERE period_ts IS NOT NULL").collect()[0][0]

print(f"• Total records: {total_count:,}")
print(f"• Records with timestamps: {period_ts_count:,} ({period_ts_count/total_count*100:.1f}%)")

# Show sample
print("\n👁️  Sample data (with parsed timestamps):")
display(spark.sql("""
    SELECT 
        period,
        period_ts,
        period_date,
        period_hour,
        state,
        data_type,
        value_cleaned,
        value_units
    FROM silver_all_data 
    WHERE period_ts IS NOT NULL
    ORDER BY period_ts
    LIMIT 10
"""))

# Create individual views by data type
print("\n" + "=" * 80)
print("📁 CREATING INDIVIDUAL SILVER VIEWS")
print("=" * 80)

for data_type in ['demand', 'coal', 'gas', 'wind']:
    silver_view = f"silver_{data_type}"
    spark.sql(f"""
        CREATE OR REPLACE TEMP VIEW {silver_view} AS
        SELECT *
        FROM silver_all_data
        WHERE data_type = '{data_type}'
    """)
    count = spark.sql(f"SELECT COUNT(*) FROM {silver_view}").collect()[0][0]
    print(f"✅ {silver_view:20} - {count:8,} records")

print("\n" + "=" * 80)
print("✅ SILVER LAYER COMPLETE!")
print("=" * 80)
print(f"\n✅ Timestamps successfully parsed from ISO format: YYYY-MM-DDTHH")
print(f"✅ Added period_hour column for easy hour-based analysis")
print(f"✅ Ready for Gold layer star schema creation!")


🥈 SILVER LAYER - FIXED FOR ISO FORMAT (YYYY-MM-DDTHH)

🧹 Cleaning up existing silver views...
  ✅ Dropped: silver_all_data
  ✅ Dropped: silver_coal
  ✅ Dropped: silver_demand
  ✅ Dropped: silver_gas
  ✅ Dropped: silver_metadata
  ✅ Dropped: silver_wind

🔄 Creating silver_all_data with ISO format parsing...

✅ Created silver_all_data

📊 Statistics:
• Total records: 92,793
• Records with timestamps: 92,793 (100.0%)

👁️  Sample data (with parsed timestamps):


period,period_ts,period_date,period_hour,state,data_type,value_cleaned,value_units
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Ohio,demand,100148.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Florida,demand,6955.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,California,demand,26814.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,California,demand,26814.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Florida,demand,6955.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Florida,demand,6955.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Ohio,demand,100148.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,Ohio,demand,100148.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,California,demand,26814.0,megawatthours
2025-11-21T00,2025-11-21T00:00:00.000Z,2025-11-21,0,New York,demand,19327.0,megawatthours



📁 CREATING INDIVIDUAL SILVER VIEWS
✅ silver_demand        -   25,956 records
✅ silver_coal          -   23,721 records
✅ silver_gas           -   25,884 records
✅ silver_wind          -   17,232 records

✅ SILVER LAYER COMPLETE!

✅ Timestamps successfully parsed from ISO format: YYYY-MM-DDTHH
✅ Added period_hour column for easy hour-based analysis
✅ Ready for Gold layer star schema creation!


STEP 5: GOLD LAYER - STAR SCHEMA

🏆 Gold Layer - Camada de Modelagem Analítica
A Gold Layer representa a camada final de modelagem dimensional, implementando um esquema estrela otimizado para análise de desempenho e consumo pela camada de visualização (Power BI). Esta camada transforma os dados da Silver em tabelas factuais e dimensionais permanentes, organizadas para consultas analíticas eficientes através do Databricks SQL Serverless.

⭐ Esquema Estrela Implementado
🌟 Tabela Fato Principal: gold_fact_energy_measurements
Propósito: Registros detalhados de medições horárias

Contém: 721 registros horários × 8 estados × 4 tipos de dados

Campos-chave:

measurement_id: Identificador único (monotonically_increasing_id)

measurement_timestamp: Data/hora completa da medição

measurement_date e measurement_hour: Componentes temporais

state: Estado (California, Texas, Florida, etc.)

ba_code: Código da autoridade de balanceamento (CISO, ERCO, PJM, etc.)

data_type: Tipo de dado (demand, coal, gas, wind)

measurement_value: Valor limpo em MWh

original_value: Valor bruto preservado

value_units: Unidades de medida (megawatthours)

Metadados: source_file, ingestion_timestamp, silver_processing_timestamp

📊 Tabela Fato Agregada: gold_fact_daily_energy
Propósito: Agregações diárias otimizadas para visualizações

Contém: Agregações diárias por estado e tipo de energia

Métricas:

total_value: Soma diária em MWh

avg_value: Média horária diária

min_value/max_value: Valores extremos do dia

measurement_count: Número de medições horárias

Dimensões: date, state, data_type

🗓️ Dimensão de Data: gold_dim_date
Contém: Todos os dias presentes nos dados factuais

Atributos:

date: Data no formato YYYY-MM-DD

year, month, day: Componentes numéricos

quarter: Trimestre (1-4)

month_name, day_name: Nomes por extenso

day_of_week, day_of_year: Referências temporais

weekday_weekend: Classificação dia útil/fim de semana

📍 Dimensão de Localização: gold_dim_location
Contém: Estados únicos e seus códigos BA

Atributos:

location_id: Identificador numérico sequencial

state: Nome do estado (10 estados incluídos)

ba_code: Código da autoridade de balanceamento

market_category: Classificação (Major Market/Other Market)

Major Market: California, Texas, Florida, New York

Other Market: Demais estados

⚡ Dimensão de Tipo de Energia: gold_dim_energy_type
Contém: Tipos de dados energéticos únicos

Atributos:

energy_type_id: Identificador numérico sequencial

data_type: Código (demand, coal, gas, wind)

energy_type_name: Descrição por extenso

demand → "Electricity Demand"

coal → "Coal Generation"

gas → "Natural Gas Generation"

wind → "Wind Generation"

fuel_category: Classificação por categoria de combustível

Fossil Fuel: coal, gas

Renewable: wind

Load: demand

Other: demais tipos

🎯 Objetivos do Esquema Estrela
Desempenho: Consultas rápidas através de junções simples

Simplicidade: Modelo intuitivo para usuários de negócio

Flexibilidade: Suporte a múltiplos níveis de granularidade

Consistência: Dimensões compartilhadas entre fatos

Histórico: Preservação de dados históricos para tendências

📈 Casos de Uso Habilitados
Análise Temporal:
Tendências horárias/diárias de demanda

Sazonalidade por dia da semana

Padrões "Duck Curve" (Califórnia)

Análise Geográfica:
Comparativo entre estados

Distribuição por região

Performance por autoridade de balanceamento

Análise de Mix Energético:
Participação de fontes (carvão, gás, eólica)

Transição energética por estado

Correlação demanda-geração

Business Intelligence:
KPIs de desempenho do grid

Alertas de picos de demanda

Planejamento de capacidade

In [0]:
# ============================================================================
# 🥇 GOLD LAYER - PERMANENT TABLES FOR SERVERLESS SQL
# ============================================================================

print("\n" + "=" * 80)
print("🥇 GOLD LAYER - PERMANENT TABLES")
print("=" * 80)
print("Creating permanent tables for Serverless SQL access...")

# First, check what catalog/database we're in
print("\n📋 Current catalog/database:")
display(spark.sql("SELECT current_catalog(), current_database()"))

# ============================================================================
# CLEANUP EXISTING TABLES
# ============================================================================
print("\n🧹 Cleaning up existing tables...")
try:
    spark.sql("DROP TABLE IF EXISTS gold_fact_energy_measurements")
    spark.sql("DROP TABLE IF EXISTS gold_fact_daily_energy")
    spark.sql("DROP TABLE IF EXISTS gold_dim_date")
    spark.sql("DROP TABLE IF EXISTS gold_dim_location")
    spark.sql("DROP TABLE IF EXISTS gold_dim_energy_type")
    print("✅ Cleanup complete")
except Exception as e:
    print(f"⚠️  Cleanup warning: {str(e)[:50]}")

# ============================================================================
# 1. MAIN FACT TABLE (PERMANENT TABLE)
# ============================================================================
print("\n1. Creating permanent fact table...")
spark.sql("""
    CREATE OR REPLACE TABLE gold_fact_energy_measurements AS
    SELECT
        -- Generate a unique ID
        monotonically_increasing_id() AS measurement_id,
        
        -- Time dimensions
        period_ts AS measurement_timestamp,
        period_date AS measurement_date,
        period_hour AS measurement_hour,
        
        -- Location
        state,
        ba_code,
        
        -- Energy type
        data_type,
        
        -- Values
        value_cleaned AS measurement_value,
        value_original AS original_value,
        value_units,
        
        -- Additional info
        respondent_name,
        type_name,
        source_file,
        
        -- Metadata
        ingestion_timestamp,
        silver_processing_timestamp
        
    FROM silver_all_data
    WHERE value_cleaned IS NOT NULL
      AND period_ts IS NOT NULL
      AND period_date IS NOT NULL
""")
fact_count = spark.sql("SELECT COUNT(*) FROM gold_fact_energy_measurements").collect()[0][0]
print(f"✅ Created: gold_fact_energy_measurements ({fact_count:,} records)")

# ============================================================================
# 2. DAILY AGGREGATED TABLE (FOR YOUR AREA CHART)
# ============================================================================
print("\n2. Creating daily aggregated table for area chart...")
spark.sql("""
    CREATE OR REPLACE TABLE gold_fact_daily_energy AS
    SELECT
        measurement_date AS date,
        state,
        data_type,
        SUM(measurement_value) AS total_value,
        AVG(measurement_value) AS avg_value,
        MIN(measurement_value) AS min_value,
        MAX(measurement_value) AS max_value,
        COUNT(*) AS measurement_count
    FROM gold_fact_energy_measurements
    WHERE measurement_value IS NOT NULL
      AND measurement_date IS NOT NULL
    GROUP BY measurement_date, state, data_type
""")
daily_count = spark.sql("SELECT COUNT(*) FROM gold_fact_daily_energy").collect()[0][0]
print(f"✅ Created: gold_fact_daily_energy ({daily_count:,} records)")

# ============================================================================
# 3. DIMENSION TABLES (PERMANENT TABLES)
# ============================================================================
print("\n3. Creating dimension tables...")

# Dim Date
spark.sql("""
    CREATE OR REPLACE TABLE gold_dim_date AS
    WITH date_range AS (
        SELECT DISTINCT measurement_date AS date
        FROM gold_fact_energy_measurements
        WHERE measurement_date IS NOT NULL
    )
    SELECT
        date,
        YEAR(date) AS year,
        MONTH(date) AS month,
        DAY(date) AS day,
        DAYOFWEEK(date) AS day_of_week,
        DAYOFYEAR(date) AS day_of_year,
        QUARTER(date) AS quarter,
        DATE_FORMAT(date, 'MMMM') AS month_name,
        DATE_FORMAT(date, 'EEEE') AS day_name,
        CASE 
            WHEN DAYOFWEEK(date) IN (1, 7) THEN 'Weekend'
            ELSE 'Weekday'
        END AS weekday_weekend
    FROM date_range
    ORDER BY date
""")
print("✅ Created: gold_dim_date")

# Dim Location
spark.sql("""
    CREATE OR REPLACE TABLE gold_dim_location AS
    SELECT DISTINCT
        ROW_NUMBER() OVER (ORDER BY state) AS location_id,
        state,
        ba_code,
        CASE 
            WHEN state IN ('California', 'Texas', 'Florida', 'New York') 
            THEN 'Major Market'
            ELSE 'Other Market'
        END AS market_category
    FROM gold_fact_energy_measurements
    WHERE state IS NOT NULL AND state != 'Unknown'
    ORDER BY state
""")
print("✅ Created: gold_dim_location")

# Dim Energy Type
spark.sql("""
    CREATE OR REPLACE TABLE gold_dim_energy_type AS
    SELECT DISTINCT
        ROW_NUMBER() OVER (ORDER BY data_type) AS energy_type_id,
        data_type,
        CASE 
            WHEN data_type = 'demand' THEN 'Electricity Demand'
            WHEN data_type = 'coal' THEN 'Coal Generation'
            WHEN data_type = 'gas' THEN 'Natural Gas Generation'
            WHEN data_type = 'wind' THEN 'Wind Generation'
            ELSE data_type
        END AS energy_type_name,
        CASE 
            WHEN data_type IN ('coal', 'gas') THEN 'Fossil Fuel'
            WHEN data_type = 'wind' THEN 'Renewable'
            WHEN data_type = 'demand' THEN 'Load'
            ELSE 'Other'
        END AS fuel_category
    FROM gold_fact_energy_measurements
    WHERE data_type IS NOT NULL
    ORDER BY data_type
""")
print("✅ Created: gold_dim_energy_type")

# ============================================================================
# SUMMARY
# ============================================================================
print("\n" + "=" * 80)
print("✅ PERMANENT GOLD LAYER TABLES CREATED!")
print("=" * 80)

print(f"""
🎉 Permanent tables created! Access them from Serverless SQL with:

1. Your area chart query:
   SELECT 
     date,
     state,
     SUM(total_value) AS total_generation_mw
   FROM gold_fact_daily_energy
   WHERE data_type IN ('coal', 'gas', 'wind')
     AND state IS NOT NULL
     AND state != 'Unknown'
   GROUP BY date, state
   ORDER BY date, state;

2. Test query:
   SELECT * FROM gold_fact_daily_energy 
   WHERE data_type IN ('coal', 'gas', 'wind')
   LIMIT 5;

📋 Available permanent tables:
   • gold_fact_energy_measurements
   • gold_fact_daily_energy
   • gold_dim_date
   • gold_dim_location
   • gold_dim_energy_type
""")

# Quick test
print("\n🧪 Quick test - sample from daily aggregated:")
display(spark.sql("""
    SELECT * FROM gold_fact_daily_energy 
    WHERE data_type IN ('coal', 'gas', 'wind')
    LIMIT 5
"""))


🥇 GOLD LAYER - PERMANENT TABLES
Creating permanent tables for Serverless SQL access...

📋 Current catalog/database:


current_catalog(),current_database()
workspace,default



🧹 Cleaning up existing tables...
✅ Cleanup complete

1. Creating permanent fact table...
✅ Created: gold_fact_energy_measurements (91,299 records)

2. Creating daily aggregated table for area chart...
✅ Created: gold_fact_daily_energy (911 records)

3. Creating dimension tables...
✅ Created: gold_dim_date
✅ Created: gold_dim_location
✅ Created: gold_dim_energy_type

✅ PERMANENT GOLD LAYER TABLES CREATED!

🎉 Permanent tables created! Access them from Serverless SQL with:

1. Your area chart query:
   SELECT 
     date,
     state,
     SUM(total_value) AS total_generation_mw
   FROM gold_fact_daily_energy
   WHERE data_type IN ('coal', 'gas', 'wind')
     AND state IS NOT NULL
     AND state != 'Unknown'
   GROUP BY date, state
   ORDER BY date, state;

2. Test query:
   SELECT * FROM gold_fact_daily_energy 
   WHERE data_type IN ('coal', 'gas', 'wind')
   LIMIT 5;

📋 Available permanent tables:
   • gold_fact_energy_measurements
   • gold_fact_daily_energy
   • gold_dim_date
   • gold

date,state,data_type,total_value,avg_value,min_value,max_value,measurement_count
2025-12-17,Illinois,gas,1804449.0,25061.791666666668,21766.0,31778.0,72
2025-12-08,Illinois,gas,2261835.0,31414.375,26637.0,36941.0,72
2025-12-08,Georgia,gas,6182038.0,17172.327777777777,2888.0,57462.0,360
2025-12-10,California,wind,120198.0,834.7083333333334,428.0,1351.0,144
2025-12-17,California,wind,312309.0,2168.8125,479.0,3313.0,144
