# Air Quality Normalised Data EDA

This notebook analyzes the normalised air quality data to verify that the normalisation pipeline worked correctly.

## Objectives
1. Load and inspect all normalised air quality Parquet files
2. Verify tall schema structure and data quality
3. Check station mapping consistency with staging
4. Analyze temporal coverage and completeness
5. Compare pollutant distributions and ranges
6. Validate station dimension updates

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import glob
import yaml
import warnings
# Suppress every warning category for the rest of the session
warnings.simplefilter('ignore')

# pandas display: show all columns, cap printed rows at 20
pd.options.display.max_columns = None
pd.options.display.max_rows = 20

# Plotting defaults: stock matplotlib style with the seaborn "husl" palette
sns.set_palette("husl") if plt.style.use('default') is None else None

print("Libraries loaded successfully")

## 1. Load Configuration and Discover Files

In [None]:
# Define paths (absolute locations inside the notebook container)
normalised_dir = Path("/home/jovyan/work/data/normalised")
config_dir = Path("/home/jovyan/work/src/configs")

# Load station mapping configuration.
# The encoding is pinned to UTF-8 so parsing does not depend on the kernel's
# locale default (metric units printed later may contain non-ASCII symbols).
with open(config_dir / "station_mapping.yaml", 'r', encoding='utf-8') as f:
    station_config = yaml.safe_load(f)

print(f"Normalised data directory: {normalised_dir}")
print(f"Directory exists: {normalised_dir.exists()}")

# Discover all air quality normalised files.
# Sorted once here: glob yields files in arbitrary filesystem order, and later
# cells index into this list, so a stable order keeps their output reproducible.
air_quality_files = sorted(normalised_dir.glob("air_quality_*_normalised.parquet"))
print(f"\nFound {len(air_quality_files)} air quality normalised files:")
for file in air_quality_files:
    print(f"  - {file.name} ({file.stat().st_size / 1024:.1f} KB)")

# Check for station dimension
station_dim_path = normalised_dir / "station_dim.parquet"
print(f"\nStation dimension file: {station_dim_path.exists()}")
if station_dim_path.exists():
    print(f"  - station_dim.parquet ({station_dim_path.stat().st_size / 1024:.1f} KB)")

## 2. Load and Inspect Station Dimension

In [None]:
# Read the station dimension table if the file is present; otherwise record its absence
if not station_dim_path.exists():
    print("❌ Station dimension file not found")
    station_dim = None
else:
    station_dim = pd.read_parquet(station_dim_path)
    print("Station Dimension:")
    display(station_dim)

    dim_shape = station_dim.shape
    print(f"\nStation dimension shape: {dim_shape}")
    pk_unique_count = station_dim['station_pk'].nunique()
    print(f"Unique station PKs: {pk_unique_count}")
    pk_low = station_dim['station_pk'].min()
    pk_high = station_dim['station_pk'].max()
    print(f"Station PK range: {pk_low} - {pk_high}")

# Index the configured air quality metrics by their metric code
air_quality_metrics = station_config['air_quality_metrics']
pollutant_lookup = {}
for metric_entry in air_quality_metrics:
    pollutant_lookup[metric_entry['metric_code']] = metric_entry

print("\nAir Quality Pollutants:")
for code, metric in pollutant_lookup.items():
    metric_label = metric['metric_name']
    metric_unit = metric['unit']
    print(f"  {code}: {metric_label} ({metric_unit})")

## 3. Load and Analyze All Normalised Data

In [None]:
# Load all air quality normalised files into memory and build a per-file summary.
#
# Outputs of this cell:
#   air_quality_data : dict mapping "<pollutant>_<year>" to the loaded DataFrame
#   file_summary     : list of per-file summary dicts (one per loaded file)
#   load_errors      : list of (file name, error message) for files that failed
#   summary_df       : DataFrame built from file_summary
#
# The summary columns are declared up front so summary_df keeps its schema even
# when nothing could be loaded; a frame built from an empty list has no columns
# and the memory total below would raise KeyError.
summary_columns = [
    'file', 'pollutant', 'year', 'rows', 'columns',
    'date_range', 'unique_stations', 'memory_mb'
]

