In [1]:
import pandas as pd
import requests
import json
import time
from datetime import datetime
import numpy as np
import os

In [None]:
class SmokeValidation:
    def __init__(self, email, api_key):
        self.API_BASE_URL = 'https://aqs.epa.gov/data/api'
        self.email = email
        self.api_key = api_key
        
        # Maricopa County, AZ (where Glendale is located)
        self.GLENDALE_FIPS = '04013'
        
        # Load smoke estimates
        self.smoke_data = pd.read_csv('Part_1/Step_0_and_Step_1/wildfire_analysis_results/wildfire_impact_glendale_1961_2021.csv')
        
        # PM2.5 parameter code
        self.PM25_PARAM = '88101'
        
        # EPA AQI breakpoints for PM2.5 (μg/m³)
        self.PM25_BREAKPOINTS = {
            'Good': 12.0,
            'Moderate': 35.4,
            'Unhealthy for Sensitive Groups': 55.4,
            'Unhealthy': 150.4,
            'Very Unhealthy': 250.4,
            'Hazardous': 500.4
        }
        
        # Smoke impact weights for different PM2.5 levels
        self.SMOKE_WEIGHTS = {
            'Background': (0, 12.0, 0.0),  # Little to no impact
            'Light': (12.0, 35.4, 0.2),    # Light smoke
            'Moderate': (35.4, 55.4, 0.5),  # Moderate smoke
            'Heavy': (55.4, 150.4, 1.0),    # Heavy smoke
            'Severe': (150.4, 250.4, 2.0),  # Severe smoke
            'Hazardous': (250.4, float('inf'), 4.0)  # Hazardous conditions
        }

    def get_pm25_data(self, year):
        """
        Get PM2.5 data for a specific year from the EPA API.
        
        Args:
            year (int): Year to retrieve data for
            
        Returns:
            dict: JSON response from EPA API containing PM2.5 measurements
        """
        try:
            # Set up API request parameters
            params = {
                'email': self.email,
                'key': self.api_key,
                'param': self.PM25_PARAM,
                'bdate': f'{year}0501',  # May 1st
                'edate': f'{year}1031',  # October 31st
                'state': self.GLENDALE_FIPS[:2],  # State code (04 for Arizona)
                'county': self.GLENDALE_FIPS[2:]  # County code (013 for Maricopa)
            }
            
            # Construct API endpoint URL
            endpoint = '/dailyData/byCounty'
            url = f"{self.API_BASE_URL}{endpoint}"
            
            # Add delay to respect API rate limits
            time.sleep(0.1)  # 600ms delay between requests
            
            # Make API request
            response = requests.get(url, params=params)
            response.raise_for_status()  # Raise exception for bad status codes
            
            return response.json()
            
        except requests.exceptions.RequestException as e:
            print(f"API request failed for year {year}: {str(e)}")
            return None
        except Exception as e:
            print(f"Error retrieving PM2.5 data for {year}: {str(e)}")
            return None

    def calculate_daily_smoke_score(self, pm25_value):
        """
        Calculate a daily smoke score based on PM2.5 concentration.
        Uses a weighted approach based on EPA breakpoints.
        
        Args:
            pm25_value (float): Daily PM2.5 concentration
            
        Returns:
            float: Smoke score for the day
        """
        if pd.isna(pm25_value):
            return 0.0
            
        for level, (low, high, weight) in self.SMOKE_WEIGHTS.items():
            if low <= pm25_value < high:
                # Calculate normalized score within the range
                range_position = (pm25_value - low) / (high - low)
                base_score = weight * range_position
                
                # Add cumulative impact of lower levels
                prev_impact = 0
                for prev_level, (_, prev_high, prev_weight) in self.SMOKE_WEIGHTS.items():
                    if prev_high <= low:
                        prev_impact += prev_weight
                
                return base_score + prev_impact
                
        return 0.0

    def calculate_yearly_pm25_metrics(self, aqi_data):
        """
        Calculate comprehensive PM2.5 metrics focused on smoke detection.
        Enhanced to include smoke impact scoring.
        
        Parameters:
            aqi_data (dict): JSON response from EPA API containing PM2.5 measurements
            
        Returns:
            dict: Dictionary containing calculated metrics or None if data is invalid
        """
        if not aqi_data or aqi_data.get("Header", [{}])[0].get("status") != "Success":
            return None
        
        metrics = {
            'days_with_data': 0,
            'mean_pm25': 0.0,
            'median_pm25': 0.0,
            'max_pm25': 0.0,
            'smoke_days': 0,  # days above "Moderate" threshold
            'severe_smoke_days': 0,  # days above "Unhealthy" threshold
            'pm25_90th_percentile': 0.0,
            'high_pm25_episodes': 0,  # Episodes of 2+ consecutive days above "Moderate"
            'pm25_smoke_score': 0.0,  # New metric for PM2.5-based smoke impact
            'weighted_smoke_days': 0.0  # New metric for weighted smoke days
        }
        
        # Dictionary to store daily aggregated values
        daily_measurements = {}
        
        for record in aqi_data.get("Data", []):
            if record.get('arithmetic_mean') is not None:
                value = float(record['arithmetic_mean'])
                date = record['date_local']
                
                if date in daily_measurements:
                    daily_measurements[date].append(value)
                else:
                    daily_measurements[date] = [value]
        
        if not daily_measurements:
            return metrics
        
        # Calculate daily averages and smoke scores
        daily_values = []
        daily_smoke_scores = []
        
        for date, values in sorted(daily_measurements.items()):
            daily_avg = sum(values) / len(values)
            daily_values.append(daily_avg)
            
            # Calculate smoke score for the day
            smoke_score = self.calculate_daily_smoke_score(daily_avg)
            daily_smoke_scores.append(smoke_score)
            
            # Count smoke days
            if daily_avg > self.PM25_BREAKPOINTS['Moderate']:
                metrics['smoke_days'] += 1
            
            # Count severe smoke days
            if daily_avg > self.PM25_BREAKPOINTS['Unhealthy']:
                metrics['severe_smoke_days'] += 1
        
        # Update metrics
        metrics['days_with_data'] = len(daily_measurements)
        metrics['mean_pm25'] = np.mean(daily_values)
        metrics['median_pm25'] = np.median(daily_values)
        metrics['max_pm25'] = max(daily_values)
        metrics['pm25_90th_percentile'] = np.percentile(daily_values, 90)
        
        # Calculate high PM2.5 episodes
        consecutive_days = 0
        episode_started = False
        
        for value in daily_values:
            if value > self.PM25_BREAKPOINTS['Moderate']:
                consecutive_days += 1
                if consecutive_days >= 2 and not episode_started:
                    metrics['high_pm25_episodes'] += 1
                    episode_started = True
            else:
                consecutive_days = 0
                episode_started = False
        
        # Calculate composite smoke score
        episode_multiplier = 1 + (metrics['high_pm25_episodes'] * 0.2)
        metrics['pm25_smoke_score'] = sum(daily_smoke_scores) * episode_multiplier
        
        # Calculate weighted smoke days
        metrics['weighted_smoke_days'] = len([s for s in daily_smoke_scores if s > 0])
        
        return metrics

    def validate_smoke_estimates(self, start_year=2000, end_year=2020):
        """Validate smoke estimates against PM2.5 data"""
        validation_results = []
        
        for year in range(start_year, end_year + 1):
            print(f"Processing year {year}...")
            
            # Get smoke impact score for this year
            smoke_score = self.smoke_data[self.smoke_data['year'] == year]['smoke_impact_score'].iloc[0]
            acres_burned = self.smoke_data[self.smoke_data['year'] == year]['total_acres'].iloc[0]
            
            # Get EPA data for this year
            epa_data = self.get_pm25_data(year)
            pm25_metrics = self.calculate_yearly_pm25_metrics(epa_data)
            
            if pm25_metrics:
                result = {
                    'year': year,
                    'smoke_impact_score': smoke_score,
                    'acres_burned': acres_burned,
                    **pm25_metrics
                }
                validation_results.append(result)
        
        return pd.DataFrame(validation_results)

