# Air Quality Time Series Prediction with Gray Model (GM(1,1))

This notebook implements **Gray Model (GM(1,1))** for time series forecasting of PM 2.5 air quality.

## Key Concepts:
- **Gray Model (GM(1,1))**: A grey system theory model ideal for small datasets
  - **GM(1,1)**: First-order, one-variable grey model
  - Uses Accumulated Generating Operation (AGO) to build the model
  - Excellent for small sample sizes (typically 4-10 data points minimum)
- **Sliding Windows**: Creating features from past time steps for forecasting
- **Time Series Forecasting**: Using historical patterns to predict future PM 2.5 levels


In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy import stats
from scipy.stats import jarque_bera, probplot
from statsmodels.stats.stattools import durbin_watson
from statsmodels.stats.diagnostic import acorr_ljungbox
import warnings
warnings.filterwarnings('ignore')

# Set style for better plots
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")


## 1. Load and Explore the Data

**Note**: Gray Model works best with small datasets. We'll use a subset of the data.


In [None]:
# Load the dataset
df = pd.read_csv('Real_Combine.csv')

print("Full Dataset Shape:", df.shape)
print("\nFirst few rows:")
df.head()


In [None]:
# Handle missing values
df = df.dropna()
df = df.reset_index(drop=True)

# For Gray Model, we'll use a smaller subset since it's designed for small datasets
# Let's take the first 200-300 points for better GM performance
subset_size = 250
df_small = df.head(subset_size).copy()

print(f"Using subset of data: {len(df_small)} samples (optimal for Gray Model)")
print(f"\nData shape: {df_small.shape}")
print(f"\nMissing values: {df_small.isnull().sum().sum()}")
print(f"\nData statistics:")
print(df_small['PM 2.5'].describe())


## 2. Time Series Visualization