air_quality_data = {}
file_summary = []
load_errors = []

for file_path in sorted(air_quality_files):
    try:
        # Extract pollutant and year from filename
        filename = file_path.stem
        parts = filename.split('_')
        if len(parts) >= 5:  # air_quality_pollutant_year_normalised
            pollutant = parts[2]
            year = parts[3]
        else:
            pollutant = 'unknown'
            year = 'unknown'

        # Load data
        df = pd.read_parquet(file_path)

        # Two files can map to the same key (e.g. both fall back to
        # 'unknown_unknown'); use the unique file stem instead of silently
        # overwriting the dataset that was loaded first.
        key = f"{pollutant}_{year}"
        if key in air_quality_data:
            key = filename

        # Collect summary info. The date range is only computed for a real
        # datetime column; a wrongly typed column is left for the schema
        # validation cell to report instead of aborting the load here.
        date_info = "No datetime"
        if 'datetime' in df.columns:
            if pd.api.types.is_datetime64_any_dtype(df['datetime']):
                date_info = (
                    f"{df['datetime'].min().date()} to "
                    f"{df['datetime'].max().date()}"
                )
            else:
                date_info = "Non-datetime column"

        summary_row = {
            'file': file_path.name,
            'pollutant': pollutant,
            'year': year,
            'rows': len(df),
            'columns': len(df.columns),
            'date_range': date_info,
            'unique_stations': df['station_pk'].nunique() if 'station_pk' in df.columns else 0,
            'memory_mb': df.memory_usage(deep=True).sum() / 1024 / 1024
        }

        # Register the dataset and its summary together, only after every step
        # above succeeded, so the dict and the summary never disagree.
        air_quality_data[key] = df
        file_summary.append(summary_row)

        print(f"✅ Loaded {key}: {len(df)} rows, {len(df.columns)} columns")

    except Exception as e:
        # Best effort: keep loading the remaining files, but remember the failure
        load_errors.append((file_path.name, str(e)))
        print(f"❌ Error loading {file_path.name}: {e}")

# Create summary DataFrame
summary_df = pd.DataFrame(file_summary, columns=summary_columns)
print(f"\nLoaded {len(air_quality_data)} datasets successfully")
if load_errors:
    print(f"Failed to load {len(load_errors)} file(s)")
print(f"Total memory usage: {float(summary_df['memory_mb'].sum()):.2f} MB")

In [None]:
# Per-file overview table
print("File Summary:")
display(summary_df)

# Peek at the dtypes and leading rows of the first loaded dataset
if air_quality_data:
    first_key = next(iter(air_quality_data))
    first_df = air_quality_data[first_key]
    print(f"\nSchema for {first_key}:")
    print(first_df.dtypes)
    print("\nSample data:")
    display(first_df.head())

## 4. Data Quality Validation

In [None]:
# Columns every tall-format file is required to carry
expected_columns = [
    'datetime', 'station_pk', 'station_code', 'station_name',
    'location_type', 'metric', 'unit', 'value', 'quality_flag', 'source'
]

# (column, dtype predicate, message suffix) for each dtype rule, in report order
dtype_rules = (
    ('datetime', pd.api.types.is_datetime64_any_dtype, "datetime column not datetime type"),
    ('value', pd.api.types.is_numeric_dtype, "value column not numeric"),
)

print("TALL SCHEMA VALIDATION:")
print("=" * 40)

# One message per problem: absent columns first, then dtype mismatches
schema_issues = []
for key, df in air_quality_data.items():
    missing_cols = set(expected_columns) - set(df.columns)
    if missing_cols:
        schema_issues.append(f"{key}: Missing columns {missing_cols}")

    for rule_column, dtype_ok, rule_message in dtype_rules:
        if rule_column in df.columns and not dtype_ok(df[rule_column]):
            schema_issues.append(f"{key}: {rule_message}")

