In [1]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
from pathlib import Path

print("Starting emissions data processing...")

# ================================
# Step 1: Load CEMS data for facility filtering
# ================================

print("Loading CEMS facilities data...")
# Annual CEMS emissions table; only its 'Facility ID' column is used below
cems_csv_path = '../data/2023_annual_emissions_CEMS.csv'
emissions_cems = pd.read_csv(cems_csv_path)
# Distinct facility IDs, kept in a set so later membership tests are cheap
cems_facility_ids = emissions_cems['Facility ID'].unique()
cems_plants = set(cems_facility_ids)
print(f"Loaded {len(cems_plants)} unique CEMS facility IDs")

# ================================
# Step 2: Load and filter FF10_POINT data based on CEMS facilities
# ================================

# Location of the FF10_POINT-format EGU inventory file
egu_file_path = "../data/raw/point/2022hc_cb6_22m/inputs/ptegu/egu_cems_2022_POINT_20240615_2022cems_stackfix2_23jul2024_v0.csv"

print(f"Reading EGU CEMS data from {egu_file_path}...")

# The file opens with a run of '#'-prefixed lines. Count them (stopping at the
# first line that is not one) so pandas can skip straight to the CSV header.
header_lines = 0
with open(egu_file_path, 'r') as f:
    for line in f:
        if not line.startswith('#'):
            break
        header_lines += 1

# Parse everything after the leading comment lines as a regular CSV
egu_df = pd.read_csv(
    egu_file_path,
    skiprows=header_lines,
    low_memory=False,
)
print(f"EGU data loaded. Shape: {egu_df.shape}")

# Filter to keep only rows where oris_facility_code is in CEMS plants, and
# collect per-facility average stack parameters along the way.
STACK_PARAM_COLUMNS = ('stkhgt', 'stkdiam', 'stktemp', 'stkvel')


def _parse_oris_code(value):
    """Return *value* as an integer ORIS code, or None if it is not one.

    Accepts plain integer text ("3470") and also float renderings of whole
    numbers ("3470.0"), which is what str() produces when pandas stores the
    column as float64.
    """
    text = str(value).strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        number = float(text)
    except ValueError:
        return None
    return int(number) if number.is_integer() else None


def _to_float_or_nan(value):
    """Return *value* as a float; missing or non-numeric values become NaN."""
    if value is None:
        return np.nan
    try:
        return float(value)
    except (TypeError, ValueError):
        return np.nan


filtered_egu = []
matched_facility_ids = set()  # Track unique facility IDs that match CEMS

# facility ID -> {'orispl', 'stkhgt', 'stkdiam', 'stktemp', 'stkvel', 'count'}.
# While the loop runs the stack entries hold running sums; they are replaced
# by averages once every row has been seen.
stack_params_by_facility = {}
# facility ID -> {parameter: number of rows that supplied a usable value}.
# Kept separately from 'count' (matched rows) so that a row with a missing
# value does not drag that parameter's average towards zero.
stack_value_counts = {}

for idx, row in egu_df.iterrows():
    oris_code = row.get('oris_facility_code', None)
    facility_id = row.get('facility_id', None)

    # Skip if either identifier is missing
    if oris_code is None or pd.isna(oris_code) or facility_id is None or pd.isna(facility_id):
        continue

    # Skip rows whose ORIS code is not an integer or is not a CEMS plant
    oris_code_int = _parse_oris_code(oris_code)
    if oris_code_int is None or oris_code_int not in cems_plants:
        continue

    filtered_egu.append(row)
    facility_id_str = str(facility_id).strip()
    matched_facility_ids.add(facility_id_str)

    params = stack_params_by_facility.get(facility_id_str)
    if params is None:
        # First row for this facility: its ORIS code is the one recorded
        params = {
            'orispl': oris_code_int,
            'stkhgt': 0.0,
            'stkdiam': 0.0,
            'stktemp': 0.0,
            'stkvel': 0.0,
            'count': 0,
        }
        stack_params_by_facility[facility_id_str] = params
        stack_value_counts[facility_id_str] = dict.fromkeys(STACK_PARAM_COLUMNS, 0)

    params['count'] += 1
    value_counts = stack_value_counts[facility_id_str]
    for name in STACK_PARAM_COLUMNS:
        value = _to_float_or_nan(row.get(name, np.nan))
        if not np.isnan(value):
            params[name] += value
            value_counts[name] += 1