In [None]:
# Visualize the PM 2.5 time series
plt.figure(figsize=(14, 6))
plt.plot(df_small.index, df_small['PM 2.5'], linewidth=1.5, color='steelblue', marker='o', markersize=3)
plt.title('PM 2.5 Time Series (Subset for Gray Model)', fontsize=16, fontweight='bold')
plt.xlabel('Time Index', fontsize=12)
plt.ylabel('PM 2.5', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()


## 3. Gray Model (GM(1,1)) Implementation

The Gray Model uses:
1. **Accumulated Generating Operation (AGO)**: Creates cumulative sum
2. **Grey Differential Equation**: Models the AGO sequence
3. **Inverse AGO (IAGO)**: Converts back to original scale


In [None]:
class GrayModel:
    """
    Gray Model (GM(1,1)) implementation for time series forecasting
    """
    
    def __init__(self):
        self.a = None  # Development coefficient
        self.b = None  # Grey input
        self.x0 = None  # Original sequence
        self.x1 = None  # AGO sequence
        
    def fit(self, x0):
        """
        Fit the Gray Model
        
        Parameters:
        -----------
        x0 : array-like
            Original time series data
        """
        x0 = np.array(x0).flatten()
        self.x0 = x0
        
        # Step 1: Accumulated Generating Operation (AGO)
        self.x1 = np.cumsum(x0)
        
        # Step 2: Build background values (mean of consecutive AGO values)
        n = len(x0)
        z1 = np.zeros(n - 1)
        for i in range(n - 1):
            z1[i] = 0.5 * (self.x1[i] + self.x1[i + 1])
        
        # Step 3: Build matrices for least squares
        B = np.zeros((n - 1, 2))
        Y = np.zeros(n - 1)
        
        for i in range(n - 1):
            B[i, 0] = -z1[i]
            B[i, 1] = 1
            Y[i] = x0[i + 1]
        
        # Step 4: Solve for parameters using least squares
        # [a, b]^T = (B^T * B)^(-1) * B^T * Y
        try:
            params = np.linalg.lstsq(B, Y, rcond=None)[0]
            self.a = params[0]
            self.b = params[1]
        except:
            # Fallback if matrix is singular
            self.a = -0.01
            self.b = x0.mean()
        
        return self
    
    def predict(self, steps=1, start=0):
        """
        Predict future values (in-sample)
        
        Parameters:
        -----------
        steps : int
            Number of steps ahead to predict
        start : int
            Starting index for prediction
        
        Returns:
        --------
        predictions : array
            Predicted values
        """
        if self.a is None or self.b is None:
            raise ValueError("Model must be fitted before prediction")
        
        predictions = []
        
        for i in range(steps):
            k = start + i + 1  # k is 1-indexed in GM(1,1) formula
            # GM(1,1) AGO prediction: x^(1)(k) = (x^(0)(1) - b/a) * exp(-a*(k-1)) + b/a
            x1_k = (self.x0[0] - self.b / self.a) * np.exp(-self.a * (k - 1)) + self.b / self.a
            
            if k == 1:
                # First value is the original first value
                pred = self.x0[0]
            else:
                # IAGO: x^(0)(k) = x^(1)(k) - x^(1)(k-1)
                x1_k_minus_1 = (self.x0[0] - self.b / self.a) * np.exp(-self.a * (k - 2)) + self.b / self.a
                pred = x1_k - x1_k_minus_1
            
            predictions.append(pred)
        
        return np.array(predictions)
    
    def forecast(self, steps=1):
        """
        Forecast future values (out-of-sample)
        
        Returns:
        --------
        forecasts : array
            Forecasted values
        """
        if self.a is None or self.b is None:
            raise ValueError("Model must be fitted before forecasting")
        
        n = len(self.x0)
        forecasts = []
        
        for i in range(steps):
            k = n + i + 1  # k is 1-indexed, starting from n+1
            # GM(1,1) AGO forecast: x^(1)(k) = (x^(0)(1) - b/a) * exp(-a*(k-1)) + b/a
            x1_k = (self.x0[0] - self.b / self.a) * np.exp(-self.a * (k - 1)) + self.b / self.a
            
            # IAGO: x^(0)(k) = x^(1)(k) - x^(1)(k-1)
            if k == n + 1:
                # First forecast uses last known AGO value
                x1_k_minus_1 = self.x1[-1]
            else:
                x1_k_minus_1 = (self.x0[0] - self.b / self.a) * np.exp(-self.a * (k - 2)) + self.b / self.a
            
            forecast = x1_k - x1_k_minus_1
            forecasts.append(forecast)
        
        return np.array(forecasts)

print("Gray Model class implemented successfully!")


## 4. Sliding Window Function

Create sliding windows for time series prediction using Gray Model.


In [None]:
def create_sliding_windows_gm(data, window_size, forecast_horizon=1):
    """
    Create sliding windows for Gray Model time series prediction.
    
    Parameters:
    -----------
    data : array-like
        Time series data
    window_size : int
        Number of past time steps to use as features (window size)
    forecast_horizon : int
        Number of steps ahead to predict (default=1)
    
    Returns:
    --------
    windows : list
        List of window arrays
    targets : list
        List of target arrays
    """
    windows = []
    targets = []
    
    data = np.array(data).flatten()
    n_samples = len(data)
    
    # Create sliding windows
    for i in range(window_size, n_samples - forecast_horizon + 1):
        # Features: past window_size time steps
        window = data[i-window_size:i]
        # Target: future forecast_horizon time steps
        target = data[i:i+forecast_horizon]
        
        windows.append(window)
        targets.append(target)
    
    return windows, targets

print("Sliding window function for Gray Model created successfully!")


## 5. Prepare Data for Sliding Windows


In [None]:
# Extract PM 2.5 time series
ts = df_small['PM 2.5'].values

print(f"Time series length: {len(ts)}")
print(f"Time series statistics:")
print(f"  Mean: {ts.mean():.4f}")
print(f"  Std: {ts.std():.4f}")
print(f"  Min: {ts.min():.4f}")
print(f"  Max: {ts.max():.4f}")


## 6. Create Sliding Windows

We'll experiment with different window sizes. For Gray Model, smaller windows (5-15) work well.


In [None]:
# Define window size (number of past time steps to use)
# For Gray Model, smaller windows (5-10) are typically used
window_size = 8  # Using 8 past time steps
forecast_horizon = 1  # Predicting 1 step ahead

# Create sliding windows
windows, targets = create_sliding_windows_gm(ts, window_size=window_size, forecast_horizon=forecast_horizon)

print(f"Number of windows created: {len(windows)}")
print(f"Window size: {window_size}")
print(f"Forecast horizon: {forecast_horizon}")
print(f"\nExample window (first window):")
print(f"  Values: {windows[0]}")
print(f"  Target: {targets[0]}")
print(f"\nThis means:")
print(f"  - We use data from time steps 0 to {window_size-1} to predict time step {window_size}")
print(f"  - We use data from time steps 1 to {window_size} to predict time step {window_size+1}")
print(f"  - And so on...")


## 7. Train-Test Split for Time Series

**Important**: For time series, we must use chronological split (not random) to preserve temporal order.


In [None]:
# Time series split: use first 80% for training, last 20% for testing
# IMPORTANT: Both train and test sets use sliding windows!
split_index = int(len(windows) * 0.8)

train_windows = windows[:split_index]
test_windows = windows[split_index:]
train_targets = targets[:split_index]
test_targets = targets[split_index:]

print(f"Training set: {len(train_windows)} windows (each with {window_size} past time steps)")
print(f"Test set: {len(test_windows)} windows (each with {window_size} past time steps)")
print(f"\nTraining period: windowed samples 0 to {split_index-1}")
print(f"Test period: windowed samples {split_index} to {len(windows)-1}")
print(f"\n✓ Both training and testing use sliding windows with window_size={window_size}")


## 8. Train Gray Model on Sliding Windows


In [None]:
# Train Gray Model on each training window and make predictions
train_predictions = []
train_actuals = []

print("Training Gray Model on sliding windows...")
for i, window in enumerate(train_windows):
    try:
        # Fit Gray Model on this window
        gm = GrayModel()
        gm.fit(window)
        
        # Predict the next value (in-sample)
        # For in-sample, we predict the value at position len(window) using start=len(window)-1
        pred = gm.predict(steps=1, start=len(window)-1)[0]
        train_predictions.append(pred)
        train_actuals.append(train_targets[i][0])
    except Exception as e:
        # If model fails, use mean of window
        train_predictions.append(np.mean(window))
        train_actuals.append(train_targets[i][0])

train_predictions = np.array(train_predictions)
train_actuals = np.array(train_actuals)

print(f"Training completed!")
print(f"Number of training predictions: {len(train_predictions)}")
print(f"Model parameters (last window):")
if len(train_windows) > 0:
    try:
        gm_last = GrayModel()
        gm_last.fit(train_windows[-1])
        print(f"  a (development coefficient): {gm_last.a:.6f}")
        print(f"  b (grey input): {gm_last.b:.6f}")
    except:
        print("  Could not extract parameters")


## 9. Make Predictions on Test Set


In [None]:
# Make predictions on test set using sliding windows
test_predictions = []
test_actuals = []

print("Making predictions on test set...")
for i, window in enumerate(test_windows):
    try:
        # Fit Gray Model on this window
        gm = GrayModel()
        gm.fit(window)
        
        # Forecast the next value (out-of-sample)
        forecast = gm.forecast(steps=1)[0]
        test_predictions.append(forecast)
        test_actuals.append(test_targets[i][0])
    except Exception as e:
        # If model fails, use mean of window
        test_predictions.append(np.mean(window))
        test_actuals.append(test_targets[i][0])

test_predictions = np.array(test_predictions)
test_actuals = np.array(test_actuals)

print(f"Predictions completed!")
print(f"Number of test predictions: {len(test_predictions)}")
print(f"\nFirst few predictions:")
for i in range(min(5, len(test_predictions))):
    print(f"  Actual: {test_actuals[i]:.4f}, Predicted: {test_predictions[i]:.4f}")


## 10. Model Evaluation


In [None]:
# Calculate evaluation metrics
train_mae = mean_absolute_error(train_actuals, train_predictions)
train_mse = mean_squared_error(train_actuals, train_predictions)
train_rmse = np.sqrt(train_mse)
train_r2 = r2_score(train_actuals, train_predictions)

test_mae = mean_absolute_error(test_actuals, test_predictions)
test_mse = mean_squared_error(test_actuals, test_predictions)
test_rmse = np.sqrt(test_mse)
test_r2 = r2_score(test_actuals, test_predictions)

# Display results
print("=" * 60)
print("MODEL EVALUATION METRICS")
print("=" * 60)
print(f"\n{'Metric':<20} {'Training':<20} {'Test':<20}")
print("-" * 60)
print(f"{'R² Score':<20} {train_r2:<20.4f} {test_r2:<20.4f}")
print(f"{'MAE':<20} {train_mae:<20.4f} {test_mae:<20.4f}")
print(f"{'MSE':<20} {train_mse:<20.4f} {test_mse:<20.4f}")
print(f"{'RMSE':<20} {train_rmse:<20.4f} {test_rmse:<20.4f}")
print("=" * 60)


## 10.5. Residual Diagnostic Plots

**Checking the Four Key Assumptions of Regression:**

1. **Linearity**: The relationship between predictors and response is linear
2. **Independence**: Residuals are independent (especially important for time series)
3. **Homoscedasticity**: Constant variance of residuals across all fitted values
4. **Normality**: Residuals are normally distributed


In [None]:
# Calculate residuals for both train and test sets
train_residuals = train_actuals - train_predictions
test_residuals = test_actuals - test_predictions

# Combine for overall analysis
all_residuals = np.concatenate([train_residuals, test_residuals])
all_fitted = np.concatenate([train_predictions, test_predictions])
all_order = np.arange(len(all_residuals))

print("Residuals calculated for diagnostic analysis")
print(f"Train residuals: mean={np.mean(train_residuals):.4f}, std={np.std(train_residuals):.4f}")
print(f"Test residuals: mean={np.mean(test_residuals):.4f}, std={np.std(test_residuals):.4f}")


In [None]:
# Create comprehensive residual diagnostic plots
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Residual Diagnostic Plots for Gray Model (GM(1,1))', 
             fontsize=16, fontweight='bold', y=0.995)

