# Traffic Prediction Pipeline
This notebook implements a TrafficPredictor class for time series forecasting, feature engineering, model training, interpretation, and error analysis using XGBoost, Optuna, and SHAP.


In [5]:
# importing the libraries
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from xgboost import XGBRegressor
import optuna
import shap
import joblib
import warnings
import os

warnings.filterwarnings('ignore')

## TrafficPredictor Class
Encapsulates all modeling and analysis steps, including feature engineering, model training, interpretability, and error visualization.


In [None]:
class TrafficPredictor:
    def __init__(self, data_path, output_dir='results'):
        self.data_path = data_path
        self.output_dir = output_dir
        self._create_output_dir()
        self.df = None
        self.X_train, self.X_test = None, None
        self.y_train, self.y_test = None, None
        self.feature_names = None
        self.scaler = MinMaxScaler()
        self.models = {}
        self.metrics = {}
        self.best_params = {}
        self.chunk_size = 10000
        self.study = None

    def _create_output_dir(self):
        for subdir in ['models', 'plots', 'shap']:
            os.makedirs(os.path.join(self.output_dir, subdir), exist_ok=True)

    def _add_time_features(self, df):
        """
        Adds time-based features to dataframe.
        Tries to capture temporal cycles like hour of day, month, etc.
        """
        df = df.copy()
        # Cyclical hour encoding
        df['hour_sin'] = np.sin(2 * np.pi * df['Hour'] / 23.0)
        df['hour_cos'] = np.cos(2 * np.pi * df['Hour'] / 23.0)
        # Datetime features
        df['day_of_week'] = df.index.dayofweek
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        df['day_of_month'] = df.index.day
        df['month'] = df.index.month
        df['year'] = df.index.year
        # Cyclical day/month
        df['day_sin'] = np.sin(2 * np.pi * df.index.dayofyear / 365.0)
        df['day_cos'] = np.cos(2 * np.pi * df.index.dayofyear / 365.0)
        df['month_sin'] = np.sin(2 * np.pi * df.index.month / 12.0)
        df['month_cos'] = np.cos(2 * np.pi * df.index.month / 12.0)
        # Relative time
        df['days_since_start'] = (df.index - df.index.min()).days
        # Lag features for vehicles
        for lag in [1, 2, 3, 24, 168]:
            df[f'vehicles_lag_{lag}'] = df['Vehicles'].shift(lag)
        # Rolling averages
        for window in [3, 6, 12, 24]:
            df[f'vehicles_rolling_mean_{window}'] = df['Vehicles'].rolling(window).mean()
            df[f'vehicles_rolling_std_{window}'] = df['Vehicles'].rolling(window).std()
        # Drop rows with NA after lag/roll
        df = df.dropna()
        return df

    def load_and_preprocess_data(self):
        print("Loading and preprocessing data...")
        chunks = []
        # Read in manageable chunks for large files
        for chunk in pd.read_csv(self.data_path, chunksize=self.chunk_size,
                                usecols=['DateTime','Junction','Vehicles','Hour','Avg_Temp','Precipitation','Wind_Speed']):
            chunk['DateTime'] = pd.to_datetime(chunk['DateTime'])
            chunk.sort_values('DateTime', inplace=True)
            chunks.append(chunk)
        self.df = pd.concat(chunks)
        self.df.set_index('DateTime', inplace=True)
        self.df.sort_index(inplace=True)
        # Add temporal features
        self.df = self._add_time_features(self.df)
        self.feature_names = [col for col in self.df.columns if col != 'Vehicles']
        print(f"Feature count: {len(self.feature_names)}")
        return self.df

    def objective(self, trial, X, y):
        """Optuna objective for hyperparameter tuning."""
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.5, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
            'random_state': 42,
            'n_jobs': -1,
            'tree_method': 'hist'
        }
        tscv = TimeSeriesSplit(n_splits=3)
        cv_scores = []
        for train_idx, val_idx in tscv.split(X):
            model = XGBRegressor(**params)
            model.fit(X[train_idx], y[train_idx])
            pred = model.predict(X[val_idx])
            cv_scores.append(np.sqrt(mean_squared_error(y[val_idx], pred)))
        return np.mean(cv_scores)

    def optimize_hyperparameters(self, n_trials=15):
        print("\nRunning Optuna for hyperparam tuning...")
        X = self.scaler.fit_transform(self.X_train)
        y = self.y_train.values
        self.study = optuna.create_study(direction='minimize')
        self.study.optimize(lambda t: self.objective(t, X, y), n_trials=n_trials, n_jobs=-1)
        self.best_params = self.study.best_params
        print("Best params:", self.best_params)
        plt.figure(figsize=(8, 5))
        hist = self.study.trials_dataframe()
        plt.plot(hist['number'], hist['value'], marker='o')
        plt.title('Optuna Optimization History')
        plt.xlabel('Trial')
        plt.ylabel('RMSE')
        plt.grid(True)
        plt.savefig(f"{self.output_dir}/plots/opt_history.png")
        plt.close()
        return self.best_params

    def train_xgboost(self, use_optimization=True):
        print("Training XGBoost...")
        if use_optimization and not self.best_params:
            self.optimize_hyperparameters()
        params = self.best_params or {
            'n_estimators': 150,
            'learning_rate': 0.1,
            'max_depth': 5,
            'subsample': 0.7,
            'colsample_bytree': 0.7,
            'random_state': 42,
            'n_jobs': -1,
            'tree_method': 'hist'
        }
        model = XGBRegressor(**params)
        model.fit(self.X_train, self.y_train)
        joblib.dump(model, f"{self.output_dir}/models/xgb_model.pkl")
        metrics = self.evaluate_model(model, self.X_test, self.y_test, 'XGBRegressor')
        self._plot_feature_importance(model)
        self._shap_analysis(model)
        self.models['xgboost'] = model
        return model, metrics

    def _plot_feature_importance(self, model):
        impt = model.feature_importances_
        order = np.argsort(impt)[::-1]
        plt.figure(figsize=(10,6))
        plt.barh(range(20), impt[order][:20][::-1])
        plt.yticks(range(20), [self.feature_names[i] for i in order][:20][::-1])
        plt.title("Top 20 Feature Importances")
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/plots/feature_importance.png")
        plt.close()
        pd.DataFrame({'feature':[self.feature_names[i] for i in order],'importance':impt[order]})\
            .to_csv(f"{self.output_dir}/top_features.csv", index=False)

    def _shap_analysis(self, model, sample_size=1000):
        """Computes SHAP values for interpretability, stores summary and dependence plots."""
        try:
            if len(self.X_test) > sample_size:
                X_sample = shap.utils.sample(self.X_test, sample_size)
            else:
                X_sample = self.X_test
            explainer = shap.Explainer(model)
            shap_values = explainer(X_sample)
            plt.figure(figsize=(10,7))
            shap.summary_plot(shap_values, X_sample, feature_names=self.feature_names, show=False)
            plt.tight_layout()
            plt.savefig(f"{self.output_dir}/shap/summary.png")
            plt.close()
            for i in range(min(5, len(self.feature_names))):
                plt.figure(figsize=(8,5))
                shap.dependence_plot(i, shap_values.values, X_sample, feature_names=self.feature_names, show=False)
                plt.tight_layout()
                plt.savefig(f"{self.output_dir}/shap/dependence_{i}.png")
                plt.close()
        except Exception as err:
            print("SHAP error:", err)

    def evaluate_model(self, model, X_test, y_test, name):
        """Quick reporting and error metrics."""
        pred = model.predict(X_test)
        mae = mean_absolute_error(y_test, pred)
        mse = mean_squared_error(y_test, pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, pred)
        mape = np.mean(np.abs((y_test - pred) / np.maximum(1e-8, y_test))) * 100
        metrics = {'MAE': mae, 'MSE': mse, 'RMSE': rmse, 'R2': r2, 'MAPE': mape}
        print(f"{name} results:")
        for k,v in metrics.items():
            print(f"{k}: {v:.4f}")
        self._plot_predictions(y_test, pred, name)
        return metrics

    def _plot_predictions(self, y_true, y_pred, name):
        plt.figure(figsize=(8,5))
        idx = np.random.choice(len(y_true), min(200, len(y_true)), replace=False)
        plt.scatter(y_true.iloc[idx], y_pred[idx], alpha=0.5)
        plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
        plt.title(f'{name}: Actual vs Predicted')
        plt.xlabel('Actual')
        plt.ylabel('Predicted')
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/plots/{name.lower()}_prediction.png")
        plt.close()

    def analyze_errors(self, model):
        pred = model.predict(self.X_test)
        err = self.y_test - pred
        abs_err = np.abs(err)
        error_df = pd.DataFrame({'actual': self.y_test, 'predicted': pred, 'error': err,
                                'abs_error': abs_err, 'datetime': self.y_test.index})
        error_df['hour'] = error_df['datetime'].dt.hour
        error_df['day_of_week'] = error_df['datetime'].dt.dayofweek
        error_df['is_weekend'] = error_df['day_of_week'].isin([5, 6])
        self._plot_error_patterns(error_df)
        error_df.to_csv(f"{self.output_dir}/error_analysis.csv", index=False)
        return error_df

    def _plot_error_patterns(self, error_df):
        plt.figure(figsize=(10,5))
        error_df.groupby('hour')['abs_error'].mean().plot()
        plt.title('Mean Abs Error by Hour')
        plt.xlabel('Hour of Day')
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/plots/error_hour.png")
        plt.close()
        plt.figure(figsize=(10,5))
        error_df.groupby('day_of_week')['abs_error'].mean().plot()
        plt.title('Mean Abs Error by Day of Week')
        plt.xlabel('Day of Week (0=Mon)')
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/plots/error_weekday.png")
        plt.close()
        plt.figure(figsize=(8,5))
        plt.scatter(error_df['actual'], error_df['error'], alpha=0.3)
        plt.axhline(0, color='r', linestyle='--')
        plt.title('Error vs Actual Volume')
        plt.xlabel('Actual Traffic')
        plt.ylabel('Prediction Error')
        plt.tight_layout()
        plt.savefig(f"{self.output_dir}/plots/error_vs_actual.png")
        plt.close()


