<a href="https://colab.research.google.com/github/balashankar-d/marketing-mix-modelling/blob/main/revenue_predict.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install optuna

Collecting optuna
  Downloading optuna-4.5.0-py3-none-any.whl.metadata (17 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.5.0-py3-none-any.whl (400 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m400.9/400.9 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, optuna
Successfully installed colorlog-6.9.0 optuna-4.5.0


In [None]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
import plotly.graph_objects as go
import optuna
import warnings

# Suppress warnings for a cleaner output
warnings.filterwarnings('ignore')
optuna.logging.set_verbosity(optuna.logging.WARNING)

In [None]:
file_path = 'data.csv'

try:
    df = pd.read_csv(file_path)
except FileNotFoundError:
    print(f"Error: The file '{file_path}' was not found. Using a dummy dataframe for demonstration.")


# Data Preparation
df.columns = df.columns.str.lower().str.replace(' ', '_')
df['week'] = pd.to_datetime(df['week'])
df = df.set_index('week').sort_index()

print("Data loaded successfully. Shape:", df.shape)

Data loaded successfully. Shape: (104, 11)


In [None]:
def geometric_adstock(series, decay_rate):
    adstocked_series = np.zeros_like(series, dtype=float)
    adstocked_series[0] = series.iloc[0]
    for i in range(1, len(series)):
        adstocked_series[i] = series.iloc[i] + decay_rate * adstocked_series[i-1]
    return adstocked_series

def create_features(df_in, decay_rate):
    """Create features with a specific decay rate."""
    df_feat = df_in.copy()
    media_channels = ['facebook_spend', 'google_spend', 'tiktok_spend', 'instagram_spend', 'snapchat_spend']

    # a. Adstock & Saturation with the given decay rate
    for channel in media_channels:
        df_feat[f'{channel}_adstock'] = geometric_adstock(df_feat[channel], decay_rate)
        df_feat[f'{channel}_saturation'] = np.log1p(df_feat[f'{channel}_adstock'])

    # b. IMPROVED Temporal Features (Cyclical)
    df_feat['week_of_year'] = df_feat.index.isocalendar().week.astype(float)
    df_feat['week_sin'] = np.sin(2 * np.pi * df_feat['week_of_year'] / 52)
    df_feat['week_cos'] = np.cos(2 * np.pi * df_feat['week_of_year'] / 52)
    df_feat['month'] = df_feat.index.month
    df_feat['time_index'] = np.arange(len(df_feat))

    # c. NEW Interaction Feature
    df_feat['promo_facebook_interaction'] = df_feat['promotions'] * df_feat['facebook_spend_saturation']

    # d. Lag Features
    lag_vars = media_channels + ['emails_send', 'sms_send']
    for var in lag_vars:
        for i in range(1, 4):
            df_feat[f'{var}_lag{i}'] = df_feat[var].shift(i)

    return df_feat.dropna()


In [None]:
decay_rates_to_try = [0.1, 0.3, 0.5, 0.7, 0.9]
best_r2 = -np.inf
best_decay_rate = None
best_model = None
best_test_data = None

print("\nStarting optimization to find the best adstock decay rate...")

for decay in decay_rates_to_try:
    print(f"--- Testing Decay Rate: {decay} ---")

    df_featured = create_features(df, decay_rate=decay)

    test_size = int(len(df_featured) * 0.2)
    train_df = df_featured.iloc[:-test_size]
    test_df = df_featured.iloc[-test_size:]

    # STAGE 1: Model the Mediator
    stage1_predictors = [
        'facebook_spend_saturation', 'tiktok_spend_saturation', 'snapchat_spend_saturation',
        'time_index', 'week_sin', 'week_cos', 'month'
    ]
    target_stage1 = 'google_spend_saturation'

    X_train_s1 = train_df[stage1_predictors]
    y_train_s1 = train_df[target_stage1]

    model_s1 = xgb.XGBRegressor(random_state=42, n_estimators=100, objective='reg:squarederror')
    model_s1.fit(X_train_s1, y_train_s1)

    df_featured['google_spend_predicted'] = model_s1.predict(df_featured[stage1_predictors])
    df_featured['google_spend_residuals'] = df_featured[target_stage1] - df_featured['google_spend_predicted']

    # STAGE 2: Model Revenue
    stage2_predictors = [
        'facebook_spend_saturation', 'tiktok_spend_saturation', 'snapchat_spend_saturation',
        'instagram_spend_saturation', 'emails_send', 'sms_send',
        'google_spend_predicted', 'google_spend_residuals',
        'average_price', 'promotions', 'social_followers',
        'time_index', 'week_sin', 'week_cos', 'month',
        'promo_facebook_interaction'
    ]
    lag_cols = [col for col in df_featured.columns if '_lag' in col]
    stage2_predictors.extend(lag_cols)
    target_stage2 = 'revenue'

    train_df_s2 = df_featured.iloc[:-test_size]
    test_df_s2 = df_featured.iloc[-test_size:]

    X_train = train_df_s2[stage2_predictors]
    y_train = train_df_s2[target_stage2]
    X_test = test_df_s2[stage2_predictors]
    y_test = test_df_s2[target_stage2]

    # ADVANCED Hyperparameter Tuning with Optuna
    def objective(trial):
        params = {
            'objective': 'reg:squarederror',
            'eval_metric': 'rmse', # Specify metric for early stopping
            'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'gamma': trial.suggest_float('gamma', 0, 5),
            'lambda': trial.suggest_float('lambda', 0.01, 10),
            'alpha': trial.suggest_float('alpha', 0.01, 10),
            'random_state': 42
        }

        # <<< FIX IS HERE >>>
        # Add early_stopping_rounds to the model's parameters at initialization
        model = xgb.XGBRegressor(**params, early_stopping_rounds=30)

        # The .fit() method now only needs the data and the evaluation set
        model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)

        preds = model.predict(X_test)
        r2 = r2_score(y_test, preds)
        return r2

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50)

    current_r2 = study.best_value
    print(f"Decay Rate {decay} -> Test R²: {current_r2:.4f}")
    if current_r2 > best_r2:
        best_r2 = current_r2
        best_decay_rate = decay
        best_model = xgb.XGBRegressor(random_state=42, **study.best_params, early_stopping_rounds=30)
        best_model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
        best_test_data = (X_test, y_test)