# ============================================
# 1. Normal Probability Plot (Q-Q Plot) - Checking NORMALITY
# ============================================
probplot(all_residuals, dist="norm", plot=axes[0, 0])
axes[0, 0].set_title('Normal Probability Plot of Residuals\n(Checking Normality)', 
                     fontsize=12, fontweight='bold')
axes[0, 0].set_xlabel('Theoretical Quantiles', fontsize=10)
axes[0, 0].set_ylabel('Sample Quantiles (Residuals)', fontsize=10)
axes[0, 0].grid(True, alpha=0.3)

# Perform normality tests
if len(all_residuals) > 0:
    shapiro_stat, shapiro_p = stats.shapiro(all_residuals[:5000]) if len(all_residuals) > 5000 else stats.shapiro(all_residuals)
    jb_stat, jb_p = jarque_bera(all_residuals)
    
    axes[0, 0].text(0.05, 0.95, 
                    f'Shapiro-Wilk p-value: {shapiro_p:.4f}\nJarque-Bera p-value: {jb_p:.4f}',
                    transform=axes[0, 0].transAxes,
                    verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
                    fontsize=9)
    
    if shapiro_p > 0.05:
        axes[0, 0].text(0.95, 0.05, '✓ Normality OK', 
                       transform=axes[0, 0].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.7),
                       fontsize=10, fontweight='bold')
    else:
        axes[0, 0].text(0.95, 0.05, '✗ Normality Violated', 
                       transform=axes[0, 0].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightcoral', alpha=0.7),
                       fontsize=10, fontweight='bold')

