# Data Quality Analysis - Energy Data

**Purpose**: Comprehensive data quality verification, focusing on renewable energy data issues

**Date**: January 12, 2026

---

## 1. Setup and Connection

In [None]:
import os
import warnings
from urllib.parse import quote_plus

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
import requests
import seaborn as sns
from psycopg2.extras import RealDictCursor
from sqlalchemy import create_engine
# NOTE(review): this silences every warning category for the whole kernel;
# consider narrowing it to the specific warnings that are known to be noise.
warnings.filterwarnings('ignore')

# Set plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Database connection settings.
# Non-secret values fall back to the defaults this notebook has always used and
# can be overridden through LIANEL_DB_* environment variables. The password has
# no default on purpose: it must never be stored in the notebook.
# NOTE(review): a password was previously committed in this cell - rotate it.
DB_CONFIG = {
    'host': os.environ.get('LIANEL_DB_HOST', '172.18.0.1'),
    'port': int(os.environ.get('LIANEL_DB_PORT', '5432')),
    'database': os.environ.get('LIANEL_DB_NAME', 'lianel_energy'),
    'user': os.environ.get('LIANEL_DB_USER', 'airflow'),
    'password': os.environ.get('LIANEL_DB_PASSWORD'),
}

if not DB_CONFIG['password']:
    raise RuntimeError(
        "Database password not configured: set the LIANEL_DB_PASSWORD "
        "environment variable before running this notebook."
    )