Starting optimization to find the best adstock decay rate...
--- Testing Decay Rate: 0.1 ---
Decay Rate 0.1 -> Test R²: 0.9221
--- Testing Decay Rate: 0.3 ---
Decay Rate 0.3 -> Test R²: 0.9415
--- Testing Decay Rate: 0.5 ---
Decay Rate 0.5 -> Test R²: 0.7103
--- Testing Decay Rate: 0.7 ---
Decay Rate 0.7 -> Test R²: 0.3013
--- Testing Decay Rate: 0.9 ---
Decay Rate 0.9 -> Test R²: 0.5521


In [None]:
print(f"\n--- BEST MODEL PERFORMANCE (with Decay Rate = {best_decay_rate}) ---")

X_test_final, y_test_final = best_test_data
y_pred = best_model.predict(X_test_final)

r2 = r2_score(y_test_final, y_pred)
mae = mean_absolute_error(y_test_final, y_pred)
rmse = np.sqrt(mean_squared_error(y_test_final, y_pred))
mape = mean_absolute_percentage_error(y_test_final, y_pred)

print(f"R-squared (R²): {r2:.4f}")
print(f"Mean Absolute Error (MAE): {mae:,.2f}")
print(f"Root Mean Squared Error (RMSE): {rmse:,.2f}")
print(f"Mean Absolute Percentage Error (MAPE): {mape:.2%}")

results_df = pd.DataFrame({'Actual': y_test_final, 'Predicted': y_pred}, index=y_test_final.index)

fig = go.Figure()
fig.add_trace(go.Scatter(x=results_df.index, y=results_df['Actual'], mode='lines', name='Actual Revenue', line=dict(color='royalblue', width=2)))
fig.add_trace(go.Scatter(x=results_df.index, y=results_df['Predicted'], mode='lines', name='Predicted Revenue', line=dict(color='crimson', width=2, dash='dot')))
fig.update_layout(
    title='Model Performance: Actual vs. Predicted Revenue on Test Set',
    xaxis_title='Week', yaxis_title='Weekly Revenue',
    legend_title='Legend', template='plotly_white'
)
fig.show()


--- BEST MODEL PERFORMANCE (with Decay Rate = 0.3) ---
R-squared (R²): 0.9415
Mean Absolute Error (MAE): 11,291.41
Root Mean Squared Error (RMSE): 13,586.13
Mean Absolute Percentage Error (MAPE): 90765.87%