## Main Pipeline Execution
This cell sets up the traffic predictor, runs preprocessing, model training, evaluation, and error analysis.


In [None]:
def main():
    try:
        # Model pipeline steps
        predictor = TrafficPredictor('integrated_traffic_data.csv')
        df = predictor.load_and_preprocess_data()
        X = predictor.scaler.fit_transform(df[predictor.feature_names])
        y = df['Vehicles']
        # Train test split
        split_idx = int(0.8 * len(X))
        predictor.X_train, predictor.X_test = X[:split_idx], X[split_idx:]
        predictor.y_train, predictor.y_test = y[:split_idx], y[split_idx:]
        # Fit model and print results
        model, metrics = predictor.train_xgboost(use_optimization=True)
        error_df = predictor.analyze_errors(model)
        print("Training and analysis complete. Results in:", predictor.output_dir)
    except Exception as exc:
        print("Runtime error:", exc)
        import traceback; traceback.print_exc()


In [None]:
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
main()

[I 2025-09-10 09:47:09,745] A new study created in memory with name: no-name-46e25c03-29f5-4e1b-a443-074c14a6a178


Loading and preprocessing data...
Feature count: 30
Training XGBoost...

Running Optuna for hyperparam tuning...


[I 2025-09-10 09:47:10,137] Trial 6 finished with value: 7.139128151107272 and parameters: {'n_estimators': 57, 'max_depth': 3, 'learning_rate': 0.05398514441747595, 'subsample': 0.747250380935055, 'colsample_bytree': 0.5143912232857653}. Best is trial 6 with value: 7.139128151107272.
[I 2025-09-10 09:47:10,329] Trial 3 finished with value: 7.788563590840444 and parameters: {'n_estimators': 96, 'max_depth': 3, 'learning_rate': 0.02275240036050781, 'subsample': 0.9356490135881741, 'colsample_bytree': 0.7646339168322039}. Best is trial 6 with value: 7.139128151107272.
[I 2025-09-10 09:47:10,332] Trial 7 finished with value: 12.01792963847228 and parameters: {'n_estimators': 82, 'max_depth': 3, 'learning_rate': 0.010426681345545504, 'subsample': 0.5471409491355568, 'colsample_bytree': 0.9714365752253389}. Best is trial 6 with value: 7.139128151107272.
[I 2025-09-10 09:47:10,633] Trial 4 finished with value: 3.8611682057937187 and parameters: {'n_estimators': 149, 'max_depth': 3, 'learning

Best params: {'n_estimators': 149, 'max_depth': 3, 'learning_rate': 0.23607149522753962, 'subsample': 0.6484929700131534, 'colsample_bytree': 0.961381140403829}
XGBRegressor results:
MAE: 2.8922
MSE: 19.9633
RMSE: 4.4680
R2: 0.9730
MAPE: 18.0838
Training and analysis complete. Results in: results