In [3]:
def analyze_validation_results(results_df):
    """Analyze the validation results and print insights"""
    # Calculate correlations
    smoke_correlations = {
        'Mean PM2.5': results_df['mean_pm25'].corr(results_df['smoke_impact_score']),
        'Smoke Days': results_df['smoke_days'].corr(results_df['smoke_impact_score']),
        'Max PM2.5': results_df['max_pm25'].corr(results_df['smoke_impact_score']),
        '90th Percentile PM2.5': results_df['pm25_90th_percentile'].corr(results_df['smoke_impact_score']),
        'High PM2.5 Episodes': results_df['high_pm25_episodes'].corr(results_df['smoke_impact_score'])
    }
    
    return {
        'correlations': smoke_correlations,
        'data_completeness': results_df['days_with_data'].mean() / 365,
        'worst_smoke_years': results_df.nlargest(3, 'smoke_impact_score')[['year', 'smoke_impact_score', 'mean_pm25', 'smoke_days']]
    }

In [None]:
from dotenv import load_dotenv

# using dotenv to hide api keys
# Initialize validator (replace with your EPA API credentials)
EMAIL = os.getenv('EMAIL')
API_KEY = os.getenv('API_KEY')

validator = SmokeValidation(EMAIL, API_KEY)

# Validate smoke estimates for recent years
results_glendale = validator.validate_smoke_estimates(start_year=1998, end_year=2020)

# Analyze results
analysis_glendale = analyze_validation_results(results_glendale)

print("\nValidation Results:")
print(results_glendale.to_string())
print("\nAnalysis:")
print(json.dumps(analysis_glendale, indent=2, default=str))

Processing year 1998...
Processing year 1999...
Processing year 2000...
Processing year 2001...
Processing year 2002...
Processing year 2003...
Processing year 2004...
Processing year 2005...
Processing year 2006...
Processing year 2007...
Processing year 2008...
Processing year 2009...
Processing year 2010...
Processing year 2011...
Processing year 2012...
Processing year 2013...
Processing year 2014...
Processing year 2015...
Processing year 2016...
Processing year 2017...
Processing year 2018...
Processing year 2019...
Processing year 2020...

Validation Results:
    year  smoke_impact_score  acres_burned  days_with_data  mean_pm25  median_pm25   max_pm25  smoke_days  severe_smoke_days  pm25_90th_percentile  high_pm25_episodes  pm25_smoke_score  weighted_smoke_days
0   1999               40.41    2259635.49             184   9.002627     8.183333  20.000000           0                  0             13.517500                   0          0.685897                   30
1   2000       

  c /= stddev[:, None]
  c /= stddev[None, :]