# ============================================
# 2. Residuals vs Fitted Values - Checking LINEARITY & HOMOSCEDASTICITY
# ============================================
scatter1 = axes[0, 1].scatter(all_fitted, all_residuals, alpha=0.6, s=30, c=all_order, 
                              cmap='viridis', edgecolors='black', linewidth=0.5)
axes[0, 1].axhline(y=0, color='red', linestyle='--', linewidth=2, label='Zero Residual Line')
axes[0, 1].set_title('Residuals vs Fitted Values\n(Checking Linearity & Homoscedasticity)', 
                     fontsize=12, fontweight='bold')
axes[0, 1].set_xlabel('Fitted Values (Predicted PM 2.5)', fontsize=10)
axes[0, 1].set_ylabel('Residuals (Actual - Predicted)', fontsize=10)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Levene's test for equal variances
try:
    n_regions = 5
    fitted_sorted_idx = np.argsort(all_fitted)
    region_size = len(all_fitted) // n_regions
    region_groups = []
    for i in range(n_regions):
        start_idx = i * region_size
        end_idx = (i + 1) * region_size if i < n_regions - 1 else len(all_fitted)
        region_groups.append(all_residuals[fitted_sorted_idx[start_idx:end_idx]])
    
    levene_stat, levene_p = stats.levene(*region_groups)
    axes[0, 1].text(0.05, 0.95, 
                   f'Levene Test p-value: {levene_p:.4f}',
                   transform=axes[0, 1].transAxes,
                   verticalalignment='top',
                   bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
                   fontsize=9)
    
    if levene_p > 0.05:
        axes[0, 1].text(0.95, 0.05, '✓ Homoscedasticity OK', 
                       transform=axes[0, 1].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.7),
                       fontsize=10, fontweight='bold')
    else:
        axes[0, 1].text(0.95, 0.05, '✗ Heteroscedasticity Detected', 
                       transform=axes[0, 1].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightcoral', alpha=0.7),
                       fontsize=10, fontweight='bold')