print(f"Found {len(matched_facility_ids)} unique facility IDs matching CEMS plants")

# Calculate average stack parameters by facility. A parameter with no usable
# value on any row becomes NaN (not 0), so downstream code can recognise it
# as missing and substitute a default.
for facility_id, params in stack_params_by_facility.items():
    value_counts = stack_value_counts[facility_id]
    for name in STACK_PARAM_COLUMNS:
        n_values = value_counts[name]
        params[name] = params[name] / n_values if n_values else np.nan

# ================================
# Step 3: Load and filter NEI data based on matched facility IDs
# ================================

# Path to your NEI facility summary CSV file
nei_file_path = "../data/raw/2021_NEI_Facility_summary.csv"
print(f"Reading NEI data from {nei_file_path}...")

# Read the NEI CSV file. Comma is pandas' default delimiter, so one attempt
# covers it; only a genuine parse failure triggers delimiter sniffing.
try:
    nei_df = pd.read_csv(nei_file_path, low_memory=False)
    print("NEI file read with comma delimiter")
except pd.errors.ParserError:
    # sep=None needs the python engine, which does not accept low_memory
    nei_df = pd.read_csv(nei_file_path, sep=None, engine='python')
    print("NEI file read with automatic delimiter detection")

print(f"NEI data loaded. Shape: {nei_df.shape}")

# Filter NEI data to keep only rows with eis facility id matching
# matched_facility_ids. IDs are compared as stripped strings, the same form
# in which matched_facility_ids was built; missing IDs never match.
if 'eis facility id' in nei_df.columns:
    eis_ids = nei_df['eis facility id']
    eis_id_text = eis_ids.map(lambda value: str(value).strip(), na_action='ignore')
    nei_matches = eis_ids.notna() & eis_id_text.isin(matched_facility_ids)
else:
    print("Warning: Missing required column 'eis facility id' in NEI data")
    nei_matches = pd.Series(False, index=nei_df.index)

# Boolean indexing keeps the original row index, column set and dtypes even
# when nothing matches; .copy() lets later steps add columns safely.
filtered_nei_df = nei_df.loc[nei_matches].copy()

print(f"Filtered NEI data to {len(filtered_nei_df)} rows matching facility IDs")

# ================================
# Step 4: Process filtered NEI data to create GeoDataFrame for InMAP
# ================================

# Make sure the folder for processed outputs is present (parents included);
# an already-existing folder is not an error
output_dir = Path("..") / "data" / "processed"
output_dir.mkdir(exist_ok=True, parents=True)

# Convert emissions to metric tonnes
def convert_to_tonnes(row):
    """Return the row's 'total emissions' expressed in metric tonnes.

    The unit is read from 'emissions uom': 'LB' (pounds) and 'TON' (short
    tons) are scaled; any other unit is returned as a float unchanged, i.e.
    treated as already being metric tonnes.
    """
    tonnes_per_unit = {
        'LB': 0.000453592,   # pounds -> metric tonnes
        'TON': 0.90718474,   # short tons -> metric tonnes
    }
    factor = tonnes_per_unit.get(row['emissions uom'])
    amount = float(row['total emissions'])
    return amount if factor is None else amount * factor

# Categorize pollutants
def categorize_pollutant(row):
    """Map a row's pollutant to 'VOC', 'NOx', 'NH3', 'SOx', 'PM2_5', 'CO2' or 'Other'.

    Matching is case-insensitive and uses 'pollutant code' plus, when the row
    has one, 'pollutant desc'. The rules are tried in the order listed below
    and the first one that matches decides the category.
    """
    code = str(row['pollutant code']).upper()
    desc = str(row['pollutant desc']).upper() if 'pollutant desc' in row else ""

    # (category, does-it-match) pairs in priority order
    rules = (
        ('VOC', code == 'VOC' or 'VOLATILE ORGANIC' in desc),
        ('NOx', code in ['NOX', 'NO', 'NO2'] or ('NITROGEN' in desc and 'OXIDE' in desc)),
        ('NH3', code == 'NH3' or 'AMMONIA' in desc),
        # NOTE(review): any description containing "SULFUR" lands in SOx -
        # confirm that is intended for every such pollutant.
        ('SOx', code in ['SO2', 'SO4'] or 'SULFUR' in desc),
        ('PM2_5', 'PM25' in code or 'PM2.5' in desc or 'PM2_5' in code),
        ('CO2', code == 'CO2'),
    )
    for category, matched in rules:
        if matched:
            return category
    return 'Other'

