---

## 7. Interview Questions

### Conceptual Questions

**Q1: What is stationarity and why is it important in time series analysis?**

**Answer**: A time series is stationary if its statistical properties (mean, variance, autocorrelation) remain constant over time. Specifically, weak stationarity requires:
1. Constant mean: $E[y_t] = \mu$
2. Constant variance: $Var(y_t) = \sigma^2$
3. Autocorrelation depends only on lag, not time: $Cov(y_t, y_{t-k})$ is function of $k$ only

Stationarity is important because:
- Most statistical forecasting methods (ARIMA, etc.) assume stationarity
- Makes the data easier to model and understand
- Non-stationary series can lead to spurious correlations
- Enables us to use historical statistics for future predictions

---

**Q2: Explain the difference between AR, MA, and ARMA models.**

**Answer**:
- **AR(p) - AutoRegressive**: Current value depends on past values
  - $y_t = c + \phi_1 y_{t-1} + \phi_2 y_{t-2} + \cdots + \phi_p y_{t-p} + \epsilon_t$
  - PACF cuts off after lag $p$
  - Good when current value is correlated with recent past values

- **MA(q) - Moving Average**: Current value depends on past forecast errors
  - $y_t = c + \epsilon_t + \theta_1 \epsilon_{t-1} + \theta_2 \epsilon_{t-2} + \cdots + \theta_q \epsilon_{t-q}$
  - ACF cuts off after lag $q$
  - Good for modeling \"shock\" effects that persist for limited time

- **ARMA(p,q)**: Combines both AR and MA components
  - Captures both value dependencies and error dependencies
  - More flexible than pure AR or MA models

---

**Q3: Why can't we use standard k-fold cross-validation for time series?**

**Answer**: Standard k-fold CV randomly splits data into folds, which:
1. **Violates temporal order**: Future data could end up in training set
2. **Causes data leakage**: Model could \"see\" future information
3. **Unrealistic evaluation**: In practice, we only have past data to predict future

Instead, use **time series cross-validation** (walk-forward validation):
- Training set always comes before test set
- Maintains temporal order
- Mimics real-world forecasting scenario

---

**Q4: Compare LSTM vs Transformer for time series forecasting.**

**Answer**:

**LSTM:**
- ✓ Designed specifically for sequences
- ✓ Maintains cell state for long-term memory
- ✓ Works well with smaller datasets
- ✗ Sequential processing (slow)
- ✗ Vanishing gradient for very long sequences
- ✗ Hidden state size limits information retention

**Transformer:**
- ✓ Parallel processing (fast)
- ✓ Attention captures any-range dependencies directly
- ✓ No vanishing gradient issues
- ✓ Interpretable via attention weights
- ✗ Requires more data
- ✗ Higher memory usage
- ✗ Needs positional encoding

**When to use which:**
- LSTM: Small datasets, shorter sequences, limited compute
- Transformer: Large datasets, long sequences, when speed and interpretability matter

---

**Q5: What is the difference between additive and multiplicative seasonality?**

**Answer**:

**Additive**: $y_t = T_t + S_t + R_t$
- Seasonal variation is constant (same amplitude)
- Use when seasonal fluctuations don't change with level
- Example: Temperature variations stay roughly same magnitude year-round

**Multiplicative**: $y_t = T_t \\times S_t \\times R_t$
- Seasonal variation increases/decreases with level
- Use when seasonal fluctuations grow with the trend
- Example: Retail sales - holiday spike is 20% of current level, so grows as business grows

**How to choose:**
- Plot the data and observe if seasonal swings grow with level
- If yes → multiplicative
- If no → additive
- Can apply log transform to convert multiplicative to additive

---

**Q6: Explain how SARIMA extends ARIMA to handle seasonality.**

**Answer**:

**ARIMA(p,d,q)** has three components:
- $p$: AR order
- $d$: differencing order
- $q$: MA order

**SARIMA(p,d,q)(P,D,Q)s** adds seasonal components:
- $P$: seasonal AR order
- $D$: seasonal differencing order
- $Q$: seasonal MA order
- $s$: seasonal period (12 for monthly with annual pattern)

Example: SARIMA(1,1,1)(1,1,1)₁₂
- Non-seasonal: AR(1), 1 difference, MA(1)
- Seasonal: SAR(1), 1 seasonal difference, SMA(1), period=12

The model applies:
1. Regular differencing $d$ times
2. Seasonal differencing $D$ times (at lag $s$)
3. AR and MA terms at both regular and seasonal lags

---

**Q7: What are the advantages of using deep learning for time series vs traditional methods?**

**Answer**:

**Advantages of Deep Learning:**
1. **Non-linear patterns**: Can capture complex, non-linear relationships
2. **Multivariate**: Naturally handles multiple input features
3. **Automatic feature extraction**: Learns relevant features from raw data
4. **Scalability**: Parallelizable, handles large datasets efficiently
5. **Transfer learning**: Pre-trained models can be fine-tuned
6. **Multiple horizons**: Can forecast multiple steps simultaneously

**Advantages of Traditional Methods:**
1. **Interpretability**: Clear mathematical formulation
2. **Small data**: Works well with limited historical data
3. **Uncertainty quantification**: Confidence intervals are straightforward
4. **Computational efficiency**: Faster training, no GPU needed
5. **Theoretical guarantees**: Well-studied statistical properties
6. **Debugging**: Easier to diagnose what's wrong

**Best practice**: Start with traditional methods (SARIMA) as baseline, then try deep learning if:
- You have large datasets (1000+ samples)
- Patterns are complex/non-linear
- Multiple input features available
- Computational resources available

---

**Q8: How would you detect and handle outliers in time series data?**

**Answer**:

**Detection methods:**

1. **Statistical methods:**
   - Z-score: Points beyond 3 standard deviations
   - IQR method: Points beyond Q1 - 1.5×IQR or Q3 + 1.5×IQR
   - Moving average residuals: Large deviations from moving average

2. **Model-based:**
   - Fit ARIMA, identify points with large residuals
   - Seasonal decomposition: outliers in residual component
   - LSTM reconstruction: High reconstruction error

3. **Domain-specific:**
   - Business logic (e.g., negative sales impossible)
   - Known events (Black Friday spike is expected, not outlier)

**Handling strategies:**

1. **Remove**: If clearly erroneous (measurement error)
2. **Impute**: Replace with interpolated value
   - Linear interpolation
   - Seasonal average
   - Model-based prediction
3. **Cap**: Limit to maximum reasonable value
4. **Model separately**: Add binary indicator variable
5. **Robust methods**: Use models resistant to outliers (e.g., MAE loss)

**Important**: Document all outlier handling decisions!

---

### Coding Questions

**Q9: Implement a function to create sliding window sequences from a time series.**

```python
def create_sequences(data, window_size, horizon=1):
    \"\"\"
    Create input-output sequences using sliding window.
    
    Args:
        data: 1D array of time series values
        window_size: Number of past observations for input
        horizon: Number of future steps to predict
    
    Returns:
        X: Array of shape (n_samples, window_size)
        y: Array of shape (n_samples, horizon)
    \"\"\"
    X, y = [], []
    
    for i in range(len(data) - window_size - horizon + 1):
        X.append(data[i:i + window_size])
        y.append(data[i + window_size:i + window_size + horizon])
    
    return np.array(X), np.array(y)

# Test
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
X, y = create_sequences(data, window_size=3, horizon=2)
print(\"X:\", X)
print(\"y:\", y)
```

---

**Q10: How would you implement walk-forward validation for time series?**

```python
def walk_forward_validation(data, model_fn, window_size=100, test_size=50):
    \"\"\"
    Perform walk-forward validation.
    
    Args:
        data: Time series data
        model_fn: Function that fits model and returns predictions
        window_size: Minimum training window size
        test_size: Size of each test set
    
    Returns:
        List of (predictions, actuals) tuples
    \"\"\"
    results = []
    
    for i in range(window_size, len(data) - test_size + 1, test_size):
        # Split data
        train = data[:i]
        test = data[i:i + test_size]
        
        # Fit model and predict
        predictions = model_fn(train, len(test))
        
        # Store results
        results.append((predictions, test))
    
    return results

# Example usage
def simple_model(train, n_steps):
    # Simple: forecast mean of last 10 values
    return np.full(n_steps, train[-10:].mean())

results = walk_forward_validation(ts_full.values, simple_model)
print(f\"Number of folds: {len(results)}\")
```

