In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from skmultiflow.lazy import KNN
from skmultiflow.trees import HoeffdingTreeRegressor
from skmultiflow.ensemble import AdaptiveRandomForestRegressor
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

In [3]:
class BeijingAirQualityForecaster:
    """
    Stream forecasting system for Beijing Air Quality data.
    Predicts air pollutant concentrations (PM2.5, PM10, etc.) based on 
    previous measurements and meteorological conditions.
    """
    
    def __init__(self, target_pollutant='PM2.5', model_type='knn', 
                 window_size=168, min_samples_to_predict=24):  # 168 hours = 1 week
        """
        Initialize the forecaster.
        
        Args:
            target_pollutant: Which pollutant to predict ('PM2.5', 'PM10', 'SO2', 'NO2', 'CO', 'O3')
            model_type: 'knn', 'hoeffding_tree', or 'adaptive_rf'
            window_size: Number of hours to keep in memory (default: 1 week)
            min_samples_to_predict: Minimum samples needed before making predictions
        """
        self.target_pollutant = target_pollutant
        self.window_size = window_size
        self.min_samples_to_predict = min_samples_to_predict
        
        # Initialize model
        self.model = self._initialize_model(model_type)
        
        # Data preprocessing
        self.scaler = StandardScaler()
        self.wind_encoder = LabelEncoder()
        self.station_encoder = LabelEncoder()
        
        # Data storage
        self.data_buffer = []
        self.feature_names = None
        self.is_fitted = False
        
        # Prediction tracking
        self.predictions = []
        self.actual_values = []
        self.prediction_errors = []
        self.timestamps = []
        
        # Performance metrics
        self.mae_history = []
        self.rmse_history = []
        self.r2_history = []
        
        # Station-specific tracking
        self.station_performance = {}
        
    def _initialize_model(self, model_type):
        """Initialize the streaming model"""
        if model_type == 'knn':
            return KNN(n_neighbors=8, max_window_size=self.window_size, leaf_size=50)
        elif model_type == 'hoeffding_tree':
            return HoeffdingTreeRegressor(split_criterion='variance_reduction')
        elif model_type == 'adaptive_rf':
            return AdaptiveRandomForestRegressor(n_estimators=10, random_state=42)
        else:
            raise ValueError("model_type must be 'knn', 'hoeffding_tree', or 'adaptive_rf'")
    
    def _prepare_features(self, data_point):
        """
        Prepare features from a single data point.
        Expected columns: No, year, month, day, hour, PM2.5, PM10, SO2, NO2, CO, O3, 
                         TEMP, PRES, DEWP, RAIN, wd, WSPM, station
        """
        features = []
        
        # Time features
        features.extend([
            data_point.get('year', 2013),
            data_point.get('month', 1),
            data_point.get('day', 1),
            data_point.get('hour', 0),
            data_point.get('hour', 0) % 6,  # 4-hour period of day
            1 if data_point.get('hour', 0) >= 6 and data_point.get('hour', 0) <= 18 else 0  # day/night
        ])
        
        # Air quality features (excluding target)
        pollutants = ['PM2.5', 'PM10', 'SO2', 'NO2', 'CO', 'O3']
        for pollutant in pollutants:
            if pollutant != self.target_pollutant:
                features.append(data_point.get(pollutant, 0))
        
        # Meteorological features
        features.extend([
            data_point.get('TEMP', 0),
            data_point.get('PRES', 1013),
            data_point.get('DEWP', 0),
            data_point.get('RAIN', 0),
            data_point.get('WSPM', 0)
        ])
        
        # Wind direction (encoded)
        wind_dir = data_point.get('wd', 'N')
        if hasattr(self.wind_encoder, 'classes_'):
            try:
                wind_encoded = self.wind_encoder.transform([wind_dir])[0]
            except ValueError:
                wind_encoded = 0  # Unknown direction
        else:
            wind_encoded = 0
        features.append(wind_encoded)
        
        # Station (encoded)
        station = data_point.get('station', 'Unknown')
        if hasattr(self.station_encoder, 'classes_'):
            try:
                station_encoded = self.station_encoder.transform([station])[0]
            except ValueError:
                station_encoded = 0  # Unknown station
        else:
            station_encoded = 0
        features.append(station_encoded)
        
        return np.array(features)
    
    def _get_lagged_features(self, n_lags=5):
        """Get lagged features from recent data"""
        if len(self.data_buffer) < n_lags:
            return np.array([])
        
        lagged_values = []
        for i in range(1, n_lags + 1):
            if len(self.data_buffer) >= i:
                lagged_values.append(self.data_buffer[-i].get(self.target_pollutant, 0))
            else:
                lagged_values.append(0)
        
        return np.array(lagged_values)
    
    def process_new_point(self, data_point):
        """
        Process a new air quality data point.
        
        Args:
            data_point: Dictionary with air quality measurements
                       Expected keys: year, month, day, hour, PM2.5, PM10, SO2, NO2, 
                                    CO, O3, TEMP, PRES, DEWP, RAIN, wd, WSPM, station
        """
        current_time = datetime(
            data_point.get('year', 2013),
            data_point.get('month', 1),
            data_point.get('day', 1),
            data_point.get('hour', 0)
        )
        
        target_value = data_point.get(self.target_pollutant, 0)
        station = data_point.get('station', 'Unknown')
        
        # Handle missing target values
        if pd.isna(target_value) or target_value < 0:
            print(f"Skipping point with invalid {self.target_pollutant} value: {target_value}")
            return
        
        # Step 1: Make prediction if we have enough data
        if len(self.data_buffer) >= self.min_samples_to_predict:
            try:
                # Prepare features
                current_features = self._prepare_features(data_point)
                lagged_features = self._get_lagged_features(n_lags=5)
                
                if len(lagged_features) > 0:
                    full_features = np.concatenate([current_features, lagged_features])
                else:
                    full_features = current_features
                
                # Prepare training data from buffer
                if not self.is_fitted:
                    self._fit_initial_model()
                
                # Make prediction
                if hasattr(self.model, 'predict') and self.is_fitted:
                    prediction = self.model.predict(full_features.reshape(1, -1))[0]
                    
                    # Calculate error
                    error = abs(prediction - target_value)
                    relative_error = error / max(target_value, 1.0) * 100
                    
                    # Store prediction results
                    self.predictions.append(prediction)
                    self.actual_values.append(target_value)
                    self.prediction_errors.append(error)
                    self.timestamps.append(current_time)
                    
                    # Update performance metrics
                    if len(self.predictions) >= 2:
                        mae = mean_absolute_error(self.actual_values, self.predictions)
                        rmse = np.sqrt(mean_squared_error(self.actual_values, self.predictions))
                        r2 = r2_score(self.actual_values, self.predictions) if len(set(self.actual_values)) > 1 else 0
                        
                        self.mae_history.append(mae)
                        self.rmse_history.append(rmse)
                        self.r2_history.append(r2)
                    
                    # Update station-specific performance
                    if station not in self.station_performance:
                        self.station_performance[station] = {'errors': [], 'predictions': [], 'actual': []}
                    
                    self.station_performance[station]['errors'].append(error)
                    self.station_performance[station]['predictions'].append(prediction)
                    self.station_performance[station]['actual'].append(target_value)
                    
                    print(f"{current_time.strftime('%Y-%m-%d %H:%00')} | Station: {station} | "
                          f"Predicted: {prediction:.1f} | Actual: {target_value:.1f} | "
                          f"Error: {error:.1f} ({relative_error:.1f}%)")
                
            except Exception as e:
                print(f"Prediction error: {e}")
        
        # Step 2: Update buffer with new point
        self.data_buffer.append(data_point)
        
        # Maintain sliding window
        if len(self.data_buffer) > self.window_size:
            self.data_buffer.pop(0)
        
        # Step 3: Update model incrementally
        if self.is_fitted and len(self.data_buffer) > self.min_samples_to_predict:
            try:
                current_features = self._prepare_features(data_point)
                lagged_features = self._get_lagged_features(n_lags=5)
                
                if len(lagged_features) > 0:
                    full_features = np.concatenate([current_features, lagged_features])
                    
                    # Scale features
                    if hasattr(self.scaler, 'mean_'):
                        scaled_features = self.scaler.transform(full_features.reshape(1, -1))
                    else:
                        scaled_features = full_features.reshape(1, -1)
                    
                    # Incremental learning
                    self.model.partial_fit(scaled_features, [target_value])
            except Exception as e:
                print(f"Model update error: {e}")
    
    def _fit_initial_model(self):
        """Fit the initial model using buffered data"""
        if len(self.data_buffer) < self.min_samples_to_predict:
            return
        
        try:
            # Prepare training data
            X_train = []
            y_train = []
            
            for i, point in enumerate(self.data_buffer[5:]):  # Skip first 5 for lag features
                features = self._prepare_features(point)
                
                # Get lagged features from previous points
                lagged_values = []
                for lag in range(1, 6):  # 5 lags
                    if i + 5 - lag >= 0:
                        lagged_values.append(self.data_buffer[i + 5 - lag].get(self.target_pollutant, 0))
                    else:
                        lagged_values.append(0)
                
                full_features = np.concatenate([features, np.array(lagged_values)])
                target_value = point.get(self.target_pollutant, 0)
                
                if not pd.isna(target_value) and target_value >= 0:
                    X_train.append(full_features)
                    y_train.append(target_value)
            
            if len(X_train) > 0:
                X_train = np.array(X_train)
                y_train = np.array(y_train)
                
                # Fit encoders
                wind_dirs = [point.get('wd', 'N') for point in self.data_buffer]
                stations = [point.get('station', 'Unknown') for point in self.data_buffer]
                
                self.wind_encoder.fit(wind_dirs)
                self.station_encoder.fit(stations)
                
                # Scale features
                self.scaler.fit(X_train)
                X_train_scaled = self.scaler.transform(X_train)
                
                # Train model
                self.model.partial_fit(X_train_scaled, y_train)
                self.is_fitted = True
                
                print(f"Initial model fitted with {len(X_train)} samples")
        
        except Exception as e:
            print(f"Initial model fitting error: {e}")
    
    def get_performance_summary(self):
        """Get overall performance summary"""
        if len(self.predictions) < 2:
            return "Not enough predictions for performance summary"
        
        mae = mean_absolute_error(self.actual_values, self.predictions)
        rmse = np.sqrt(mean_squared_error(self.actual_values, self.predictions))
        r2 = r2_score(self.actual_values, self.predictions) if len(set(self.actual_values)) > 1 else 0
        
        summary = f"""
                    === Performance Summary for {self.target_pollutant} Forecasting ===
                    Total Predictions: {len(self.predictions)}
                    Mean Absolute Error (MAE): {mae:.2f} µg/m³
                    Root Mean Square Error (RMSE): {rmse:.2f} µg/m³
                    R² Score: {r2:.3f}
                    Average Error: {np.mean(self.prediction_errors):.2f} µg/m³
                    Median Error: {np.median(self.prediction_errors):.2f} µg/m³

                    Station-specific Performance:"""
        
        for station, perf in self.station_performance.items():
            if len(perf['errors']) > 0:
                station_mae = np.mean(perf['errors'])
                summary += f"\n  {station}: MAE = {station_mae:.2f} µg/m³ ({len(perf['errors'])} predictions)"
        
        return summary
    
    def plot_results(self, last_n_hours=168):  # Last week by default
        """Plot forecasting results"""
        if len(self.predictions) < 2:
            print("Not enough predictions to plot")
            return
        
        # Limit to last N hours
        start_idx = max(0, len(self.predictions) - last_n_hours)
        recent_times = self.timestamps[start_idx:]
        recent_predictions = self.predictions[start_idx:]
        recent_actual = self.actual_values[start_idx:]
        recent_errors = self.prediction_errors[start_idx:]
        
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
        
        # Plot 1: Time series comparison
        ax1.plot(recent_times, recent_actual, 'b-', label='Actual', alpha=0.8, linewidth=1.5)
        ax1.plot(recent_times, recent_predictions, 'r--', label='Predicted', alpha=0.8, linewidth=1.5)
        ax1.set_title(f'{self.target_pollutant} Forecasting: Actual vs Predicted')
        ax1.set_xlabel('Time')
        ax1.set_ylabel(f'{self.target_pollutant} Concentration (µg/m³)')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        ax1.tick_params(axis='x', rotation=45)
        
        # Plot 2: Prediction errors over time
        ax2.plot(recent_times, recent_errors, 'g-', alpha=0.7)
        ax2.axhline(y=np.mean(recent_errors), color='red', linestyle='--', 
                   label=f'Mean Error: {np.mean(recent_errors):.1f}')
        ax2.set_title('Prediction Errors Over Time')
        ax2.set_xlabel('Time')
        ax2.set_ylabel('Absolute Error (µg/m³)')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        ax2.tick_params(axis='x', rotation=45)
        
        # Plot 3: Scatter plot
        ax3.scatter(recent_actual, recent_predictions, alpha=0.6, s=20)
        min_val = min(min(recent_actual), min(recent_predictions))
        max_val = max(max(recent_actual), max(recent_predictions))
        ax3.plot([min_val, max_val], [min_val, max_val], 'r--', alpha=0.8)
        ax3.set_xlabel(f'Actual {self.target_pollutant} (µg/m³)')
        ax3.set_ylabel(f'Predicted {self.target_pollutant} (µg/m³)')
        ax3.set_title('Predicted vs Actual Values')
        ax3.grid(True, alpha=0.3)
        
        # Plot 4: Performance metrics over time
        if len(self.mae_history) > 1:
            recent_mae = self.mae_history[start_idx:] if len(self.mae_history) > start_idx else self.mae_history
            recent_r2 = self.r2_history[start_idx:] if len(self.r2_history) > start_idx else self.r2_history
            
            ax4_twin = ax4.twinx()
            line1 = ax4.plot(range(len(recent_mae)), recent_mae, 'purple', label='MAE')
            line2 = ax4_twin.plot(range(len(recent_r2)), recent_r2, 'orange', label='R²')
            
            ax4.set_xlabel('Time Period')
            ax4.set_ylabel('MAE (µg/m³)', color='purple')
            ax4_twin.set_ylabel('R² Score', color='orange')
            ax4.set_title('Performance Metrics Over Time')
            
            lines = line1 + line2
            labels = [l.get_label() for l in lines]
            ax4.legend(lines, labels, loc='upper left')
            ax4.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()