# Derive tonnes, pollutant category and point geometry for every NEI row
print("Processing emissions data...")

# Report (without stopping) any column the following steps rely on
required_columns = ['emissions uom', 'total emissions', 'pollutant code', 'pollutant desc', 
                    'site latitude', 'site longitude']
missing_columns = [name for name in required_columns if name not in filtered_nei_df.columns]
for col in missing_columns:
    print(f"Warning: Missing required column '{col}' in NEI data")


def _site_point(row):
    """Return a Point(longitude, latitude) for the row, or None if either coordinate is missing."""
    lon = row['site longitude']
    lat = row['site latitude']
    if pd.isna(lon) or pd.isna(lat):
        return None
    return Point(lon, lat)


# Row-wise derived columns
filtered_nei_df['emissions_tonnes'] = filtered_nei_df.apply(convert_to_tonnes, axis=1)
filtered_nei_df['pollutant_category'] = filtered_nei_df.apply(categorize_pollutant, axis=1)
filtered_nei_df['geometry'] = filtered_nei_df.apply(_site_point, axis=1)

# Keep only the rows for which a geometry could be built
has_geometry = filtered_nei_df['geometry'].notna()
filtered_nei_df = filtered_nei_df[has_geometry]

# ================================
# Step 5: Aggregate Data by Facility and Create GeoDataFrame
# ================================

# Columns that together identify one facility in the aggregated tables
facility_keys = ['eis facility id', 'site name', 'state', 'site latitude', 'site longitude', 
                 'primary naics code']

# Total tonnes per facility and pollutant category (long format)
emissions_by_group = filtered_nei_df.groupby(facility_keys + ['pollutant_category'])
facility_emissions = emissions_by_group['emissions_tonnes'].sum().reset_index()

# One row per facility, one column per pollutant category; absent
# facility/category combinations are filled with 0
facility_wide = facility_emissions.pivot_table(
    values='emissions_tonnes',
    index=facility_keys,
    columns='pollutant_category', 
    fill_value=0,
).reset_index()

# Guarantee a column for every pollutant expected downstream, even when the
# data contained no rows in that category
expected_categories = ['VOC', 'NOx', 'NH3', 'SOx', 'PM2_5', 'CO2']
for cat in expected_categories:
    if cat in facility_wide.columns:
        continue
    facility_wide[cat] = 0


def _facility_point(row):
    """Return the facility location as Point(longitude, latitude)."""
    return Point(row['site longitude'], row['site latitude'])


# Create geometry column
facility_wide['geometry'] = facility_wide.apply(_facility_point, axis=1)

# Placeholder columns for the ORIS code and stack parameters; they are filled
# per facility from the stored dictionary further down
for placeholder_column in ('orispl', 'height', 'diam', 'temp', 'velocity'):
    facility_wide[placeholder_column] = None

# Fallback stack parameters for facilities without real values
default_stack_height = 50.0     # meters
default_stack_diam = 5.0        # meters
default_stack_temp = 400.0      # Kelvin
default_stack_velocity = 20.0   # m/s

# Unit conversion functions
def ft_to_m(ft_value):
    """Return a length in feet as meters; missing input yields NaN."""
    return np.nan if pd.isna(ft_value) else float(ft_value) * 0.3048

def f_to_k(f_value):
    """Return a temperature in degrees Fahrenheit as Kelvin; missing input yields NaN."""
    if not pd.isna(f_value):
        fahrenheit = float(f_value)
        return (fahrenheit - 32) * 5/9 + 273.15
    return np.nan

# Apply stack parameters from our stored dictionary
def _metric_or_default(raw_value, convert, default):
    """Convert *raw_value* with *convert*; return *default* if the result is missing."""
    converted = convert(raw_value)
    return default if pd.isna(converted) else converted


# Collect one value per facility, then assign whole columns at once. This
# avoids per-cell writes and gives the numeric columns a real float dtype
# rather than leaving floats inside object-dtype columns.
orispl_values = []
height_values = []
diam_values = []
temp_values = []
velocity_values = []