---

## Summary

This notebook covered:

✅ **Time Series Fundamentals**: Components, stationarity, ACF/PACF, decomposition
✅ **Classical Methods**: Moving averages, exponential smoothing, ARIMA, SARIMA
✅ **Deep Learning**: LSTM, 1D CNN, Transformers
✅ **Best Practices**: Cross-validation, evaluation metrics, production considerations

**Key Takeaways:**

1. **Always check for stationarity** before applying statistical methods
2. **Start simple**: Baseline models (naive, moving average) establish minimum performance
3. **Classical methods often work well** with proper parameter tuning
4. **Deep learning shines** with large datasets and complex patterns
5. **Proper validation is critical**: Use time series split, not random k-fold
6. **Multiple metrics**: MAE, RMSE, MAPE tell different stories
7. **Domain knowledge**: Understanding the data source is as important as the model

**Next Steps:**

- Try these methods on real-world datasets (stock prices, weather, sales)
- Experiment with hyperparameter tuning
- Implement ensemble methods (combining multiple models)
- Explore multivariate time series forecasting
- Study probabilistic forecasting (prediction intervals)

---

**Happy Forecasting! 📈**"

In [None]:
def calculate_all_metrics(y_true, y_pred):
    """
    Calculate all common time series forecasting metrics.
    """
    # MAE
    mae = mean_absolute_error(y_true, y_pred)
    
    # RMSE
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    
    # MAPE (avoid division by zero)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-10))) * 100
    
    # sMAPE
    smape = np.mean(2.0 * np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred) + 1e-10)) * 100
    
    # MASE (using naive forecast as baseline)
    naive_forecast_mae = np.mean(np.abs(np.diff(y_true)))
    mase = mae / (naive_forecast_mae + 1e-10)
    
    # R-squared
    r2 = r2_score(y_true, y_pred)
    
    return {
        'MAE': mae,
        'RMSE': rmse,
        'MAPE': mape,
        'sMAPE': smape,
        'MASE': mase,
        'R²': r2
    }

# Calculate metrics for best model
best_model_name = model_metrics_sorted[0][0]
best_forecast = models_comparison[best_model_name][0]

metrics = calculate_all_metrics(test.values, best_forecast)