In [None]:
# Example usage and data loading functions
def load_beijing_data(file_path):
    """Load Beijing Air Quality dataset"""
    try:
        df = pd.read_csv(file_path)
        
        # Clean column names
        df.columns = df.columns.str.strip()
        
        # Handle missing values
        numeric_columns = ['PM2.5', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'TEMP', 'PRES', 'DEWP', 'RAIN', 'WSPM']
        for col in numeric_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        
        # Sort by time
        df = df.sort_values(['station', 'year', 'month', 'day', 'hour']).reset_index(drop=True)
        
        print(f"Loaded {len(df)} records from {df['station'].nunique()} stations")
        print(f"Time range: {df['year'].min()}-{df['month'].min():02d} to {df['year'].max()}-{df['month'].max():02d}")
        print(f"Available pollutants: {[col for col in numeric_columns if col in df.columns]}")
        
        return df
        
    except Exception as e:
        print(f"Error loading data: {e}")
        return None

def simulate_streaming_data(df, forecaster, start_idx=0, n_points=1000):
    """Simulate streaming data arrival"""
    print(f"Starting streaming simulation with {n_points} points...")
    print("=" * 70)
    
    end_idx = min(start_idx + n_points, len(df))
    
    for i in range(start_idx, end_idx):
        data_point = df.iloc[i].to_dict()
        forecaster.process_new_point(data_point)
        
        # Print summary every 24 hours
        if (i - start_idx + 1) % 24 == 0:
            hours_processed = i - start_idx + 1
            if len(forecaster.predictions) > 0:
                recent_mae = np.mean(forecaster.prediction_errors[-24:]) if len(forecaster.prediction_errors) >= 24 else np.mean(forecaster.prediction_errors)
                print(f"\n--- After {hours_processed} hours ---")
                print(f"Recent 24h Average Error: {recent_mae:.2f} µg/m³")
    
    return forecaster


In [None]:
# Example usage
if __name__ == "__main__":
    # Initialize forecaster for PM2.5 prediction
    forecaster = BeijingAirQualityForecaster(
        target_pollutant='PM2.5',
        model_type='knn',  # or 'hoeffding_tree', 'adaptive_rf'
        window_size=168,   # 1 week of hourly data
        min_samples_to_predict=24  # 1 day minimum
    )
    
    # To use with real data, uncomment these lines:
    # df = load_beijing_data('path/to/your/beijing_air_quality.csv')
    # if df is not None:
    #     forecaster = simulate_streaming_data(df, forecaster, start_idx=1000, n_points=500)
    #     print(forecaster.get_performance_summary())
    #     forecaster.plot_results(last_n_hours=168)
    
    print("Beijing Air Quality Forecaster initialized!")
    print("To use with your data:")
    print("1. df = load_beijing_data('your_file.csv')")
    print("2. forecaster = simulate_streaming_data(df, forecaster, start_idx=1000, n_points=500)")
    print("3. forecaster.plot_results()")