if not schema_issues:
    print("✅ All files have correct tall schema structure")
else:
    print("❌ Schema issues found:")
    for issue in schema_issues:
        print(f"  - {issue}")

print("\nDATA COMPLETENESS:")
print("=" * 40)

# Null counts per column, limited to the first three datasets for brevity
for key in list(air_quality_data)[:3]:
    df = air_quality_data[key]
    null_counts = df.isnull().sum()
    total_rows = len(df)
    print(f"\n{key} ({total_rows:,} rows):")
    for col, null_count in null_counts.items():
        if null_count <= 0:
            continue
        percentage = (null_count / total_rows) * 100
        print(f"  {col}: {null_count:,} nulls ({percentage:.1f}%)")

## 5. Pollutant Analysis

In [None]:
# Stack every loaded dataset into one frame and profile it per metric
if not air_quality_data:
    print("No data available for analysis")
else:
    all_data = pd.concat(air_quality_data.values(), ignore_index=True)

    print("POLLUTANT ANALYSIS:")
    print("=" * 40)

    # Per-metric statistics: value distribution, distinct stations, time span
    metric_aggregations = {
        'value': ['count', 'mean', 'std', 'min', 'max'],
        'station_pk': 'nunique',
        'datetime': ['min', 'max']
    }
    grouped_by_metric = all_data.groupby('metric')
    pollutant_summary = grouped_by_metric.agg(metric_aggregations).round(2)

    print("Pollutant Statistics:")
    display(pollutant_summary)

    # Row count per quality flag, as a share of all rows in the combined frame
    print("\nQuality Flag Distribution:")
    quality_dist = all_data['quality_flag'].value_counts()
    combined_row_count = len(all_data)
    for flag, count in quality_dist.items():
        percentage = (count / combined_row_count) * 100
        print(f"  {flag}: {count:,} ({percentage:.1f}%)")

## 6. Visualizations