except:
    levene_p = None

# ============================================
# 3. Histogram of Residuals - Checking NORMALITY
# ============================================
n, bins, patches = axes[1, 0].hist(all_residuals, bins=30, edgecolor='black', 
                                   linewidth=1.2, alpha=0.7, color='steelblue', density=True)
axes[1, 0].axvline(x=0, color='red', linestyle='--', linewidth=2, label='Zero Line')

# Overlay normal distribution
mu, sigma = np.mean(all_residuals), np.std(all_residuals)
x_norm = np.linspace(all_residuals.min(), all_residuals.max(), 100)
y_norm = stats.norm.pdf(x_norm, mu, sigma)
axes[1, 0].plot(x_norm, y_norm, 'r-', linewidth=2, label=f'Normal(μ={mu:.2f}, σ={sigma:.2f})')
axes[1, 0].set_title('Histogram of Residuals\n(Checking Normality)', 
                     fontsize=12, fontweight='bold')
axes[1, 0].set_xlabel('Residuals', fontsize=10)
axes[1, 0].set_ylabel('Density', fontsize=10)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Add skewness and kurtosis
skewness = stats.skew(all_residuals)
kurtosis = stats.kurtosis(all_residuals)
axes[1, 0].text(0.95, 0.95, 
               f'Skewness: {skewness:.3f}\nKurtosis: {kurtosis:.3f}\n(Normal: 0, 0)',
               transform=axes[1, 0].transAxes,
               verticalalignment='top',
               horizontalalignment='right',
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
               fontsize=9)

# ============================================
# 4. Residuals vs Order - Checking INDEPENDENCE (Critical for Time Series!)
# ============================================
axes[1, 1].scatter(all_order, all_residuals, alpha=0.6, s=30, c=all_order, 
                  cmap='viridis', edgecolors='black', linewidth=0.5)
axes[1, 1].plot(all_order, all_residuals, 'b-', alpha=0.3, linewidth=1, label='Residuals')
axes[1, 1].axhline(y=0, color='red', linestyle='--', linewidth=2, label='Zero Residual Line')
axes[1, 1].set_title('Residuals vs Observation Order\n(Checking Independence/Autocorrelation)', 
                     fontsize=12, fontweight='bold')
axes[1, 1].set_xlabel('Observation Order (Time Index)', fontsize=10)
axes[1, 1].set_ylabel('Residuals', fontsize=10)
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

# Test for autocorrelation (Durbin-Watson test)
try:
    dw_stat = durbin_watson(all_residuals)
    axes[1, 1].text(0.05, 0.95, 
                   f'Durbin-Watson: {dw_stat:.4f}\n(2.0 = no autocorr)',
                   transform=axes[1, 1].transAxes,
                   verticalalignment='top',
                   bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
                   fontsize=9)
    
    # Durbin-Watson interpretation: close to 2 = no autocorrelation
    if 1.5 < dw_stat < 2.5:
        axes[1, 1].text(0.95, 0.05, '✓ Independence OK', 
                       transform=axes[1, 1].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.7),
                       fontsize=10, fontweight='bold')
    else:
        axes[1, 1].text(0.95, 0.05, '✗ Autocorrelation Detected', 
                       transform=axes[1, 1].transAxes,
                       verticalalignment='bottom',
                       horizontalalignment='right',
                       bbox=dict(boxstyle='round', facecolor='lightcoral', alpha=0.7),
                       fontsize=10, fontweight='bold')