print(f\"\\nDetailed Metrics for Best Model ({best_model_name}):\")
print(\"=\"*50)
for metric, value in metrics.items():
    if metric == 'R²':
        print(f\"{metric:<15} {value:.4f}\")
    elif metric in ['MAPE', 'sMAPE']:
        print(f\"{metric:<15} {value:.2f}%\")
    else:
        print(f\"{metric:<15} {value:.4f}\")
print(\"=\"*50)"

### 6.2 Evaluation Metrics for Time Series

Different metrics capture different aspects of forecast quality:

**1. Mean Absolute Error (MAE):**
$$\text{MAE} = \frac{1}{n}\sum_{i=1}^{n}|y_i - \hat{y}_i|$$
- **Pros**: Easy to interpret, same units as data, robust to outliers
- **Cons**: Doesn't penalize large errors heavily

**2. Root Mean Squared Error (RMSE):**
$$\text{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$$
- **Pros**: Penalizes large errors, same units as data
- **Cons**: Sensitive to outliers

**3. Mean Absolute Percentage Error (MAPE):**
$$\text{MAPE} = \frac{100\%}{n}\sum_{i=1}^{n}\left|\frac{y_i - \hat{y}_i}{y_i}\right|$$
- **Pros**: Scale-independent, easy to interpret (percentage)
- **Cons**: Undefined when $y_i = 0$, asymmetric (penalizes over-forecasts more)

**4. Symmetric MAPE (sMAPE):**
$$\text{sMAPE} = \frac{100\%}{n}\sum_{i=1}^{n}\frac{|y_i - \hat{y}_i|}{(|y_i| + |\hat{y}_i|)/2}$$
- **Pros**: Symmetric, bounded between 0% and 200%

**5. Mean Absolute Scaled Error (MASE):**
$$\text{MASE} = \frac{\text{MAE}}{\frac{1}{n-1}\sum_{i=2}^{n}|y_i - y_{i-1}|}$$
- **Pros**: Scale-independent, works for zero values, symmetric
- **Cons**: Less intuitive

**When to use which:**
- **MAE**: General purpose, when all errors are equally important
- **RMSE**: When large errors are particularly undesirable
- **MAPE**: When relative errors matter (e.g., financial forecasting)
- **MASE**: When comparing across different time series"

In [None]:
from sklearn.model_selection import TimeSeriesSplit

# Demonstrate time series cross-validation
n_splits = 5
tscv = TimeSeriesSplit(n_splits=n_splits)

plt.figure(figsize=(15, 8))

for i, (train_index, test_index) in enumerate(tscv.split(ts_full)):
    # Get train and test data
    train_fold = ts_full.iloc[train_index]
    test_fold = ts_full.iloc[test_index]
    
    # Plot
    plt.subplot(n_splits, 1, i+1)
    plt.plot(train_fold.index, np.ones(len(train_fold)) * (i+1), 'b-', linewidth=8, label='Train' if i == 0 else '')
    plt.plot(test_fold.index, np.ones(len(test_fold)) * (i+1), 'r-', linewidth=8, label='Test' if i == 0 else '')
    plt.ylim(0.5, 1.5)
    plt.yticks([])
    plt.ylabel(f'Fold {i+1}', rotation=0, labelpad=20)
    
    if i == 0:
        plt.legend(loc='upper right')
    if i == n_splits - 1:
        plt.xlabel('Time')

plt.suptitle('Time Series Cross-Validation (Walk-Forward)', fontsize=14, fontweight='bold', y=0.995)
plt.tight_layout()
plt.show()

print(f\"Number of splits: {n_splits}\")
print(f\"Total data points: {len(ts_full)}\")"

---

## 6. Best Practices & Production

### 6.1 Time Series Cross-Validation

**Standard cross-validation does NOT work for time series** because it violates temporal order.

**Time Series Split (Walk-Forward Validation):**
```
Training Set 1   | Test Set 1
Training Set 1 + Test Set 1   | Test Set 2
Training Set 1 + Test Set 1 + Test Set 2   | Test Set 3
...
```

This maintains temporal order and prevents data leakage.

In [None]:
# Compare all models
models_comparison = {
    'Simple ES': (ses_forecast, mean_absolute_error(test.values, ses_forecast)),
    'Double ES': (des_forecast, mean_absolute_error(test.values, des_forecast)),
    'Triple ES': (tes_forecast, mean_absolute_error(test.values, tes_forecast)),
    'ARIMA': (arima_forecast.values, arima_mae),
    'SARIMA': (sarima_forecast.values, sarima_mae),
    'LSTM': (lstm_forecast, lstm_mae),
    '1D CNN': (cnn_forecast, cnn_mae),
    'Transformer': (transformer_forecast, transformer_mae)
}

# Plot all forecasts
plt.figure(figsize=(18, 10))

# Top plot: All forecasts
plt.subplot(2, 1, 1)
plt.plot(train.index[-100:], train.values[-100:], label='Train (last 100)', alpha=0.5, color='gray')
plt.plot(test.index, test.values, label='Actual', color='black', linewidth=3, alpha=0.8)

colors = plt.cm.tab10(np.linspace(0, 1, len(models_comparison)))
for (name, (forecast, _)), color in zip(models_comparison.items(), colors):
    plt.plot(test.index, forecast, label=name, linestyle='--', alpha=0.7, color=color)

plt.axvline(test.index[0], color='red', linestyle=':', alpha=0.3)
plt.title('All Models Comparison - Forecasts', fontsize=14, fontweight='bold')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend(loc='upper left', ncol=3)
plt.grid(True, alpha=0.3)

# Bottom plot: MAE comparison
plt.subplot(2, 1, 2)
model_names = list(models_comparison.keys())
maes = [models_comparison[name][1] for name in model_names]

bars = plt.bar(model_names, maes, color=colors, alpha=0.7, edgecolor='black')
plt.xlabel('Model')
plt.ylabel('MAE')
plt.title('Model Performance Comparison (MAE - Lower is Better)', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for bar, mae in zip(bars, maes):
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{mae:.4f}', ha='center', va='bottom', fontsize=10, fontweight='bold')

plt.tight_layout()
plt.show()

# Print detailed comparison table
print(\"\\n\" + \"=\"*80)
print(\" \" * 25 + \"MODEL PERFORMANCE COMPARISON\")
print(\"=\"*80)
print(f\"{'Model':<20} {'MAE':<12} {'RMSE':<12} {'Rank':<10}\")\
print(\"-\"*80)

# Calculate RMSE for all models
model_metrics = []
for name, (forecast, mae) in models_comparison.items():
    rmse = np.sqrt(mean_squared_error(test.values, forecast))
    model_metrics.append((name, mae, rmse))

# Sort by MAE
model_metrics_sorted = sorted(model_metrics, key=lambda x: x[1])

for rank, (name, mae, rmse) in enumerate(model_metrics_sorted, 1):
    marker = \" ← BEST\" if rank == 1 else \"\"
    print(f\"{name:<20} {mae:<12.4f} {rmse:<12.4f} {rank:<10}{marker}\")

print(\"=\"*80)

# Statistical summary
print(f\"\\nBest Model: {model_metrics_sorted[0][0]}\")
print(f\"Worst Model: {model_metrics_sorted[-1][0]}\")
improvement = ((model_metrics_sorted[-1][1] - model_metrics_sorted[0][1]) / model_metrics_sorted[-1][1] * 100)
print(f\"Improvement from worst to best: {improvement:.2f}%\")"

### 4.5 Model Comparison

Let's compare all the models we've trained on the same test set.

In [None]:
class PositionalEncoding(nn.Module):
    """
    Positional encoding for Transformer to inject sequence order information.
    """
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        # Create positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)
        """
        return x + self.pe[:, :x.size(1), :]

class TransformerForecaster(nn.Module):
    """
    Transformer model for time series forecasting.
    """
    def __init__(self, input_size=1, d_model=64, nhead=4, num_layers=2, 
                 dim_feedforward=256, dropout=0.1, output_size=1):
        """
        Args:
            input_size: Number of input features
            d_model: Dimension of model
            nhead: Number of attention heads
            num_layers: Number of transformer layers
            dim_feedforward: Dimension of feedforward network
            dropout: Dropout probability
            output_size: Number of output values
        """
        super(TransformerForecaster, self).__init__()
        
        # Input projection
        self.input_projection = nn.Linear(input_size, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model)
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Output layers
        self.fc = nn.Linear(d_model, output_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (batch_size, seq_len, input_size)
        
        Returns:
            Output tensor of shape (batch_size, output_size)
        """
        # Project input to d_model dimension
        x = self.input_projection(x)  # (batch, seq_len, d_model)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Transformer encoding
        x = self.transformer_encoder(x)  # (batch, seq_len, d_model)
        
        # Use the last time step for prediction
        x = x[:, -1, :]  # (batch, d_model)
        
        # Output projection
        x = self.dropout(x)
        x = self.fc(x)  # (batch, output_size)
        
        return x

# Initialize Transformer model
transformer_model = TransformerForecaster(
    input_size=1,
    d_model=64,
    nhead=4,
    num_layers=2,
    dim_feedforward=256,
    dropout=0.1,
    output_size=horizon
).to(device)

optimizer_transformer = optim.Adam(transformer_model.parameters(), lr=0.001)
scheduler_transformer = optim.lr_scheduler.ReduceLROnPlateau(optimizer_transformer, mode='min', factor=0.5, patience=5)

print(transformer_model)
print(f"\\nTotal parameters: {sum(p.numel() for p in transformer_model.parameters()):,}")

# Train Transformer model
print("\\nTraining Transformer model...")
train_losses_transformer, val_losses_transformer = train_model(
    transformer_model, train_loader, val_loader,
    criterion, optimizer_transformer, scheduler_transformer, epochs=100
)

# Forecast with Transformer
transformer_forecast = forecast_lstm(transformer_model, last_train_window, len(test), scaler)

# Calculate errors
transformer_mae = mean_absolute_error(test.values, transformer_forecast)
transformer_rmse = np.sqrt(mean_squared_error(test.values, transformer_forecast))

print(f"\\nTransformer Forecast Accuracy:")
print(f"MAE:  {transformer_mae:.4f}")
print(f"RMSE: {transformer_rmse:.4f}")

### 4.4 Transformer for Time Series

**Transformers** use self-attention mechanisms to capture dependencies regardless of distance.

**Self-Attention Mechanism:**

$$
\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V
$$

where:
- $Q$ = Query matrix
- $K$ = Key matrix
- $V$ = Value matrix
- $d_k$ = Dimension of key vectors

**Multi-Head Attention:**
$$
\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, ..., \text{head}_h)W^O
$$
$$
\text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V)
$$

**Advantages for Time Series:**
- Captures long-range dependencies efficiently
- Parallel processing (unlike RNNs)
- Attention weights provide interpretability
- Positional encoding maintains temporal order

In [None]:
class CNN1DForecaster(nn.Module):
    """
    1D CNN model for time series forecasting.
    """
    def __init__(self, input_size=1, num_filters=64, kernel_size=3, output_size=1):
        """
        Args:
            input_size: Number of features per time step
            num_filters: Number of filters in conv layers
            kernel_size: Size of convolutional kernel
            output_size: Number of output values
        """
        super(CNN1DForecaster, self).__init__()
        
        # Convolutional layers
        self.conv1 = nn.Conv1d(input_size, num_filters, kernel_size, padding=kernel_size//2)
        self.conv2 = nn.Conv1d(num_filters, num_filters*2, kernel_size, padding=kernel_size//2)
        self.conv3 = nn.Conv1d(num_filters*2, num_filters*4, kernel_size, padding=kernel_size//2)
        
        # Pooling and activation
        self.pool = nn.MaxPool1d(2)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        
        # Global average pooling
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        
        # Fully connected layers
        self.fc1 = nn.Linear(num_filters*4, 128)
        self.fc2 = nn.Linear(128, output_size)
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (batch_size, seq_len, input_size)
        
        Returns:
            Output tensor of shape (batch_size, output_size)
        """
        # Transpose for Conv1d: (batch, features, seq_len)
        x = x.transpose(1, 2)
        
        # Conv blocks
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.dropout(x)
        
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout(x)
        
        x = self.relu(self.conv3(x))
        
        # Global pooling
        x = self.global_pool(x)  # (batch, channels, 1)
        x = x.squeeze(-1)  # (batch, channels)
        
        # Fully connected
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x

# Initialize CNN model
cnn_model = CNN1DForecaster(
    input_size=1,
    num_filters=64,
    kernel_size=5,
    output_size=horizon
).to(device)

optimizer_cnn = optim.Adam(cnn_model.parameters(), lr=0.001)
scheduler_cnn = optim.lr_scheduler.ReduceLROnPlateau(optimizer_cnn, mode='min', factor=0.5, patience=5)

print(cnn_model)
print(f"\\nTotal parameters: {sum(p.numel() for p in cnn_model.parameters()):,}")

# Train CNN model
print("\\nTraining 1D CNN model...")
train_losses_cnn, val_losses_cnn = train_model(
    cnn_model, train_loader, val_loader,
    criterion, optimizer_cnn, scheduler_cnn, epochs=100
)

# Forecast with CNN
cnn_forecast = forecast_lstm(cnn_model, last_train_window, len(test), scaler)

# Calculate errors
cnn_mae = mean_absolute_error(test.values, cnn_forecast)
cnn_rmse = np.sqrt(mean_squared_error(test.values, cnn_forecast))

print(f"\\n1D CNN Forecast Accuracy:")
print(f"MAE:  {cnn_mae:.4f}")
print(f"RMSE: {cnn_rmse:.4f}")

### 4.3 1D CNN for Time Series

**1D Convolutional Neural Networks** can capture local patterns and are computationally efficient.

**1D Convolution:**
$$y_i = \sum_{k=0}^{K-1} w_k \cdot x_{i+k} + b$$

**Advantages:**
- Fast training compared to RNNs
- Parallel processing (no sequential dependency)
- Good at detecting local patterns
- Parameter sharing through convolutional filters

**Architecture:**
```
Input → Conv1D → ReLU → MaxPool → Conv1D → ReLU → MaxPool → Flatten → Dense → Output
```

In [None]:
# Multi-step ahead forecasting with LSTM
def forecast_lstm(model, last_sequence, steps, scaler):
    """
    Perform multi-step ahead forecasting using trained LSTM model.
    
    Args:
        model: Trained LSTM model
        last_sequence: Last window_size observations (scaled)
        steps: Number of steps to forecast
        scaler: Scaler used for normalization
    
    Returns:
        Array of forecasted values (original scale)
    """
    model.eval()
    forecasts = []
    current_seq = torch.FloatTensor(last_sequence).unsqueeze(0).unsqueeze(-1).to(device)
    
    with torch.no_grad():
        for _ in range(steps):
            # Predict next value
            pred = model(current_seq)
            forecasts.append(pred.cpu().numpy()[0, 0])
            
            # Update sequence: remove first, append prediction
            current_seq = torch.cat([
                current_seq[:, 1:, :],
                pred.unsqueeze(1).unsqueeze(-1)
            ], dim=1)
    
    # Inverse transform to original scale
    forecasts = np.array(forecasts).reshape(-1, 1)
    forecasts = scaler.inverse_transform(forecasts).flatten()
    
    return forecasts

# Get last window from training data
last_train_window = train_scaled[-window_size:]

# Forecast test period
lstm_forecast = forecast_lstm(lstm_model, last_train_window, len(test), scaler)

# Plot results
plt.figure(figsize=(15, 6))
plt.plot(train.index, train.values, label='Training Data', alpha=0.7)
plt.plot(test.index, test.values, label='Actual', color='black', linewidth=2)
plt.plot(test.index, lstm_forecast, label='LSTM Forecast', linestyle='--', linewidth=2, color='purple')
plt.axvline(test.index[0], color='red', linestyle=':', alpha=0.5, label='Forecast Start')
plt.title('LSTM Time Series Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate errors
lstm_mae = mean_absolute_error(test.values, lstm_forecast)
lstm_rmse = np.sqrt(mean_squared_error(test.values, lstm_forecast))

print(f"\\nLSTM Forecast Accuracy:")
print(f"MAE:  {lstm_mae:.4f}")
print(f"RMSE: {lstm_rmse:.4f}")

print(f"\\nComparison with SARIMA:")
print(f"SARIMA MAE: {sarima_mae:.4f}")
print(f"LSTM MAE:   {lstm_mae:.4f}")
improvement = ((sarima_mae - lstm_mae) / sarima_mae * 100)
print(f"Improvement: {improvement:.2f}%\" if improvement > 0 else f\"Degradation: {-improvement:.2f}%\")")

In [None]:
# Training loop
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=50):
    """
    Train the time series forecasting model.
    """
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    patience_counter = 0
    early_stop_patience = 10
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        
        for batch_x, batch_y in train_loader:
            # Move to device and add feature dimension
            batch_x = batch_x.unsqueeze(-1).to(device)  # (batch, seq_len, 1)
            batch_y = batch_y.to(device)
            
            # Forward pass
            outputs = optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            
            # Backward pass
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            train_loss += loss.item()
        
        train_loss /= len(train_loader)
        train_losses.append(train_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.unsqueeze(-1).to(device)
                batch_y = batch_y.to(device)
                
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
        
        val_loss /= len(val_loader)
        val_losses.append(val_loss)
        
        # Learning rate scheduling
        scheduler.step(val_loss)
        
        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save best model
            torch.save(model.state_dict(), 'best_lstm_model.pth')
        else:
            patience_counter += 1
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}] - Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
        
        if patience_counter >= early_stop_patience:
            print(f'Early stopping at epoch {epoch+1}')
            break
    
    # Load best model
    model.load_state_dict(torch.load('best_lstm_model.pth'))
    
    return train_losses, val_losses

# Train the model
print("Training LSTM model...")
train_losses, val_losses = train_model(
    lstm_model, train_loader, val_loader, 
    criterion, optimizer, scheduler, epochs=100
)

# Plot training history
plt.figure(figsize=(12, 4))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.title('LSTM Training History')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print(f"\\nBest validation loss: {min(val_losses):.6f}")

In [None]:
class LSTMForecaster(nn.Module):
    """
    LSTM model for time series forecasting.
    """
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, dropout=0.2, output_size=1):
        """
        Args:
            input_size: Number of features per time step
            hidden_size: Number of LSTM hidden units
            num_layers: Number of LSTM layers
            dropout: Dropout probability
            output_size: Number of output values (forecast horizon)
        """
        super(LSTMForecaster, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Fully connected output layer
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (batch_size, seq_len, input_size)
        
        Returns:
            Output tensor of shape (batch_size, output_size)
        """
        # x shape: (batch_size, seq_len, input_size)
        
        # LSTM forward pass
        # lstm_out shape: (batch_size, seq_len, hidden_size)
        # h_n shape: (num_layers, batch_size, hidden_size)
        lstm_out, (h_n, c_n) = self.lstm(x)
        
        # Use the last hidden state for prediction
        # h_n[-1] shape: (batch_size, hidden_size)
        out = self.fc(h_n[-1])
        
        return out

# Initialize model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lstm_model = LSTMForecaster(
    input_size=1,
    hidden_size=64,
    num_layers=2,
    dropout=0.2,
    output_size=horizon
).to(device)

# Training configuration
criterion = nn.MSELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

print(lstm_model)
print(f"\\nTotal parameters: {sum(p.numel() for p in lstm_model.parameters()):,}")
print(f"Training on device: {device}")

### 4.2 LSTM for Time Series

**Long Short-Term Memory (LSTM)** networks are designed to handle sequential data and can learn long-term dependencies.

**LSTM Cell Equations:**

$$
\begin{align}
f_t &= \sigma(W_f \cdot [h_{t-1}, x_t] + b_f) \quad \text{(Forget gate)} \\
i_t &= \sigma(W_i \cdot [h_{t-1}, x_t] + b_i) \quad \text{(Input gate)} \\
\tilde{C}_t &= \tanh(W_C \cdot [h_{t-1}, x_t] + b_C) \quad \text{(Candidate cell state)} \\
C_t &= f_t \odot C_{t-1} + i_t \odot \tilde{C}_t \quad \text{(Cell state)} \\
o_t &= \sigma(W_o \cdot [h_{t-1}, x_t] + b_o) \quad \text{(Output gate)} \\
h_t &= o_t \odot \tanh(C_t) \quad \text{(Hidden state)}
\end{align}
$$

where:
- $\sigma$ is the sigmoid function
- $\odot$ is element-wise multiplication
- $f_t, i_t, o_t$ are forget, input, and output gates
- $C_t$ is the cell state (long-term memory)
- $h_t$ is the hidden state (short-term memory)

In [None]:
class TimeSeriesDataset(Dataset):
    """
    PyTorch Dataset for time series with sliding window.
    """
    def __init__(self, data, window_size=20, horizon=1):
        """
        Args:
            data: Time series data (numpy array or pandas Series)
            window_size: Number of past time steps to use as input
            horizon: Number of future time steps to predict
        """
        self.data = data if isinstance(data, np.ndarray) else data.values
        self.window_size = window_size
        self.horizon = horizon
        
        # Create sequences
        self.X, self.y = self._create_sequences()
    
    def _create_sequences(self):
        """Create input-output sequences using sliding window."""
        X, y = [], []
        
        for i in range(len(self.data) - self.window_size - self.horizon + 1):
            X.append(self.data[i:i + self.window_size])
            y.append(self.data[i + self.window_size:i + self.window_size + self.horizon])
        
        return np.array(X), np.array(y)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return (
            torch.FloatTensor(self.X[idx]),
            torch.FloatTensor(self.y[idx])
        )

# Prepare data for deep learning
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train.values.reshape(-1, 1)).flatten()
test_scaled = scaler.transform(test.values.reshape(-1, 1)).flatten()

# Create datasets
window_size = 30
horizon = 1

train_dataset = TimeSeriesDataset(train_scaled, window_size=window_size, horizon=horizon)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Create a validation set from the last 20% of training data
val_size = int(0.2 * len(train_dataset))
train_size = len(train_dataset) - val_size
train_subset, val_subset = torch.utils.data.random_split(train_dataset, [train_size, val_size])
val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

print(f"Training samples: {len(train_subset)}")
print(f"Validation samples: {len(val_subset)}")
print(f"Window size: {window_size}")
print(f"Forecast horizon: {horizon}")
print(f"\\nSample input shape: {train_dataset[0][0].shape}")
print(f"Sample output shape: {train_dataset[0][1].shape}")

---

## 4. Deep Learning for Time Series

Deep learning methods can capture complex patterns and non-linear relationships in time series data.

### 4.1 Time Series Dataset Preparation

For supervised learning, we need to convert time series into input-output pairs using a **sliding window** approach.

**Example:**
```
Original series: [1, 2, 3, 4, 5, 6, 7, 8]
Window size = 3, horizon = 1

X                 y
[1, 2, 3]    →    4
[2, 3, 4]    →    5
[3, 4, 5]    →    6
[4, 5, 6]    →    7
[5, 6, 7]    →    8
```

# Time Series Analysis & Forecasting: Complete Guide

**Comprehensive coverage from classical methods to modern deep learning approaches**

---

## Table of Contents

1. [Time Series Fundamentals](#1-time-series-fundamentals)
   - Components of time series
   - Stationarity and differencing
   - ACF/PACF analysis
   - Decomposition methods

2. [Classical Statistical Methods](#2-classical-statistical-methods)
   - Moving Averages
   - Exponential Smoothing (SES, DES, Holt-Winters)
   - ARIMA Models
   - SARIMA (Seasonal ARIMA)

3. [Modern Statistical Methods](#3-modern-statistical-methods)
   - State Space Models
   - Prophet (Facebook)
   - Vector Autoregression (VAR)

4. [Deep Learning for Time Series](#4-deep-learning-for-time-series)
   - RNN/LSTM/GRU
   - 1D CNNs
   - Transformers for Time Series
   - Temporal Convolutional Networks (TCN)

5. [Advanced Topics](#5-advanced-topics)
   - Multivariate Time Series
   - Anomaly Detection
   - Probabilistic Forecasting
   - Transfer Learning

6. [Best Practices & Production](#6-best-practices-production)
   - Cross-validation strategies
   - Evaluation metrics
   - Feature engineering

7. [Interview Questions](#7-interview-questions)

---

In [None]:
# Essential imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import warnings
warnings.filterwarnings('ignore')

# Time series specific
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Deep learning
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Sklearn
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Plotting style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

print("All imports successful!")
print(f"PyTorch version: {torch.__version__}")
print(f"Device: {'cuda' if torch.cuda.is_available() else 'cpu'}")

---

## 1. Time Series Fundamentals

### 1.1 What is a Time Series?

A **time series** is a sequence of data points indexed in time order. Each observation is associated with a specific time point.

**Mathematical Definition:**

$$Y = \{y_t : t \in T\}$$

where:
- $y_t$ is the observation at time $t$
- $T$ is the index set (usually integers or datetime)

### 1.2 Components of Time Series

Any time series can be decomposed into:

1. **Trend ($T_t$)**: Long-term increase or decrease
2. **Seasonality ($S_t$)**: Regular, periodic fluctuations
3. **Cyclic ($C_t$)**: Non-periodic fluctuations
4. **Irregular/Noise ($I_t$)**: Random variation

**Additive Model:**
$$y_t = T_t + S_t + C_t + I_t$$

**Multiplicative Model:**
$$y_t = T_t \times S_t \times C_t \times I_t$$

### 1.3 Stationarity

A time series is **stationary** if its statistical properties (mean, variance, autocorrelation) are constant over time.

**Weak Stationarity** (2nd order stationarity):
1. $E[y_t] = \mu$ (constant mean)
2. $Var(y_t) = \sigma^2$ (constant variance)
3. $Cov(y_t, y_{t-k})$ depends only on lag $k$, not on $t$

**Why is stationarity important?**
- Most statistical forecasting methods assume stationarity
- Non-stationary data can lead to spurious relationships
- Makes the data easier to model

In [None]:
# Generate synthetic time series with different components
def generate_time_series(n=500, trend=True, seasonal=True, noise_level=1.0):
    """
    Generate synthetic time series with trend, seasonality, and noise.
    
    Args:
        n: Number of time points
        trend: Whether to include trend component
        seasonal: Whether to include seasonal component
        noise_level: Standard deviation of noise
    
    Returns:
        pd.Series: Generated time series
    """
    t = np.arange(n)
    
    # Trend component (linear)
    trend_component = 0.05 * t if trend else np.zeros(n)
    
    # Seasonal component (annual + quarterly)
    seasonal_component = 0
    if seasonal:
        seasonal_component = (
            10 * np.sin(2 * np.pi * t / 50) +  # Annual
            5 * np.sin(2 * np.pi * t / 12.5)   # Quarterly
        )
    
    # Noise component
    noise = np.random.normal(0, noise_level, n)
    
    # Combine components
    series = trend_component + seasonal_component + noise
    
    # Create time index
    dates = pd.date_range(start='2020-01-01', periods=n, freq='D')
    return pd.Series(series, index=dates)

# Generate different types of time series
ts_stationary = generate_time_series(500, trend=False, seasonal=False, noise_level=1.0)
ts_trend = generate_time_series(500, trend=True, seasonal=False, noise_level=1.0)
ts_seasonal = generate_time_series(500, trend=False, seasonal=True, noise_level=1.0)
ts_full = generate_time_series(500, trend=True, seasonal=True, noise_level=1.0)

# Visualize
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

ts_stationary.plot(ax=axes[0, 0], title='Stationary (White Noise)')
axes[0, 0].set_ylabel('Value')

ts_trend.plot(ax=axes[0, 1], title='Trend Only')
axes[0, 1].set_ylabel('Value')

ts_seasonal.plot(ax=axes[1, 0], title='Seasonal Only')
axes[1, 0].set_ylabel('Value')

ts_full.plot(ax=axes[1, 1], title='Trend + Seasonal + Noise')
axes[1, 1].set_ylabel('Value')

plt.tight_layout()
plt.show()

print("Generated 4 types of time series")
print(f"Series length: {len(ts_full)}")
print(f"Date range: {ts_full.index[0]} to {ts_full.index[-1]}")

### 1.4 Testing for Stationarity: Augmented Dickey-Fuller Test

The **ADF test** tests the null hypothesis that a unit root is present (i.e., the series is non-stationary).

**Hypotheses:**
- $H_0$: The series has a unit root (non-stationary)
- $H_1$: The series has no unit root (stationary)

**Test Equation:**
$$\Delta y_t = \alpha + \beta t + \gamma y_{t-1} + \sum_{i=1}^{p} \delta_i \Delta y_{t-i} + \epsilon_t$$

**Decision Rule:**
- If p-value < 0.05: Reject $H_0$ (series is stationary)
- If p-value >= 0.05: Fail to reject $H_0$ (series is non-stationary)

In [None]:
def adf_test(series, name=''):
    """
    Perform Augmented Dickey-Fuller test for stationarity.
    
    Args:
        series: Time series data
        name: Name of the series for display
    
    Returns:
        dict: Test results
    """
    result = adfuller(series.dropna(), autolag='AIC')
    
    output = {
        'test_statistic': result[0],
        'p_value': result[1],
        'lags_used': result[2],
        'n_observations': result[3],
        'critical_values': result[4],
        'is_stationary': result[1] < 0.05
    }
    
    print(f"\n{'='*60}")
    print(f"ADF Test Results for {name}")
    print(f"{'='*60}")
    print(f"Test Statistic:      {output['test_statistic']:.6f}")
    print(f"P-value:             {output['p_value']:.6f}")
    print(f"Lags Used:           {output['lags_used']}")
    print(f"Number of Obs:       {output['n_observations']}")
    print(f"\nCritical Values:")
    for key, value in output['critical_values'].items():
        print(f"  {key}: {value:.3f}")
    
    if output['is_stationary']:
        print(f"\n✓ Series IS stationary (p-value < 0.05)")
    else:
        print(f"\n✗ Series is NOT stationary (p-value >= 0.05)")
    
    return output

# Test all series
adf_test(ts_stationary, 'White Noise (Expected: Stationary)')
adf_test(ts_trend, 'Trend Series (Expected: Non-Stationary)')
adf_test(ts_seasonal, 'Seasonal Series')
adf_test(ts_full, 'Full Series (Expected: Non-Stationary)')

### 1.5 Making Time Series Stationary: Differencing

**Differencing** is the transformation that computes the difference between consecutive observations.

**First-order differencing:**
$$\Delta y_t = y_t - y_{t-1}$$

**Second-order differencing:**
$$\Delta^2 y_t = \Delta y_t - \Delta y_{t-1} = y_t - 2y_{t-1} + y_{t-2}$$

**Seasonal differencing (period $s$):**
$$\Delta_s y_t = y_t - y_{t-s}$$

In [None]:
# Apply differencing to make non-stationary series stationary
def make_stationary(series, max_diff=3):
    """
    Apply differencing until series becomes stationary.
    
    Args:
        series: Time series to transform
        max_diff: Maximum number of differences to try
    
    Returns:
        tuple: (stationary_series, num_differences)
    """
    diff_series = series.copy()
    
    for d in range(max_diff + 1):
        result = adfuller(diff_series.dropna(), autolag='AIC')
        p_value = result[1]
        
        print(f"Difference order {d}: p-value = {p_value:.6f}", end='')
        
        if p_value < 0.05:
            print(" ✓ STATIONARY")
            return diff_series, d
        else:
            print(" - Not stationary")
            if d < max_diff:
                diff_series = diff_series.diff()
    
    return diff_series, max_diff

print("Making trend series stationary:")
ts_trend_stationary, d_trend = make_stationary(ts_trend)

print("\nMaking full series stationary:")
ts_full_stationary, d_full = make_stationary(ts_full)

# Visualize original vs differenced
fig, axes = plt.subplots(2, 2, figsize=(15, 8))

ts_trend.plot(ax=axes[0, 0], title='Original Trend Series')
axes[0, 0].set_ylabel('Value')

ts_trend_stationary.plot(ax=axes[0, 1], title=f'After {d_trend} Difference(s)')
axes[0, 1].set_ylabel('Value')

ts_full.plot(ax=axes[1, 0], title='Original Full Series')
axes[1, 0].set_ylabel('Value')

ts_full_stationary.plot(ax=axes[1, 1], title=f'After {d_full} Difference(s)')
axes[1, 1].set_ylabel('Value')

plt.tight_layout()
plt.show()

### 1.6 Autocorrelation Function (ACF) and Partial Autocorrelation (PACF)

**Autocorrelation** measures the correlation between a time series and a lagged version of itself.

**ACF at lag $k$:**
$$\rho_k = \frac{Cov(y_t, y_{t-k})}{Var(y_t)} = \frac{\sum_{t=k+1}^{n}(y_t - \bar{y})(y_{t-k} - \bar{y})}{\sum_{t=1}^{n}(y_t - \bar{y})^2}$$

**PACF at lag $k$:**
The correlation between $y_t$ and $y_{t-k}$ after removing the effect of intermediate lags.

**Why are ACF/PACF important?**
- Identify patterns in time series
- Determine order of ARIMA models:
  - ACF cuts off after lag $q$ → MA($q$)
  - PACF cuts off after lag $p$ → AR($p$)
  - Both decay gradually → ARMA($p$, $q$)

In [None]:
def plot_acf_pacf(series, lags=40, title=''):
    """
    Plot ACF and PACF for a time series.
    
    Args:
        series: Time series data
        lags: Number of lags to plot
        title: Title for the plots
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 4))
    
    # ACF plot
    plot_acf(series.dropna(), lags=lags, ax=axes[0])
    axes[0].set_title(f'ACF - {title}')
    axes[0].set_xlabel('Lag')
    axes[0].set_ylabel('Autocorrelation')
    
    # PACF plot
    plot_pacf(series.dropna(), lags=lags, ax=axes[1])
    axes[1].set_title(f'PACF - {title}')
    axes[1].set_xlabel('Lag')
    axes[1].set_ylabel('Partial Autocorrelation')
    
    plt.tight_layout()
    plt.show()

# Plot for different series
plot_acf_pacf(ts_stationary, title='White Noise')
plot_acf_pacf(ts_seasonal, title='Seasonal Series')
plot_acf_pacf(ts_full_stationary.dropna(), title='Differenced Full Series')

### 1.7 Time Series Decomposition

Decomposition separates a time series into its constituent components.

**Methods:**
1. **Additive Decomposition**: $y_t = T_t + S_t + R_t$
   - Use when seasonal variation is roughly constant over time

2. **Multiplicative Decomposition**: $y_t = T_t \times S_t \times R_t$
   - Use when seasonal variation increases with the level of the series

**Classical Decomposition Algorithm:**
1. Estimate trend using moving average
2. Remove trend to get detrended series
3. Estimate seasonal component by averaging detrended values for each season
4. Remainder = Original - Trend - Seasonal

In [None]:
# Decompose time series
def decompose_series(series, model='additive', period=50):
    """
    Decompose time series into trend, seasonal, and residual components.
    
    Args:
        series: Time series to decompose
        model: 'additive' or 'multiplicative'
        period: Period of seasonal component
    
    Returns:
        Decomposition object
    """
    result = seasonal_decompose(series, model=model, period=period)
    
    # Plot decomposition
    fig, axes = plt.subplots(4, 1, figsize=(15, 10))
    
    result.observed.plot(ax=axes[0])
    axes[0].set_ylabel('Observed')
    axes[0].set_title(f'Time Series Decomposition ({model.capitalize()})')
    
    result.trend.plot(ax=axes[1])
    axes[1].set_ylabel('Trend')
    
    result.seasonal.plot(ax=axes[2])
    axes[2].set_ylabel('Seasonal')
    
    result.resid.plot(ax=axes[3])
    axes[3].set_ylabel('Residual')
    axes[3].set_xlabel('Time')
    
    plt.tight_layout()
    plt.show()
    
    # Print statistics
    print(f"\nDecomposition Statistics:")
    print(f"Trend strength:    {1 - result.resid.var() / (result.trend + result.resid).var():.4f}")
    print(f"Seasonal strength: {1 - result.resid.var() / (result.seasonal + result.resid).var():.4f}")
    
    return result

# Decompose the full series
decomposition = decompose_series(ts_full, model='additive', period=50)

---

## 2. Classical Statistical Methods

### 2.1 Moving Averages

**Simple Moving Average (SMA):**
$$\hat{y}_{t+1} = \frac{1}{k}\sum_{i=0}^{k-1} y_{t-i}$$

**Weighted Moving Average (WMA):**
$$\hat{y}_{t+1} = \sum_{i=0}^{k-1} w_i y_{t-i}, \quad \sum_{i=0}^{k-1} w_i = 1$$

**Properties:**
- Simple to implement and understand
- Good for smoothing noisy data
- Lag behind trends
- All past observations weighted equally (SMA)

In [None]:
class MovingAverage:
    """
    Simple and Weighted Moving Average forecaster.
    """
    def __init__(self, window=10, weights=None):
        """
        Args:
            window: Size of the moving window
            weights: Optional weights for WMA (must sum to 1)
        """
        self.window = window
        self.weights = weights
        
        if weights is not None:
            assert len(weights) == window, "Weights length must equal window size"
            assert abs(sum(weights) - 1.0) < 1e-6, "Weights must sum to 1"
    
    def fit(self, series):
        """Store the series for forecasting."""
        self.series = series.copy()
        return self
    
    def predict(self, steps=1):
        """
        Forecast future values.
        
        Args:
            steps: Number of steps ahead to forecast
        
        Returns:
            Array of forecasted values
        """
        forecasts = []
        series = self.series.values.copy()
        
        for _ in range(steps):
            # Get last 'window' observations
            recent = series[-self.window:]
            
            # Calculate forecast
            if self.weights is None:
                # Simple moving average
                forecast = np.mean(recent)
            else:
                # Weighted moving average
                forecast = np.sum(recent * self.weights[::-1])  # Reverse weights for recency
            
            forecasts.append(forecast)
            series = np.append(series, forecast)
        
        return np.array(forecasts)

# Test SMA vs WMA
window = 20

# Simple MA
sma = MovingAverage(window=window)
sma.fit(ts_full[:-50])
sma_forecast = sma.predict(steps=50)

# Weighted MA (exponentially decaying weights)
weights = np.exp(np.linspace(-2, 0, window))
weights = weights / weights.sum()
wma = MovingAverage(window=window, weights=weights)
wma.fit(ts_full[:-50])
wma_forecast = wma.predict(steps=50)

# Plot results
plt.figure(figsize=(15, 6))
plt.plot(ts_full.index[:-50], ts_full.values[:-50], label='Training Data', alpha=0.7)
plt.plot(ts_full.index[-50:], ts_full.values[-50:], label='Actual', color='black', linewidth=2)
plt.plot(ts_full.index[-50:], sma_forecast, label=f'SMA (window={window})', linestyle='--')
plt.plot(ts_full.index[-50:], wma_forecast, label=f'WMA (window={window})', linestyle='--')
plt.axvline(ts_full.index[-50], color='red', linestyle=':', alpha=0.5, label='Forecast Start')
plt.title('Moving Average Forecasting')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate errors
actual = ts_full.values[-50:]
print(f"\nSMA - MAE: {mean_absolute_error(actual, sma_forecast):.4f}")
print(f"SMA - RMSE: {np.sqrt(mean_squared_error(actual, sma_forecast)):.4f}")
print(f"\nWMA - MAE: {mean_absolute_error(actual, wma_forecast):.4f}")
print(f"WMA - RMSE: {np.sqrt(mean_squared_error(actual, wma_forecast)):.4f}")

### 2.2 Exponential Smoothing

Exponential smoothing methods weight recent observations more heavily.

#### 2.2.1 Simple Exponential Smoothing (SES)

For data with no trend or seasonality:

$$\hat{y}_{t+1} = \alpha y_t + (1-\alpha)\hat{y}_t = \hat{y}_t + \alpha(y_t - \hat{y}_t)$$

where $0 \leq \alpha \leq 1$ is the smoothing parameter.

**Expanding the recursion:**
$$\hat{y}_{t+1} = \alpha \sum_{i=0}^{t-1} (1-\alpha)^i y_{t-i} + (1-\alpha)^t \hat{y}_1$$

#### 2.2.2 Double Exponential Smoothing (Holt's Method)

For data with trend but no seasonality:

**Level equation:**
$$\ell_t = \alpha y_t + (1-\alpha)(\ell_{t-1} + b_{t-1})$$

**Trend equation:**
$$b_t = \beta(\ell_t - \ell_{t-1}) + (1-\beta)b_{t-1}$$

**Forecast:**
$$\hat{y}_{t+h} = \ell_t + h b_t$$

#### 2.2.3 Triple Exponential Smoothing (Holt-Winters)

For data with trend AND seasonality:

**Additive seasonality:**
$$\ell_t = \alpha (y_t - s_{t-m}) + (1-\alpha)(\ell_{t-1} + b_{t-1})$$
$$b_t = \beta(\ell_t - \ell_{t-1}) + (1-\beta)b_{t-1}$$
$$s_t = \gamma(y_t - \ell_t) + (1-\gamma)s_{t-m}$$
$$\hat{y}_{t+h} = \ell_t + h b_t + s_{t+h-m}$$

where $m$ is the seasonal period.

In [None]:
class ExponentialSmoothing:
    """
    Implementation of Simple, Double, and Triple Exponential Smoothing.
    """
    def __init__(self, method='simple', alpha=0.3, beta=0.1, gamma=0.1, seasonal_period=None):
        """
        Args:
            method: 'simple', 'double', or 'triple'
            alpha: Smoothing parameter for level (0 < alpha < 1)
            beta: Smoothing parameter for trend (0 < beta < 1)
            gamma: Smoothing parameter for seasonality (0 < gamma < 1)
            seasonal_period: Period of seasonality for triple smoothing
        """
        self.method = method
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.seasonal_period = seasonal_period
    
    def fit(self, series):
        """Fit the model to the time series."""
        self.series = series.values
        n = len(self.series)
        
        if self.method == 'simple':
            # Simple exponential smoothing
            self.level = np.zeros(n)
            self.level[0] = self.series[0]
            
            for t in range(1, n):
                self.level[t] = self.alpha * self.series[t] + (1 - self.alpha) * self.level[t-1]
        
        elif self.method == 'double':
            # Double exponential smoothing (Holt's method)
            self.level = np.zeros(n)
            self.trend = np.zeros(n)
            
            # Initialize
            self.level[0] = self.series[0]
            self.trend[0] = self.series[1] - self.series[0]
            
            for t in range(1, n):
                self.level[t] = (
                    self.alpha * self.series[t] + 
                    (1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
                )
                self.trend[t] = (
                    self.beta * (self.level[t] - self.level[t-1]) +
                    (1 - self.beta) * self.trend[t-1]
                )
        
        elif self.method == 'triple':
            # Triple exponential smoothing (Holt-Winters)
            if self.seasonal_period is None:
                raise ValueError("seasonal_period must be specified for triple smoothing")
            
            m = self.seasonal_period
            self.level = np.zeros(n)
            self.trend = np.zeros(n)
            self.seasonal = np.zeros(n)
            
            # Initialize level and trend
            self.level[0] = np.mean(self.series[:m])
            self.trend[0] = (np.mean(self.series[m:2*m]) - np.mean(self.series[:m])) / m
            
            # Initialize seasonal components
            for i in range(m):
                self.seasonal[i] = self.series[i] - self.level[0]
            
            # Update components
            for t in range(1, n):
                if t >= m:
                    self.level[t] = (
                        self.alpha * (self.series[t] - self.seasonal[t-m]) +
                        (1 - self.alpha) * (self.level[t-1] + self.trend[t-1])
                    )
                    self.trend[t] = (
                        self.beta * (self.level[t] - self.level[t-1]) +
                        (1 - self.beta) * self.trend[t-1]
                    )
                    self.seasonal[t] = (
                        self.gamma * (self.series[t] - self.level[t]) +
                        (1 - self.gamma) * self.seasonal[t-m]
                    )
                else:
                    self.level[t] = self.level[0]
                    self.trend[t] = self.trend[0]
        
        return self
    
    def predict(self, steps=1):
        """
        Forecast future values.
        
        Args:
            steps: Number of steps ahead to forecast
        
        Returns:
            Array of forecasted values
        """
        forecasts = np.zeros(steps)
        
        if self.method == 'simple':
            # Forecast is just the last level
            forecasts[:] = self.level[-1]
        
        elif self.method == 'double':
            # Forecast includes trend
            for h in range(1, steps + 1):
                forecasts[h-1] = self.level[-1] + h * self.trend[-1]
        
        elif self.method == 'triple':
            # Forecast includes trend and seasonality
            m = self.seasonal_period
            for h in range(1, steps + 1):
                seasonal_idx = -m + ((h - 1) % m)
                forecasts[h-1] = self.level[-1] + h * self.trend[-1] + self.seasonal[seasonal_idx]
        
        return forecasts

# Test all three methods
train_size = len(ts_full) - 50
train = ts_full[:train_size]
test = ts_full[train_size:]

# Simple ES
ses = ExponentialSmoothing(method='simple', alpha=0.3)
ses.fit(train)
ses_forecast = ses.predict(steps=50)

# Double ES
des = ExponentialSmoothing(method='double', alpha=0.3, beta=0.1)
des.fit(train)
des_forecast = des.predict(steps=50)

# Triple ES
tes = ExponentialSmoothing(method='triple', alpha=0.3, beta=0.1, gamma=0.1, seasonal_period=50)
tes.fit(train)
tes_forecast = tes.predict(steps=50)

# Plot results
plt.figure(figsize=(15, 6))
plt.plot(train.index, train.values, label='Training Data', alpha=0.7)
plt.plot(test.index, test.values, label='Actual', color='black', linewidth=2)
plt.plot(test.index, ses_forecast, label='Simple ES', linestyle='--')
plt.plot(test.index, des_forecast, label='Double ES (Holt)', linestyle='--')
plt.plot(test.index, tes_forecast, label='Triple ES (Holt-Winters)', linestyle='--')
plt.axvline(test.index[0], color='red', linestyle=':', alpha=0.5, label='Forecast Start')
plt.title('Exponential Smoothing Methods Comparison')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate errors
actual = test.values
print("\nForecast Accuracy:")
print(f"Simple ES    - MAE: {mean_absolute_error(actual, ses_forecast):.4f}, RMSE: {np.sqrt(mean_squared_error(actual, ses_forecast)):.4f}")
print(f"Double ES    - MAE: {mean_absolute_error(actual, des_forecast):.4f}, RMSE: {np.sqrt(mean_squared_error(actual, des_forecast)):.4f}")
print(f"Triple ES    - MAE: {mean_absolute_error(actual, tes_forecast):.4f}, RMSE: {np.sqrt(mean_squared_error(actual, tes_forecast)):.4f}")

### 2.3 ARIMA Models

**ARIMA(p, d, q)** = AutoRegressive Integrated Moving Average

- **p**: Order of autoregressive part
- **d**: Degree of differencing
- **q**: Order of moving average part

#### AR(p) - Autoregressive Model

$$y_t = c + \phi_1 y_{t-1} + \phi_2 y_{t-2} + \cdots + \phi_p y_{t-p} + \epsilon_t$$

The current value is a linear combination of past values.

#### MA(q) - Moving Average Model

$$y_t = c + \epsilon_t + \theta_1 \epsilon_{t-1} + \theta_2 \epsilon_{t-2} + \cdots + \theta_q \epsilon_{t-q}$$

The current value is a linear combination of past forecast errors.

#### ARMA(p, q)

$$y_t = c + \phi_1 y_{t-1} + \cdots + \phi_p y_{t-p} + \epsilon_t + \theta_1 \epsilon_{t-1} + \cdots + \theta_q \epsilon_{t-q}$$

#### ARIMA(p, d, q)

ARMA model applied to the differenced series:
$$\Delta^d y_t = \text{ARMA}(p, q)$$

**How to select p, d, q:**
1. **d**: Number of differences needed to make series stationary (ADF test)
2. **p**: Look at PACF plot - where it cuts off
3. **q**: Look at ACF plot - where it cuts off
4. Use AIC/BIC for model selection

In [None]:
def find_best_arima(series, max_p=5, max_d=2, max_q=5):
    """
    Find the best ARIMA order using AIC.
    
    Args:
        series: Time series data
        max_p, max_d, max_q: Maximum orders to try
    
    Returns:
        tuple: Best (p, d, q) order and the fitted model
    """
    best_aic = np.inf
    best_order = None
    best_model = None
    
    print("Searching for best ARIMA model...")
    print(f"{'Order':<15} {'AIC':<12} {'BIC':<12}")
    print("-" * 40)
    
    # Grid search
    results = []
    for p in range(max_p + 1):
        for d in range(max_d + 1):
            for q in range(max_q + 1):
                try:
                    model = ARIMA(series, order=(p, d, q))
                    fitted = model.fit()
                    
                    aic = fitted.aic
                    bic = fitted.bic
                    results.append(((p, d, q), aic, bic))
                    
                    if aic < best_aic:
                        best_aic = aic
                        best_order = (p, d, q)
                        best_model = fitted
                    
                except:
                    continue
    
    # Print top 5 models
    results.sort(key=lambda x: x[1])
    for i, (order, aic, bic) in enumerate(results[:5]):
        marker = "<-- BEST" if order == best_order else ""
        print(f"{str(order):<15} {aic:<12.2f} {bic:<12.2f} {marker}")
    
    return best_order, best_model

# Find best ARIMA model
best_order, best_arima = find_best_arima(train, max_p=3, max_d=2, max_q=3)

print(f"\n\nBest ARIMA order: {best_order}")
print(f"\nModel Summary:")
print(best_arima.summary())

In [None]:
# Forecast with ARIMA
arima_forecast = best_arima.forecast(steps=50)

# Get confidence intervals
forecast_result = best_arima.get_forecast(steps=50)
forecast_df = forecast_result.summary_frame(alpha=0.05)

# Plot results
plt.figure(figsize=(15, 6))
plt.plot(train.index, train.values, label='Training Data', alpha=0.7)
plt.plot(test.index, test.values, label='Actual', color='black', linewidth=2)
plt.plot(test.index, arima_forecast, label=f'ARIMA{best_order}', linestyle='--', linewidth=2)

# Plot confidence intervals
plt.fill_between(test.index, 
                 forecast_df['mean_ci_lower'].values, 
                 forecast_df['mean_ci_upper'].values, 
                 alpha=0.2, label='95% CI')

plt.axvline(test.index[0], color='red', linestyle=':', alpha=0.5, label='Forecast Start')
plt.title(f'ARIMA{best_order} Forecasting with Confidence Intervals')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Model diagnostics
fig, axes = plt.subplots(2, 2, figsize=(15, 8))
best_arima.plot_diagnostics(fig=fig)
plt.tight_layout()
plt.show()

# Calculate errors
arima_mae = mean_absolute_error(test.values, arima_forecast)
arima_rmse = np.sqrt(mean_squared_error(test.values, arima_forecast))

print(f"\nARIMA{best_order} Forecast Accuracy:")
print(f"MAE:  {arima_mae:.4f}")
print(f"RMSE: {arima_rmse:.4f}")

### 2.4 SARIMA - Seasonal ARIMA

**SARIMA(p, d, q)(P, D, Q)s** extends ARIMA to handle seasonality.

- **(p, d, q)**: Non-seasonal components
- **(P, D, Q)s**: Seasonal components with period s

**Full SARIMA equation:**

$$\phi_p(B) \Phi_P(B^s) \Delta^d \Delta_s^D y_t = \theta_q(B) \Theta_Q(B^s) \epsilon_t$$

where:
- $B$ is the backshift operator: $B y_t = y_{t-1}$
- $\phi_p(B)$ is the non-seasonal AR polynomial
- $\Phi_P(B^s)$ is the seasonal AR polynomial
- $\theta_q(B)$ is the non-seasonal MA polynomial
- $\Theta_Q(B^s)$ is the seasonal MA polynomial

In [None]:
# Fit SARIMA model
# We know our data has seasonality with period 50
seasonal_order = (1, 1, 1, 50)  # (P, D, Q, s)
order = (1, 1, 1)  # (p, d, q)

print(f"Fitting SARIMA{order}x{seasonal_order} model...")
sarima_model = SARIMAX(train, order=order, seasonal_order=seasonal_order)
sarima_fitted = sarima_model.fit(disp=False)

print("\nSARIMA Model Summary:")
print(sarima_fitted.summary())

# Forecast
sarima_forecast = sarima_fitted.forecast(steps=50)
sarima_forecast_obj = sarima_fitted.get_forecast(steps=50)
sarima_forecast_df = sarima_forecast_obj.summary_frame(alpha=0.05)

# Plot results
plt.figure(figsize=(15, 6))
plt.plot(train.index, train.values, label='Training Data', alpha=0.7)
plt.plot(test.index, test.values, label='Actual', color='black', linewidth=2)
plt.plot(test.index, sarima_forecast, label=f'SARIMA{order}x{seasonal_order}', 
         linestyle='--', linewidth=2, color='green')

# Plot confidence intervals
plt.fill_between(test.index, 
                 sarima_forecast_df['mean_ci_lower'].values, 
                 sarima_forecast_df['mean_ci_upper'].values, 
                 alpha=0.2, color='green', label='95% CI')

plt.axvline(test.index[0], color='red', linestyle=':', alpha=0.5, label='Forecast Start')
plt.title(f'SARIMA Forecasting with Seasonal Components')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Calculate errors
sarima_mae = mean_absolute_error(test.values, sarima_forecast)
sarima_rmse = np.sqrt(mean_squared_error(test.values, sarima_forecast))

print(f"\nSARIMA Forecast Accuracy:")
print(f"MAE:  {sarima_mae:.4f}")
print(f"RMSE: {sarima_rmse:.4f}")

print(f"\nComparison with ARIMA:")
print(f"ARIMA MAE:  {arima_mae:.4f}")
print(f"SARIMA MAE: {sarima_mae:.4f}")
print(f"Improvement: {((arima_mae - sarima_mae) / arima_mae * 100):.2f}%")