In [None]:
# Create a 2x2 overview figure from the per-file summary:
#   top-left     record counts per pollutant, one bar per year
#   top-right    mean number of distinct stations per pollutant
#   bottom-left  text panel with headline totals
#   bottom-right memory footprint per pollutant
if air_quality_data and len(summary_df) > 0:
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle('Air Quality Normalised Data Analysis', fontsize=16)
    
    # 1. Records by pollutant and year
    pollutant_year_counts = (
        summary_df.groupby(['pollutant', 'year'])['rows']
        .sum()
        .unstack(fill_value=0)
    )
    pollutant_year_counts.plot(kind='bar', ax=axes[0,0])
    axes[0,0].set_title('Records by Pollutant and Year')
    axes[0,0].set_xlabel('Pollutant')
    axes[0,0].set_ylabel('Number of Records')
    # Legend is anchored outside the axes so it does not cover the bars
    axes[0,0].legend(title='Year', bbox_to_anchor=(1.05, 1), loc='upper left')
    axes[0,0].tick_params(axis='x', rotation=45)
    
    # 2. Station coverage by pollutant (mean of per-file distinct station counts)
    station_counts = summary_df.groupby('pollutant')['unique_stations'].mean()
    station_counts.plot(kind='bar', ax=axes[0,1])
    axes[0,1].set_title('Average Station Coverage by Pollutant')
    axes[0,1].set_xlabel('Pollutant')
    axes[0,1].set_ylabel('Number of Stations')
    axes[0,1].tick_params(axis='x', rotation=45)
    
    # 3. Data overview text
    total_records = summary_df['rows'].sum()
    total_files = len(summary_df)
    years_covered = sorted(summary_df['year'].unique())
    pollutants_covered = sorted(summary_df['pollutant'].unique())
    
    # Real newlines ('\n') are required here: an escaped backslash would be
    # drawn literally and collapse the panel into a single line of text.
    # NOTE: the "Schema" line is a static label, not derived from the
    # schema validation results.
    overview_text = (
        f'Total Files: {total_files}\n'
        f'Total Records: {total_records:,}\n'
        f'Years: {", ".join(years_covered)}\n'
        f'Pollutants: {", ".join(pollutants_covered)}\n'
        f'Schema: Tall Format (VALID)\n'
        f'Source: air_quality'
    )
    axes[1,0].text(
        0.5, 0.5, overview_text, 
        ha='center', va='center', fontsize=12, 
        transform=axes[1,0].transAxes
    )
    axes[1,0].set_title('Data Overview')
    axes[1,0].axis('off')
    
    # 4. Memory usage by pollutant
    memory_by_pollutant = summary_df.groupby('pollutant')['memory_mb'].sum()
    memory_by_pollutant.plot(kind='bar', ax=axes[1,1])
    axes[1,1].set_title('Memory Usage by Pollutant')
    axes[1,1].set_xlabel('Pollutant')
    axes[1,1].set_ylabel('Memory (MB)')
    axes[1,1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()
else:
    print("No data available for visualization")

## 7. Cross-Dataset Compatibility Check

In [None]:
# Check for wind normalised files for compatibility.
# Sorted so the file sampled for the schema comparison is the same on every
# run (glob yields files in arbitrary filesystem order).
wind_files = sorted(normalised_dir.glob("wind_*_normalised.parquet"))
print(f"Wind normalised files found: {len(wind_files)}")

if wind_files and air_quality_data:
    # Load one file from each source to compare schema
    wind_sample = pd.read_parquet(wind_files[0])
    air_sample = list(air_quality_data.values())[0]
    
    print("\nSCHEMA COMPATIBILITY:")
    print("=" * 40)
    
    wind_cols = set(wind_sample.columns)
    air_cols = set(air_sample.columns)
    
    common_cols = wind_cols & air_cols
    wind_only = wind_cols - air_cols
    air_only = air_cols - wind_cols
    
    print(f"Common columns: {len(common_cols)}")
    print(f"  {sorted(common_cols)}")
    
    if wind_only:
        print(f"\nWind-only columns: {sorted(wind_only)}")
    
    if air_only:
        print(f"\nAir quality-only columns: {sorted(air_only)}")
    
    # Check shared stations.
    # Station sets are the union over every file of each source; a single
    # sampled file only reports the stations present in that one file.
    # Only the station_pk column is read from the wind files to keep this cheap.
    wind_stations = set()
    for wind_file in wind_files:
        wind_pks = pd.read_parquet(wind_file, columns=['station_pk'])['station_pk']
        wind_stations.update(wind_pks.dropna().unique().tolist())
    
    air_stations = set()
    for air_df in air_quality_data.values():
        if 'station_pk' in air_df.columns:
            air_stations.update(air_df['station_pk'].dropna().unique().tolist())
    
    shared_stations = wind_stations & air_stations
    
    print(f"\nSTATION OVERLAP:")
    print(f"Wind stations: {sorted(wind_stations)}")
    print(f"Air quality stations: {sorted(air_stations)}")
    print(f"Shared stations (PKs 1-7): {sorted(shared_stations)}")
    
    if len(shared_stations) == 7:
        print("✅ Expected 7 shared stations found")
    else:
        print(f"⚠️  Expected 7 shared stations, found {len(shared_stations)}")

elif not wind_files:
    print("Wind data not available for compatibility check")
else:
    # Wind files exist but no air quality dataset was loaded
    print("Air quality data not loaded; skipping compatibility check")

## 8. Summary and Validation

In [None]:
# Final pass/fail report for the normalisation pipeline.
# Reads summary_df, air_quality_files, schema_issues and station_dim_path,
# which are produced by the earlier cells of this notebook.
print("AIR QUALITY NORMALISATION PIPELINE VALIDATION SUMMARY")
print("=" * 70)

# File statistics
if len(summary_df) > 0:
    print(f"\n📊 DATA VOLUME:")
    print(f"  • Total normalised files: {len(air_quality_files)}")
    print(f"  • Total records: {summary_df['rows'].sum():,}")
    print(f"  • Total memory usage: {summary_df['memory_mb'].sum():.2f} MB")
    print(f"  • Years covered: {sorted(summary_df['year'].unique())}")
    print(f"  • Pollutants processed: {sorted(summary_df['pollutant'].unique())}")
    
    # Expected outputs validation
    expected_pollutants = ['no2', 'o3', 'pm10', 'pm25', 'so2']
    expected_years = ['2019', '2020', '2021', '2022']
    
    # Every pollutant/year pair is expected except PM2.5 for 2019:
    # 2019: 4 files (no PM2.5), 2020-2022: 5 files each
    expected_pairs = {
        (pollutant_code, year_label)
        for pollutant_code in expected_pollutants
        for year_label in expected_years
    }
    expected_pairs.discard(('pm25', '2019'))
    expected_total_files = len(expected_pairs)  # 19 files
    
    # Count the files that were loaded and summarised, not merely discovered,
    # so a file that failed to load cannot pass the check.
    discovered_total_files = len(air_quality_files)
    actual_total_files = len(summary_df)
    failed_total_files = discovered_total_files - actual_total_files
    
    # Pollutant/year pairs parsed from the loaded file names
    loaded_pairs = set(zip(summary_df['pollutant'], summary_df['year']))
    missing_pairs = sorted(expected_pairs - loaded_pairs)
    unexpected_pairs = sorted(loaded_pairs - expected_pairs)
    
    print(f"\n🎯 VALIDATION RESULTS:")
    validation_passed = actual_total_files == expected_total_files
    print(f"  • Expected total files: {expected_total_files}")
    print(f"  • Actual total files: {actual_total_files}")
    if failed_total_files > 0:
        print(f"    ({discovered_total_files} discovered, {failed_total_files} failed to load)")
    status_text = 'PASSED' if validation_passed else 'FAILED'
    print(f"  • File count validation: {status_text}")
    # Informational detail only: the pass/fail decision stays count-based
    if missing_pairs:
        print(f"    ⚠️  Missing pollutant/year files: {missing_pairs}")
    if unexpected_pairs:
        print(f"    ⚠️  Unexpected pollutant/year files: {unexpected_pairs}")
    
    # Schema validation
    schema_valid = len(schema_issues) == 0
    schema_status = 'PASSED' if schema_valid else 'FAILED'
    print(f"  • Tall schema validation: {schema_status}")
    
    # Station dimension check (presence of the file only)
    station_dim_valid = station_dim_path.exists()
    station_status = 'PASSED' if station_dim_valid else 'FAILED'
    print(f"  • Station dimension present: {station_status}")
    
    overall_validation = validation_passed and schema_valid and station_dim_valid
    
    print(f"\n🚀 STATUS:")
    if overall_validation:
        print(f"  • ✅ Air quality normalisation pipeline working correctly")
        print(f"  • ✅ Tall schema format validated")
        print(f"  • ✅ Ready for DuckDB loading and cross-dataset analytics")
        print(f"  • ✅ Station dimension updated with air quality stations")
    else:
        print(f"  • ❌ Review normalisation pipeline for issues")
        if not validation_passed:
            print(f"    - File count mismatch")
        if not schema_valid:
            print(f"    - Schema validation failed")
        if not station_dim_valid:
            print(f"    - Station dimension missing")
    
    final_status = 'SUCCESS' if overall_validation else 'NEEDS REVIEW'
    print(f"\n📝 NORMALISATION PIPELINE STATUS: {final_status}")
    
    if overall_validation:
        print(f"\n🔗 NEXT STEPS:")
        print(f"  1. Load normalised data into DuckDB")
        print(f"  2. Combine with wind data for comprehensive analytics")
        print(f"  3. Implement data quality checks")
        print(f"  4. Create cross-dataset analysis dashboards")
else:
    print("\n❌ No normalised air quality data found")
    print("   Run the air quality normalisation pipeline first")