except ImportError:
    dw_stat = None
    # Alternative: Ljung-Box test
    try:
        lb_stat, lb_p = acorr_ljungbox(all_residuals, lags=10, return_df=False)
        axes[1, 1].text(0.05, 0.95, 
                       f'Ljung-Box p-value: {lb_p[-1]:.4f}',
                       transform=axes[1, 1].transAxes,
                       verticalalignment='top',
                       bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
                       fontsize=9)
    except:
        autocorr = np.corrcoef(all_residuals[:-1], all_residuals[1:])[0, 1]
        axes[1, 1].text(0.05, 0.95, 
                       f'Lag-1 Autocorr: {autocorr:.4f}',
                       transform=axes[1, 1].transAxes,
                       verticalalignment='top',
                       bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5),
                       fontsize=9)

plt.tight_layout()
plt.show()

print("\n" + "="*70)
print("RESIDUAL DIAGNOSTIC SUMMARY")
print("="*70)
print("\n1. NORMALITY:")
print(f"   - Mean of residuals: {np.mean(all_residuals):.4f} (should be ~0)")
print(f"   - Skewness: {skewness:.4f} (should be ~0 for normality)")
print(f"   - Kurtosis: {kurtosis:.4f} (should be ~0 for normality)")
print(f"   - Shapiro-Wilk p-value: {shapiro_p:.4f} (>0.05 indicates normality)")

print("\n2. LINEARITY & HOMOSCEDASTICITY:")
print(f"   - Residuals should be randomly scattered around zero")
print(f"   - No clear patterns or funnel shapes in residuals vs fitted plot")
if levene_p is not None:
    print(f"   - Levene test p-value: {levene_p:.4f} (>0.05 indicates constant variance)")
else:
    print("   - Levene test p-value: N/A (test not performed)")

print("\n3. INDEPENDENCE (Critical for Time Series):")
if dw_stat is not None:
    print(f"   - Durbin-Watson statistic: {dw_stat:.4f} (close to 2.0 = no autocorrelation)")
    if 1.5 < dw_stat < 2.5:
        print("   - ✓ No significant autocorrelation detected")
    else:
        print("   - ✗ Autocorrelation detected - model may not capture temporal dependencies")
else:
    print("   - Check the residuals vs order plot for patterns")

print("\n" + "="*70)


## 11. Visualize Predictions


In [None]:
# Plot predictions vs actual
fig, axes = plt.subplots(2, 1, figsize=(15, 10))

# Plot 1: Full time series with predictions
axes[0].plot(range(len(train_actuals)), train_actuals, label='Actual (Train)', color='blue', alpha=0.7, linewidth=1.5)
axes[0].plot(range(len(train_actuals)), train_predictions, label='Predicted (Train)', color='green', alpha=0.7, linewidth=1.5)
axes[0].plot(range(len(train_actuals), len(train_actuals) + len(test_actuals)), test_actuals, 
             label='Actual (Test)', color='red', alpha=0.7, linewidth=1.5)
axes[0].plot(range(len(train_actuals), len(train_actuals) + len(test_actuals)), test_predictions, 
             label='Predicted (Test)', color='orange', alpha=0.7, linewidth=1.5)
axes[0].axvline(x=len(train_actuals), color='black', linestyle='--', linewidth=2, label='Train/Test Split')
axes[0].set_title('Gray Model Time Series Predictions: Actual vs Predicted', fontsize=14, fontweight='bold')
axes[0].set_xlabel('Time Index', fontsize=12)
axes[0].set_ylabel('PM 2.5', fontsize=12)
axes[0].legend(loc='best')
axes[0].grid(True, alpha=0.3)

# Plot 2: Scatter plot for test set
axes[1].scatter(test_actuals, test_predictions, alpha=0.6, s=50)
axes[1].plot([test_actuals.min(), test_actuals.max()], [test_actuals.min(), test_actuals.max()], 
             'r--', lw=2, label='Perfect Prediction')