for eis_id in facility_wide['eis facility id']:
    # Dictionary keys are stripped string facility IDs
    params = stack_params_by_facility.get(str(eis_id).strip())
    if params is None:
        # If we don't have stack parameters for this facility, use defaults
        orispl_values.append(None)
        height_values.append(default_stack_height)
        diam_values.append(default_stack_diam)
        temp_values.append(default_stack_temp)
        velocity_values.append(default_stack_velocity)
        continue

    orispl_values.append(params['orispl'])
    # Convert units and apply parameters with fallbacks to defaults
    height_values.append(_metric_or_default(params['stkhgt'], ft_to_m, default_stack_height))
    diam_values.append(_metric_or_default(params['stkdiam'], ft_to_m, default_stack_diam))
    temp_values.append(_metric_or_default(params['stktemp'], f_to_k, default_stack_temp))
    velocity_values.append(_metric_or_default(params['stkvel'], ft_to_m, default_stack_velocity))

# orispl stays object dtype because it may hold None for unmatched facilities
facility_wide['orispl'] = pd.Series(orispl_values, index=facility_wide.index, dtype=object)
facility_wide['height'] = pd.Series(height_values, index=facility_wide.index, dtype=float)
facility_wide['diam'] = pd.Series(diam_values, index=facility_wide.index, dtype=float)
facility_wide['temp'] = pd.Series(temp_values, index=facility_wide.index, dtype=float)
facility_wide['velocity'] = pd.Series(velocity_values, index=facility_wide.index, dtype=float)

# Wrap the facility table as a GeoDataFrame using its 'geometry' column
gdf = gpd.GeoDataFrame(facility_wide, geometry='geometry', crs='epsg:4269')

print(f"Created GeoDataFrame with {len(gdf)} facilities")

# NAICS code prefixes used to identify power plants (EGUs)
egu_naics = ['2211', '221111', '221112', '221113', '221114', '221115', 
             '221116', '221117', '221118', '221121', '221122']
egu_naics_prefixes = tuple(egu_naics)


def _is_egu_naics(code):
    """Return True when the NAICS code text starts with one of the EGU prefixes."""
    if pd.isna(code):
        return False
    return code.startswith(egu_naics_prefixes)


# Boolean mask over facilities, evaluated on the code rendered as text
naics_text = gdf['primary naics code'].astype(str)
is_power_plant = naics_text.apply(_is_egu_naics)

# Keep an independent copy of the power-plant rows
egu_gdf = gdf[is_power_plant].copy()
print(f"Filtered to {len(egu_gdf)} power plant facilities")

# ================================
# Step 7: Save Processed Data
# ================================

def _write_gpkg(frame, path, description):
    """Write *frame* to *path* as a GeoPackage and report what was saved."""
    frame.to_file(path, driver="GPKG")
    print(f"Saved {description} to {path}")


# Every facility
all_facilities_output = f"{output_dir}/processed_all_facilities_emissions.gpkg"
_write_gpkg(gdf, all_facilities_output, "all facilities data")

# Power plants only
egu_output_file = f"{output_dir}/processed_egu_emissions.gpkg"
_write_gpkg(egu_gdf, egu_output_file, "processed power plant data")

# Power plants reduced to the InMAP columns, with orispl kept for reference
inmap_columns = ["orispl", "site name", "VOC", "NOx", "NH3", "SOx", "PM2_5", "height", "diam", "temp", "velocity", "geometry"]
inmap_egu = egu_gdf[inmap_columns].copy()

inmap_output_file = f"{output_dir}/processed_emissions_for_inmap.gpkg"
_write_gpkg(inmap_egu, inmap_output_file, "InMAP-formatted emissions data")

Starting emissions data processing...
Loading CEMS facilities data...
Loaded 1343 unique CEMS facility IDs
Reading EGU CEMS data from ../data/raw/point/2022hc_cb6_22m/inputs/ptegu/egu_cems_2022_POINT_20240615_2022cems_stackfix2_23jul2024_v0.csv...
EGU data loaded. Shape: (126465, 77)
Found 1106 unique facility IDs matching CEMS plants
Reading NEI data from ../data/raw/2021_NEI_Facility_summary.csv...
NEI file read with comma delimiter
NEI data loaded. Shape: (2005169, 33)
Filtered NEI data to 46417 rows matching facility IDs
Processing emissions data...
Created GeoDataFrame with 1097 facilities
Filtered to 1069 power plant facilities
Saved all facilities data to ../data/processed/processed_all_facilities_emissions.gpkg
Saved processed power plant data to ../data/processed/processed_egu_emissions.gpkg
Saved InMAP-formatted emissions data to ../data/processed/processed_emissions_for_inmap.gpkg