# Create SQLAlchemy engine for pandas.
# User and password are percent-encoded so characters such as '@', '/' or ':'
# cannot corrupt the URL.
connection_string = (
    f"postgresql://{quote_plus(DB_CONFIG['user'])}:{quote_plus(DB_CONFIG['password'])}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
engine = create_engine(connection_string)

# create_engine() is lazy and does not touch the database. Open (and release)
# one connection so the success message below is only printed when the
# credentials and host are actually valid.
with engine.connect():
    pass

print("✅ Database connection established")

## 2. Renewable Energy Issue Investigation

In [None]:
# Check renewable energy percentages for the most recent year in the ML dataset.
# Rows are bucketed in SQL; 'SUSPICIOUS' (pct_renewable >= 99) is the bucket
# this notebook treats as "100% renewable".
# NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would otherwise
# push rows with an unknown percentage to the top of the listing.
renewable_query = """
SELECT
    cntr_code,
    year,
    pct_renewable,
    renewable_energy_gwh,
    fossil_energy_gwh,
    total_energy_gwh,
    CASE
        WHEN pct_renewable >= 99 THEN 'SUSPICIOUS'
        WHEN pct_renewable >= 80 THEN 'HIGH'
        WHEN pct_renewable >= 50 THEN 'MODERATE'
        ELSE 'LOW'
    END as renewable_category
FROM ml_dataset_forecasting_v1
WHERE year = (SELECT MAX(year) FROM ml_dataset_forecasting_v1)
ORDER BY pct_renewable DESC NULLS LAST, total_energy_gwh DESC;
"""

renewable_df = pd.read_sql(renewable_query, engine)

if renewable_df.empty:
    print("⚠️  Query returned no rows - the counts below are not meaningful")

# Reuse the category computed by the query so the 99% threshold is defined in
# exactly one place (the SQL CASE expression above).
suspicious = renewable_df[renewable_df['renewable_category'] == 'SUSPICIOUS']
zero_fossil_rows = renewable_df[renewable_df['fossil_energy_gwh'] == 0]
# A NULL fossil value is not equal to 0, so count it separately; otherwise
# missing fossil data would be invisible in this summary.
null_fossil_count = int(renewable_df['fossil_energy_gwh'].isna().sum())

print("⚠️  Renewable Energy Analysis (Latest Year):")
print(f"\nTotal countries: {len(renewable_df)}")
print(f"Countries with 100% renewable: {len(suspicious)}")
print(f"Countries with 0 fossil energy: {len(zero_fossil_rows)}")
print(f"Countries with missing (NULL) fossil energy: {null_fossil_count}")

print("\n🔴 Countries with 100% renewable (SUSPICIOUS):")
report_columns = [
    'cntr_code',
    'year',
    'pct_renewable',
    'renewable_energy_gwh',
    'fossil_energy_gwh',
    'total_energy_gwh',
]
print(suspicious[report_columns].to_string(index=False))

## 3. Product Code Analysis - Missing Fossil Products

In [None]:
# Check which product codes are in fact_energy_annual.
# Only rows loaded from the 'nrg_bal_s' source table are considered, so
# "missing" below means "missing from the nrg_bal_s rows of fact_energy_annual".
product_query = """
SELECT
    e.product_code,
    p.product_name,
    p.renewable_flag,
    p.fossil_flag,
    COUNT(*) as record_count,
    SUM(e.value_gwh) as total_gwh,
    COUNT(DISTINCT e.country_code) as countries,
    COUNT(DISTINCT e.year) as years
FROM fact_energy_annual e
LEFT JOIN dim_energy_product p ON e.product_code = p.product_code
WHERE e.source_table = 'nrg_bal_s'
GROUP BY e.product_code, p.product_name, p.renewable_flag, p.fossil_flag
ORDER BY record_count DESC;
"""

product_df = pd.read_sql(product_query, engine)

print("📦 Product Codes in fact_energy_annual:")
print(product_df.to_string(index=False))

print("\n⚠️  Missing Fossil Products:")
# Reference list: every product the dimension table flags as fossil.
fossil_products = pd.read_sql("""
    SELECT product_code, product_name, fossil_flag
    FROM dim_energy_product
    WHERE fossil_flag = true
    ORDER BY product_code;
""", engine)

# Fossil products that never appear in the fact rows selected above.
is_ingested = fossil_products['product_code'].isin(product_df['product_code'])
missing_fossil = fossil_products[~is_ingested]

if not missing_fossil.empty:
    print(f"\n❌ {len(missing_fossil)} fossil products are missing from fact_energy_annual:")
    print(missing_fossil.to_string(index=False))
    # Report the codes that are actually missing instead of a fixed example
    # list, so the message cannot disagree with the table printed above.
    missing_codes = ', '.join(missing_fossil['product_code'].astype(str))
    print(f"\n🔴 ROOT CAUSE: Fossil product codes ({missing_codes}) are not being ingested!")
    print("   This explains why all countries show 100% renewable energy.")
else:
    print("\n✅ All fossil products are present")

## 4. Data Quality Summary and Recommendations

In [None]:
# Generate data quality report


def _count_severity(count, high_above=5):
    """Map an occurrence count to a severity label.

    Returns 'NONE' when nothing was found, 'HIGH' when the count exceeds
    ``high_above`` and 'MEDIUM' otherwise. A zero count must not be reported
    as 'MEDIUM': that would flag a problem that does not exist.
    """
    if count == 0:
        return 'NONE'
    return 'HIGH' if count > high_above else 'MEDIUM'


# Check 1: 100% renewable countries (rows the query labelled 'SUSPICIOUS',
# i.e. pct_renewable >= 99)
suspicious_count = len(renewable_df[renewable_df['renewable_category'] == 'SUSPICIOUS'])

# Check 2: Missing fossil products (any missing product is HIGH severity)
missing_fossil_count = len(missing_fossil)

# Check 3: Zero fossil energy
zero_fossil = len(renewable_df[renewable_df['fossil_energy_gwh'] == 0])

report_columns = ['Issue', 'Severity', 'Count', 'Description']
report_rows = [
    (
        'Countries with 100% renewable energy',
        _count_severity(suspicious_count),
        suspicious_count,
        f'{suspicious_count} countries show 100% renewable, likely missing fossil data',
    ),
    (
        'Missing fossil product codes',
        'HIGH' if missing_fossil_count > 0 else 'NONE',
        missing_fossil_count,
        f'{missing_fossil_count} fossil products not found in fact_energy_annual',
    ),
    (
        'Countries with zero fossil energy',
        _count_severity(zero_fossil),
        zero_fossil,
        f'{zero_fossil} countries have 0 GWh fossil energy',
    ),
]

quality_df = pd.DataFrame(report_rows, columns=report_columns)
# Keep the column -> list-of-values mapping available under its original name.
quality_report = quality_df.to_dict('list')

print("\n📋 Data Quality Report:")
print("=" * 80)
print(quality_df.to_string(index=False))
print("=" * 80)

# Summary
high_severity = quality_df[quality_df['Severity'] == 'HIGH']
if not high_severity.empty:
    print(f"\n🔴 HIGH SEVERITY ISSUES: {len(high_severity)}")
    for finding in high_severity.itertuples(index=False):
        print(f"  - {finding.Issue}: {finding.Count} occurrences")
    print("\n💡 RECOMMENDATIONS:")
    print("  1. Check Eurostat API response - verify fossil product codes are returned")
    print("  2. Review ingestion DAG logs for filtering/validation issues")
    print("  3. Verify product code mapping in ingestion DAG")
    print("  4. Re-run ingestion DAG after fixing issues")
    print("  5. Re-run harmonization and ML dataset DAGs")
else:
    print("\n✅ No high severity issues found")

## 5. Root Cause Analysis - Eurostat API Investigation

In [None]:
# Investigate what product codes Eurostat API actually returns.
# The conclusions printed at the end are derived from the response itself, so
# they cannot contradict what the API returned.
BASE_URL = 'https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data'
params = {
    'format': 'JSON',
    'geo': 'DE',  # Germany as example
    'time': '2024'
}
# Individual fossil product codes the ingestion is expected to receive.
fossil_codes_expected = ['C0110', 'C0121', 'C0350']

try:
    response = requests.get(f'{BASE_URL}/nrg_bal_s', params=params, timeout=60)
    # Fail on HTTP errors instead of parsing an error page as if it were data.
    response.raise_for_status()
    data = response.json()
except (requests.RequestException, ValueError) as e:
    # RequestException: network/HTTP failure; ValueError: body is not valid JSON.
    print(f"❌ Error querying Eurostat API: {e}")
else:
    dimensions = data.get('dimension', {}) if isinstance(data, dict) else {}
    siec_dim = dimensions.get('siec', {})
    # list() yields the codes whether 'index' is an object keyed by code or a
    # plain array of codes.
    api_product_codes = list(siec_dim.get('category', {}).get('index', {}))

    print("📡 Product codes returned by Eurostat API (nrg_bal_s):")
    print(f"Total: {len(api_product_codes)}")
    for code in sorted(api_product_codes):
        print(f"  - {code}")

    if not api_product_codes:
        # Without any product codes every expected code would look "missing";
        # that says nothing about the table, only about this particular query.
        print("\n⚠️  No product codes in the response - cannot draw conclusions from this query")
    else:
        print("\n🔍 Analysis:")
        missing_codes = [code for code in fossil_codes_expected if code not in api_product_codes]
        # API codes that contain an expected code (exact or as part of a range).
        fossil_codes_found = [
            c for c in api_product_codes if any(fc in c for fc in fossil_codes_expected)
        ]
        aggregated = [c for c in api_product_codes if 'C' in c and ('0350' in c or '0370' in c)]

        if missing_codes:
            print("\n❌ Individual fossil codes NOT in API response:")
            for code in missing_codes:
                print(f"  - {code} (missing)")
        else:
            print("\n✅ All expected individual fossil codes are in the API response")

        if fossil_codes_found:
            print("\n🔍 API codes matching an expected fossil code:")
            for code in fossil_codes_found:
                print(f"  - {code}")

        if aggregated:
            print("\n✅ Aggregated codes that might map to fossil:")
            for code in aggregated:
                print(f"  - {code} (can map to C0350)")

        if missing_codes:
            print("\n💡 ROOT CAUSE:")
            print(f"  - Individual codes NOT available in nrg_bal_s: {', '.join(missing_codes)}")
            if aggregated:
                print(f"  - Eurostat API returns aggregated codes instead: {', '.join(aggregated)}")
                print("  - Solution: Map aggregated codes to individual codes in ingestion DAG")
            else:
                print("  - No aggregated codes (C0350-0370) found in the response either")