axes[1].set_xlabel('Actual PM 2.5', fontsize=12)
axes[1].set_ylabel('Predicted PM 2.5', fontsize=12)
axes[1].set_title(f'Test Set: Actual vs Predicted (R² = {test_r2:.4f})', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()


## 12. Experiment with Different Window Sizes

Let's test different window sizes to find the optimal one for Gray Model.


In [None]:
# Test different window sizes (smaller windows for Gray Model)
window_sizes = [5, 6, 7, 8, 10, 12, 15]
results = []

for ws in window_sizes:
    try:
        # Create windows with this size
        windows_ws, targets_ws = create_sliding_windows_gm(ts, window_size=ws, forecast_horizon=1)
        
        # Split chronologically
        split_idx = int(len(windows_ws) * 0.8)
        train_windows_ws = windows_ws[:split_idx]
        test_windows_ws = windows_ws[split_idx:]
        train_targets_ws = targets_ws[:split_idx]
        test_targets_ws = targets_ws[split_idx:]
        
        # Train and predict on training set
        train_preds_ws = []
        train_acts_ws = []
        for window in train_windows_ws:
            try:
                gm = GrayModel()
                gm.fit(window)
                pred = gm.predict(steps=1, start=len(window)-1)[0]
                train_preds_ws.append(pred)
            except:
                train_preds_ws.append(np.mean(window))
        
        for target in train_targets_ws:
            train_acts_ws.append(target[0])
        
        # Predict on test set
        test_preds_ws = []
        test_acts_ws = []
        for window in test_windows_ws:
            try:
                gm = GrayModel()
                gm.fit(window)
                forecast = gm.forecast(steps=1)[0]
                test_preds_ws.append(forecast)
            except:
                test_preds_ws.append(np.mean(window))
        
        for target in test_targets_ws:
            test_acts_ws.append(target[0])
        
        # Calculate metrics
        train_r2_ws = r2_score(train_acts_ws, train_preds_ws)
        test_r2_ws = r2_score(test_acts_ws, test_preds_ws)
        test_rmse_ws = np.sqrt(mean_squared_error(test_acts_ws, test_preds_ws))
        
        results.append({
            'Window Size': ws,
            'Train R²': train_r2_ws,
            'Test R²': test_r2_ws,
            'Test RMSE': test_rmse_ws
        })
        
        print(f"Window Size {ws:2d}: Test R² = {test_r2_ws:.4f}, Test RMSE = {test_rmse_ws:.4f}")
    except Exception as e:
        print(f"Window Size {ws:2d}: Failed - {str(e)}")

# Create results dataframe
results_df = pd.DataFrame(results)
if len(results_df) > 0:
    print("\n" + "=" * 60)
    print("SUMMARY OF RESULTS")
    print("=" * 60)
    print(results_df.to_string(index=False))
    
    # Visualize window size comparison
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    axes[0].plot(results_df['Window Size'], results_df['Train R²'], marker='o', label='Train R²', linewidth=2, markersize=8)
    axes[0].plot(results_df['Window Size'], results_df['Test R²'], marker='s', label='Test R²', linewidth=2, markersize=8)
    axes[0].set_xlabel('Window Size', fontsize=12)
    axes[0].set_ylabel('R² Score', fontsize=12)
    axes[0].set_title('R² Score vs Window Size', fontsize=14, fontweight='bold')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    axes[1].plot(results_df['Window Size'], results_df['Test RMSE'], marker='o', color='red', linewidth=2, markersize=8)
    axes[1].set_xlabel('Window Size', fontsize=12)
    axes[1].set_ylabel('Test RMSE', fontsize=12)
    axes[1].set_title('Test RMSE vs Window Size', fontsize=14, fontweight='bold')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Find best window size
    best_window = results_df.loc[results_df['Test R²'].idxmax()]
    print(f"\nBest Window Size: {int(best_window['Window Size'])}")
    print(f"Best Test R²: {best_window['Test R²']:.4f}")
    print(f"Best Test RMSE: {best_window['Test RMSE']:.4f}")


## 13. Summary and Conclusions

### Key Findings:
1. **Gray Model (GM(1,1))**: Successfully implemented for small dataset time series forecasting
2. **Sliding Windows**: Applied to create features from past time steps
3. **Window Size Optimization**: Tested multiple window sizes to find optimal configuration
4. **Residual Diagnostics**: Comprehensive evaluation of model assumptions

### Advantages of Gray Model:
- Excellent for small datasets (typically 4-10+ data points)
- Simple and computationally efficient
- No strict assumptions about data distribution
- Good for short-term forecasting
- Works well with limited historical data

### Model Performance:
- Performance depends on the selected window size
- Smaller windows (5-10) typically work best for Gray Model
- Experiment with different window sizes to optimize results
- Residual diagnostics help validate model assumptions
