In [0]:
import pandas as pd
import numpy as np
import seaborn as sns
import plotly.express as px
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LinearRegression
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import pmdarima as pm
import xgboost as xgb
from xgboost import XGBRegressor
from prophet import Prophet
from pyspark.sql.functions import to_date, to_timestamp, col, last, when, lit, current_date, date_sub, datediff, substring, sequence, explode, coalesce, sum as spark_sum, min as spark_min
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import joblib
import holidays
import mlflow
mlflow.autolog(disable=True)

pi = spark.read.table('data_prod.silver_sanezdb.priceinspection').select('segment', 'when', 'standardprice')
dmh = spark.read.table('data_experience_commercial.cbt_0923_segmentfinder.dimensions_history').select('flightkey', 'onsale_dt', 'ty_capacity', 'routetype', 'region', 'route', 'flight_dt')

pi = pi.withColumnRenamed('when', 'charge_dt').withColumnRenamed('segment', 'flightkey').withColumn('charge_dt', F.col('charge_dt').cast('date')).groupby('flightkey', 'charge_dt').agg(F.avg('standardprice').alias('price'))
pi_with_meta = pi.join(dmh, on='flightkey', how='inner')
filtered_pi = pi_with_meta.filter((F.col('region').isin(['UK-London', 'UK-Regions'])) & (F.col('charge_dt') >= '2021-01-01') & (F.col('routetype') == 'Domestic'))
date_ranges = filtered_pi.groupBy('flightkey').agg(F.min('charge_dt').alias('first_date'), F.first('flight_dt').alias('flight_dt'))
current_date = F.lit(datetime.now().date())
date_ranges = date_ranges.withColumn('end_date', F.least(F.col('flight_dt'), current_date))
scaffold_pi = date_ranges.withColumn('charge_dt', F.explode(F.sequence(F.col('first_date'), F.col('end_date')))).select('flightkey', 'charge_dt')
filled_pi = scaffold_pi.join(filtered_pi.select('flightkey', 'charge_dt', 'price'), on=['flightkey', 'charge_dt'], how='left')
window_spec = Window.partitionBy('flightkey').orderBy('charge_dt').rowsBetween(Window.unboundedPreceding, Window.currentRow)
final_pi = filled_pi.withColumn('price', F.last('price', ignorenulls=True).over(window_spec))
final_pi = final_pi.join(dmh, on='flightkey', how='inner')
price_history = final_pi.groupby('charge_dt', 'flight_dt').agg(F.avg('price').cast('double').alias('price'))

rts_master = spark.read.table('data_experience_commercial.cbt_1423_rtsuite.master').select('charge_dt', 'dtg', 'chargeproduct', 'unt_pre', 'flightkey', 'rev_pre', 'unt_net')
df = rts_master.join(dmh, on='flightkey', how='inner')
df = df.filter((F.col('chargeproduct') == 'Ticket') & (F.col('dtg') >= 0) & (F.col('dtg') <= 364) & (F.col('charge_dt') >= '2021-01-01') & (F.col('routetype') == 'Domestic') & (F.col('region').isin(['UK-London','UK-Regions'])) & (F.col('dtg') < (F.datediff(F.col('flight_dt'), F.col('onsale_dt')) - 25)))

window_spec2 = Window.partitionBy('flightkey').orderBy(F.col('dtg').desc())
df = df.withColumn('pax_net', F.sum('unt_net').over(window_spec2)).withColumn('load_factor', F.col('pax_net') / F.col('ty_capacity')).withColumn('unt_pre', F.when(F.col('unt_pre') < 0, 0).otherwise(F.col('unt_pre'))).withColumn('rev_pre', F.when(F.col('rev_pre') < 0, 0).otherwise(F.col('rev_pre'))).withColumn('yield', F.when(F.col('unt_pre') == 0, 0).otherwise(F.col('rev_pre') / F.col('unt_pre')))

df = df.groupby('charge_dt', 'flight_dt').agg(F.first('dtg').alias('dtg'), F.sum('unt_pre').alias('unt_pre'), F.sum('ty_capacity').alias('ty_capacity'), F.avg('load_factor').alias('load_factor'), F.avg('yield').alias('yield'))

df = df.join(price_history, on=['charge_dt', 'flight_dt'], how='left').toPandas()

df.info()
df_orginal = df.copy()

In [0]:
# Data Optimisation

def optimize_df(df):

    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Initial memory usage: {start_mem:.2f} MB")

    for col in df.columns:

        if '_dt' in col:
            df[col] = pd.to_datetime(df[col])

        elif df[col].dtype == 'object':
            df[col] = df[col].astype('category')

        elif 'float' in str(df[col].dtype):
            df[col] = df[col].astype(np.float16)
            
        elif 'int' in str(df[col].dtype):
            df[col] = pd.to_numeric(df[col], downcast='integer')

    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    reduction = 100 * (start_mem - end_mem) / start_mem
    print(f"Final memory usage: {end_mem:.2f} MB ({reduction:.2f}% reduction)")

    return df

optimize_df(df)
df.info()

In [0]:
# Hierarchy Diagram

layers = ['Network', 'Region', 'Region - RouteType', 'Route']
fig, ax = plt.subplots(figsize=(8, 12))

for i, layer in enumerate(layers):
    ax.text(0.5, (0.8 - i * 0.2), layer, fontsize=18, ha='center', va='center', bbox={'boxstyle':'square,pad=0.5', 'facecolor':'white', 'edgecolor':'black'})
    ax.plot([0.5, 0.5], [(0.75 - i * 0.2), (0.65 - i * 0.2)])

ax.axis('off')
ax.set_ylim(0, 1)
ax.set_xlim(0, 1)

plt.show()

In [0]:
#Sales Over Charge Date

total_sales = df.groupby('charge_dt')['unt_pre'].sum().reset_index()
plt.style.use('ggplot')
total_sales.plot(style='-', figsize=(20,5), title = 'sales by charge date', y='unt_pre', x='charge_dt')
plt.show()

In [0]:
# Removing covid-19 outlier data before 2022. Keeping Dec 2021 for now to generate lag features.

df = df[df['charge_dt'] >= '2021-10-01']
total_sales_by_charge_dt = df.groupby('charge_dt')['unt_pre'].sum().reset_index()
plt.style.use('ggplot')
total_sales_by_charge_dt.plot(style='-', figsize=(20,5), title = 'sales by charge date', y='unt_pre', x='charge_dt')
plt.show()

In [0]:
total_sales_by_flight_dt = df.groupby('flight_dt')['unt_pre'].sum().reset_index()
plt.style.use('ggplot')
total_sales_by_flight_dt.plot(style='-', figsize=(20,5), title = 'sales by flight date', y='unt_pre', x='flight_dt')
plt.show()

In [0]:
sales_by_routetype = df.groupby(['charge_dt', 'routetype'])['unt_pre'].mean().reset_index()
fig, ax = plt.subplots(figsize=(20,5))
for routetype in sales_by_routetype['routetype'].unique():
    subset = sales_by_routetype[sales_by_routetype['routetype'] == routetype]
    ax.plot(subset['charge_dt'], subset['unt_pre'], label=routetype)
plt.title('Sales by routetype')
plt.xlabel('Charge Date')
plt.ylabel('Sales')
plt.legend()
plt.show()

In [0]:
df = df[df['routetype'] != 'Charter']
df['routetype'] = df['routetype'].cat.remove_unused_categories()

In [0]:
# Typical Booking Curve

total_sales_by_dtg = df.groupby('dtg')['unt_pre'].sum().reset_index()
plt.style.use('ggplot')
total_sales_by_dtg.plot(style='-', figsize=(20,5), title = 'sales by dtg since 2022', y='unt_pre', x='dtg')
plt.show()

In [0]:
total_sales_by_log_dtg = df.groupby('dtg')['unt_pre'].sum().reset_index()
total_sales_by_log_dtg['log_dtg'] = np.log(total_sales_by_log_dtg['dtg'])
plt.style.use('ggplot')
total_sales_by_log_dtg.plot(style='-', figsize=(20,5), title = 'sales by dtg since 2022', y='unt_pre', x='log_dtg')
plt.show()

In [0]:
# Booking Curve Over Time

df_dynamic_plot = df.reset_index()
df_dynamic_plot['charge_dt'] = df_dynamic_plot['charge_dt'].astype(str)
df_dynamic_plot = df_dynamic_plot.groupby(['dtg', 'charge_dt'])['unt_pre'].sum().reset_index()
fig = px.line(df_dynamic_plot, x='dtg', y='unt_pre', animation_frame='charge_dt', title='Sales by DTG with charge date variations')
fig.update_layout(xaxis_title='Days To Go', yaxis_title='Sales', legend_title='Charge Date', height=900, width=1400)

fig.show()

In [0]:
#ACF charge-date
df_ACF = df.groupby('charge_dt')['unt_pre'].sum()
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(18, 8))

plot_acf(df_ACF, ax=ax1, lags=56)
ax1.set_title('Autocorrelation Function (ACF)')

plot_pacf(df_ACF, ax=ax2, lags=56, method='ywm') 
ax2.set_title('Partial Autocorrelation Function (PACF)')

plt.tight_layout()
plt.show()

In [0]:
#ACF flight-date
df_ACF = df.groupby('flight_dt')['unt_pre'].sum()
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(18, 8))

plot_acf(df_ACF, ax=ax1, lags=56)
ax1.set_title('Autocorrelation Function (ACF)')

plot_pacf(df_ACF, ax=ax2, lags=56, method='ywm') 
ax2.set_title('Partial Autocorrelation Function (PACF)')

plt.tight_layout()
plt.show()

In [0]:
df = df_orginal.copy()
df = optimize_df(df)
df = df[df['charge_dt'] >= '2021-10-01']

In [0]:
def generate_lag_features(df, charge_lags, flight_lags):

    df['charge_dt'] = pd.to_datetime(df['charge_dt'])
    df['flight_dt'] = pd.to_datetime(df['flight_dt'])
    df_source = df.copy()

    for c_lag in charge_lags:
        for f_lag in flight_lags:
            f_lag_str = str(f_lag)
            feature_col_name = f'C{c_lag}F{f_lag_str}'
            
            df_temp = df_source.copy()
            df_temp['merge_key_charge_dt'] = df_temp['charge_dt'] + timedelta(days=c_lag)
            df_temp['merge_key_flight_dt'] = df_temp['flight_dt'] - timedelta(days=f_lag)
            df_temp = df_temp.rename(columns={'unt_pre': feature_col_name})
            df = pd.merge(df, df_temp[['merge_key_charge_dt', 'merge_key_flight_dt', feature_col_name]], left_on=['charge_dt', 'flight_dt'], right_on=['merge_key_charge_dt', 'merge_key_flight_dt'], how='left')
            df = df.drop(columns=['merge_key_charge_dt', 'merge_key_flight_dt'])
            df.fillna(-1, inplace=True)

    return df

flight_lags = [-28, -21, -14, -7, 0, 7, 14, 21, 28] 
charge_lags = [7, 14, 21, 28] 
df = generate_lag_features(df, charge_lags, flight_lags)
df.info()


In [0]:
df = df[df['charge_dt'] >= '2022-01-01']
optimize_df(df)

In [0]:
import re
feature_cols = [col for col in df.columns if col.startswith('C')]
target_col = 'unt_pre'

unique_feature_cols = list(set(feature_cols))
all_cols_for_corr = [target_col] + unique_feature_cols
corr_matrix = df[all_cols_for_corr].dropna().corr()
target_corrs = corr_matrix[[target_col]].drop(target_col)
target_corrs = target_corrs.reset_index().rename(columns={'index': 'feature', target_col: 'correlation'})

def parse_lags(feature_name):
    match = re.match(r'C(\d+)F(-?\d+)', feature_name)
    if match:
        c_lag = int(match.group(1))
        f_lag = int(match.group(2))
        return c_lag, f_lag
    return None, None

target_corrs[['charge_lag', 'flight_lag']] = target_corrs['feature'].apply(lambda x: pd.Series(parse_lags(x)))

target_corrs = target_corrs.drop_duplicates(subset=['feature'])
corr_pivot = target_corrs.pivot(index='charge_lag', columns='flight_lag', values='correlation')
corr_pivot = corr_pivot.sort_index().sort_index(axis=1)
plt.figure(figsize=(16, 12))
sns.heatmap(corr_pivot,annot=True,fmt=".2f",cmap='viridis',cbar=True,linewidths=.1)
plt.title('Correlation of Lag Features with Target ("unt_pre")')
plt.xlabel("Flight Date Lag (days)")
plt.ylabel("Charge Date Lag (days)")
plt.tight_layout()
plt.show()

In [0]:
# Seasonality Analysis

def create_features(df):
    df['flight_dt'] = pd.to_datetime(df['charge_dt'] + pd.to_timedelta(df['dtg'], unit='D'))
    df['charge_dt'] = pd.to_datetime(df['charge_dt'])
    df['flight_month'] = df['flight_dt'].dt.month.astype(int)
    df['flight_dow'] = df['flight_dt'].dt.dayofweek.astype(int)
    df['flight_dom'] = df['flight_dt'].dt.day.astype(int)
    df['flight_doy'] = df['flight_dt'].dt.dayofyear.astype(int)
    df['flight_year'] = df['flight_dt'].dt.year.astype(int)
    df['charge_month'] = df['charge_dt'].dt.month.astype(int)
    df['charge_dow'] = df['charge_dt'].dt.dayofweek.astype(int)
    df['charge_dom'] = df['charge_dt'].dt.day.astype(int)
    df['charge_doy'] = df['charge_dt'].dt.dayofyear.astype(int)
    df['charge_year'] = df['charge_dt'].dt.year.astype(int)
    df['day_number'] = (df['charge_dt'] - pd.to_datetime('2022-01-01')).dt.days.astype(int)

create_features(df)
df.tail()

In [0]:
optimize_df(df)
df.info()

In [0]:
# Flight Month vs Charge Month

df_melt = df.melt(id_vars='unt_pre', value_vars=['flight_month', 'charge_month'], var_name='month_type', value_name='month')
mean_sales = df_melt.groupby(['month', 'month_type'])['unt_pre'].sum().reset_index()

plt.figure(figsize=(20, 8))
sns.barplot(data=mean_sales, x='month', y='unt_pre', hue='month_type')
plt.style.use('ggplot')
plt.title('Historic Sales by Month since 2022')
plt.xlabel('Month')
plt.ylabel('Total Sales')
plt.show()

In [0]:
# Flight DoW vs Charge DoW

df_melt = df.melt(id_vars='unt_pre', value_vars=['flight_dow', 'charge_dow'], var_name='dow_type', value_name='dow')
mean_sales = df_melt.groupby(['dow', 'dow_type'])['unt_pre'].sum().reset_index()
plt.figure(figsize=(15, 8))
sns.barplot(data=mean_sales, x='dow', y='unt_pre', hue='dow_type')
plt.style.use('ggplot')
plt.title('Historic Sales by DoW since 2022')
plt.xlabel('DoW')
plt.ylabel('Total Sales')
display(plt.show())

In [0]:
# Weekly Seasonality

df[(df['charge_dt'] > '2024-01-01') & (df['charge_dt'] < '2024-01-31')].groupby('charge_dt')['unt_pre'].sum().plot(figsize=(20,5), title = 'sales by charge date (Jan24)', y='unt_pre', linewidth=10)
plt.style.use('ggplot')
plt.show()

In [0]:
# DoM Yield vs Sales

yield_vs_slaes = df[(df['charge_month'] != 12) & (df['charge_month'] != 3) & (df['charge_month'] != 4) & ((df['charge_month'] != 1))]

mean_sales = yield_vs_slaes.groupby('charge_dom')['unt_pre'].mean().reset_index()
mean_yield = yield_vs_slaes.groupby('charge_dom')['yield'].mean().reset_index()

fig, ax1 = plt.subplots(figsize=(15, 8))
plt.style.use('ggplot')

ax1.plot(mean_sales['charge_dom'], mean_sales['unt_pre'], marker='o', linewidth=12, color='tab:red', label='Mean Sales')
ax1.set_xlabel('Charge Day of Month')
ax1.set_ylabel('Mean Sales', color='tab:red')
ax1.tick_params(axis='y', labelcolor='tab:red')

ax2 = ax1.twinx()
ax2.plot(mean_yield['charge_dom'], mean_yield['yield'], marker='s', linewidth=12, color='tab:blue', label='Mean Yield')
ax2.set_ylabel('Mean Yield', color='tab:blue')
ax2.tick_params(axis='y', labelcolor='tab:blue')

fig.suptitle('Mean Sales and Yield by Charge Day of Month (2022 - ToDate)')
fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
plt.tight_layout()
plt.show()

In [0]:
# DoM Price vs Sales

price_vs_slaes = df[(df['charge_month'] != 12) & (df['charge_month'] != 3) & (df['charge_month'] != 4) & ((df['charge_month'] != 1))]

mean_sales = price_vs_slaes.groupby('charge_dom')['unt_pre'].mean().reset_index()
mean_price = price_vs_slaes.groupby('charge_dom')['price'].mean().reset_index()

fig, ax1 = plt.subplots(figsize=(15, 8))
plt.style.use('ggplot')

ax1.plot(mean_sales['charge_dom'], mean_sales['unt_pre'], marker='o', linewidth=12, color='tab:red', label='Mean Sales')
ax1.set_xlabel('Charge Day of Month')
ax1.set_ylabel('Mean Sales', color='tab:red')
ax1.tick_params(axis='y', labelcolor='tab:red')

ax2 = ax1.twinx()
ax2.plot(mean_price['charge_dom'], mean_price['price'], marker='s', linewidth=12, color='tab:blue', label='Mean Price')
ax2.set_ylabel('Mean Price', color='tab:blue')
ax2.tick_params(axis='y', labelcolor='tab:blue')

fig.suptitle('Mean Sales and Price by Charge Day of Month (2022 - ToDate)')
fig.legend(loc='upper right', bbox_to_anchor=(0.9, 0.9))
plt.tight_layout()
plt.show()

In [0]:
# Holiday Features

uk_holidays = holidays.UK(years=range(2022, 2026))
holidays_df = pd.DataFrame([(date, name) for date, name in uk_holidays.items()], columns=['ds', 'holiday'])
holidays_df.sort_values(by='ds', inplace=True)
holidays_df

In [0]:
additional_holidays = pd.DataFrame([
    {'ds': '2022-04-18', 'holiday': 'Easter Monday'},
    {'ds': '2022-08-29', 'holiday': 'Summer Bank Holiday'},
    {'ds': '2023-04-10', 'holiday': 'Easter Monday'},
    {'ds': '2023-08-28', 'holiday': 'Summer Bank Holiday'},
    {'ds': '2024-04-01', 'holiday': 'Easter Monday'},
    {'ds': '2024-08-26', 'holiday': 'Summer Bank Holiday'},
    {'ds': '2025-04-21', 'holiday': 'Easter Monday'},
    {'ds': '2025-08-25', 'holiday': 'Summer Bank Holiday'},
])
holidays_df = pd.concat([holidays_df, additional_holidays], ignore_index=True)
holidays_df.drop_duplicates(inplace=True)
holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])
holidays_df.sort_values(by='ds', inplace=True)
holidays_df.reset_index(drop=True, inplace=True)
holidays_df
   

In [0]:
holidays_df["ds"] = pd.to_datetime(holidays_df["ds"])
df = df.merge(holidays_df.rename(columns={'ds': 'charge_dt', 'holiday': 'charge_dt_holiday'}), how='left', on='charge_dt')
df = df.merge(holidays_df.rename(columns={'ds': 'flight_dt', 'holiday': 'flight_dt_holiday'}), how='left', on='flight_dt')
df['is_charge_date_holiday'] = df['charge_dt_holiday'].notnull().astype(int)
df['is_flight_date_holiday'] = df['flight_dt_holiday'].notnull().astype(int)
df.drop(['charge_dt_holiday', 'flight_dt_holiday'], axis=1, inplace=True)
df = df.sort_values(by=['charge_dt', 'dtg'], ascending=[True, True])
df.set_index('charge_dt', inplace=True)
df.head()

In [0]:
optimize_df(df)

In [0]:
index_cols = ['charge_dt', 'flight_dt']
cyclic_cols=['flight_dom', 'flight_doy', 'charge_dom', 'charge_doy', 'flight_month', 'charge_month', 'flight_dow', 'charge_dow']
normal_num_cols=['ty_capacity', 'C7F-28', 'C7F-21', 'C7F-14', 'C7F-7', 'C7F0', 'C7F7', 'C7F14', 'C7F21', 'C7F28', 'C14F-28', 'C14F-21', 'C14F-14', 'C14F-7', 'C14F0', 'C14F7', 'C14F14', 'C14F21', 'C14F28', 'C21F-28', 'C21F-21', 'C21F-14', 'C21F-7', 'C21F0', 'C21F7', 'C21F14', 'C21F21', 'C21F28', 'C28F-28', 'C28F-21', 'C28F-14', 'C28F-7', 'C28F0', 'C28F7', 'C28F14', 'C28F21', 'C28F28', 'dtg']
standard_num_cols=['charge_year','flight_year', 'day_number']
df.drop(['load_factor', 'yield', 'price'], axis=1, inplace=True)
df = df.reset_index()
original_df = df.copy()

def encode_cyclic_features(df, cols):
    for col in cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')
        max_val = df[col].max()
        df[col + '_sin'] = (np.sin(2 * np.pi * df[col] / max_val)).astype('float16')
        df[col + '_cos'] = (np.cos(2 * np.pi * df[col] / max_val)).astype('float16')
        df.drop(col, axis=1, inplace=True)
    return df

def scale_normal_cols(df, cols):
    for col in cols:
        scaler = StandardScaler()
        df[col] = scaler.fit_transform(df[[col]])
    return df

def scale_standard_cols(df, cols):
    for col in cols:
        scaler = MinMaxScaler()
        df[col] = scaler.fit_transform(df[[col]])
    return df

df = encode_cyclic_features(df, cyclic_cols)
df = scale_normal_cols(df, normal_num_cols)
df = scale_standard_cols(df, standard_num_cols)
#df = pd.get_dummies(df, columns=['region', 'routetype'])

for col in index_cols:
    df[col] = original_df[col]
df.set_index(index_cols, inplace=True)

df = optimize_df(df)
df.head()

In [0]:
#capacity proportion look-up table for disaggregation 
'''
df_cap = df.reset_index()
total_daily_capacity = df_cap.groupby(['charge_dt','dtg'])['ty_capacity'].sum().reset_index().rename(columns={'ty_capacity':'capacity_total'})
route_daily_capacity = df_cap[['charge_dt','dtg','route','ty_capacity']]
capacity_proportions = route_daily_capacity.merge(total_daily_capacity, on=['charge_dt','dtg'], how='left')
capacity_proportions['capacity_proportion'] = capacity_proportions['ty_capacity'] / capacity_proportions['capacity_total']
capacity_proportions = capacity_proportions[['dtg','charge_dt','route','capacity_proportion']]
optimize_df(capacity_proportions)'''

In [0]:
# SARIMA

df_SARIMA = df.copy()
df_SARIMA = df_SARIMA.reset_index()
df_SARIMA = df_SARIMA.groupby('charge_dt').agg(unt_pre=('unt_pre', 'sum'))
auto_model = pm.auto_arima(df_SARIMA['unt_pre'], start_p=1, start_q=1, test='adf', max_p=3, max_q=3, m=7, d=0, seasonal=True, start_P=0, D=None, trace=True, error_action='ignore', suppress_warnings=True, stepwise=True)
print(auto_model.summary())

In [0]:
optimize_df(df_SARIMA)

In [0]:
# Sarima Rolling Forecast

split_date = pd.to_datetime(spark.sql('SELECT current_date()').collect()[0][0]) - pd.DateOffset(days=168)
df_SARIMA_train = df_SARIMA.loc[df_SARIMA.index < split_date]
df_SARIMA_test = df_SARIMA.loc[df_SARIMA.index >= split_date]

history = list(df_SARIMA_train['unt_pre'])
all_predictions = []
forecast_horizon = 7

for i in range(0, len(df_SARIMA_test), 7):
    
    SARIMA_model = pm.ARIMA(order=(2,0,2), seasonal_order=(1,0,2,7))
    SARIMA_model.fit(history)
    
    next_forecast = SARIMA_model.predict(n_periods=forecast_horizon)[:7]
    all_predictions.extend(next_forecast)
    
    actuals_for_period = df_SARIMA_test['unt_pre'][i : i + 7]
    history.extend(actuals_for_period)


final_predictions = all_predictions[:len(df_SARIMA_test)]

rmse_SARIMA = np.sqrt(mean_squared_error(df_SARIMA_test['unt_pre'], final_predictions))
mae_SARIMA = mean_absolute_error(df_SARIMA_test['unt_pre'], final_predictions)

print(f"RMSE: {rmse_SARIMA:.2f}")
print(f"MAE: {mae_SARIMA:.2f}")

# Plot the results
plt.figure(figsize=(14, 7))
plt.plot(df_SARIMA_test.index, df_SARIMA_test['unt_pre'], color='green', label='Actual Test Data')
plt.plot(df_SARIMA_test.index, final_predictions, color='red', linestyle='--', label='Rolling Forecast')
plt.title('SARIMA Rolling Forecast Backtest')
plt.legend()
plt.grid(True)
plt.show()

In [0]:
# SARIMAX

df_SARIMAX = df.copy()
df_SARIMAX = df_SARIMAX.reset_index()
df_SARIMAX = df_SARIMAX.groupby('charge_dt')[['unt_pre', 'ty_capacity', 'unt_pre_lag7', 'unt_pre_lag14', 'unt_pre_lag21', 'unt_pre_lag28']].sum()
df_SARIMAX = df_SARIMAX.set_index('charge_dt')

y_SARIMAX = df_SARIMAX[['unt_pre']]
X_SARIMAX = df_SARIMAX.drop('unt_pre', axis=1)

split_date = pd.to_datetime(spark.sql('SELECT current_date()').collect()[0][0]) - pd.DateOffset(days=168)
X_SARIMAX_train = X_SARIMAX.loc[X_SARIMAX.index < split_date]
X_SARIMAX_test = X_SARIMAX.loc[X_SARIMAX.index >= split_date]
y_SARIMAX_train = y_SARIMAX.loc[y_SARIMAX.index < split_date]
y_SARIMAX_test = y_SARIMAX.loc[y_SARIMAX.index >= split_date]
SARIMAX_model = pm.auto_arima(y_SARIMAX_train, X=X_SARIMAX_train, seasonal=True,trace=True, start_p=1, start_q=1, test='adf', max_p=3, max_q=3, m=7, d=0, start_P=0, D=None, error_action='ignore', suppress_warnings=True, stepwise=True)
predictions = SARIMAX_model.predict(n_periods=len(y_SARIMAX_test), X=X_SARIMAX_test)

print(SARIMAX_model.summary())

In [0]:
# SARIMAX Rolling Forecast

history_y = list(y_SARIMAX_train['unt_pre'])
history_X = X_SARIMAX_train.copy()
all_predictions = []
forecast_horizon = 7

for i in range(0, len(y_SARIMAX_test), 7):

    X_future = X_SARIMAX_test[i : i + forecast_horizon]
    
    SARIMAX_model = pm.ARIMA(order=(0,0,1), seasonal_order=(0,0,0,7))
    SARIMAX_model.fit(history_y, X=history_X)
    
    next_forecast = SARIMAX_model.predict(n_periods=len(X_future), X=X_future)[:7]
    all_predictions.extend(next_forecast)
    
    actuals_y_for_period = y_SARIMAX_test['unt_pre'][i : i + 7]
    actuals_X_for_period = X_SARIMAX_test[i : i + 7]

    history_y.extend(actuals_y_for_period)
    history_X = pd.concat([history_X, actuals_X_for_period])


final_predictions = all_predictions[:len(y_SARIMAX_test)]

rmse_SARIMAX = np.sqrt(mean_squared_error(y_SARIMAX_test['unt_pre'], final_predictions))
mae_SARIMAX = mean_absolute_error(y_SARIMAX_test['unt_pre'], final_predictions)

print(f"RMSE: {rmse_SARIMAX:.2f}")
print(f"MAE: {mae_SARIMAX:.2f}")

# Plot the results
plt.figure(figsize=(14, 7))
plt.plot(y_SARIMAX_test.index, y_SARIMAX_test['unt_pre'], color='green', label='Actual Test Data')
plt.plot(y_SARIMAX_test.index, final_predictions, color='red', linestyle='--', label='Rolling Forecast')
plt.title('SARIMAX Rolling Forecast Backtest')
plt.legend()
plt.grid(True)
plt.show()

In [0]:
#LR Train-Test Split

split_date = pd.to_datetime(spark.sql("SELECT current_date()").collect()[0][0]) - pd.DateOffset(days=168)
df_LR_train = df[df.index.get_level_values('charge_dt') < split_date]
df_LR_test = df[df.index.get_level_values('charge_dt') >= split_date]

df_LR_train_plot = df_LR_train.groupby('charge_dt')['unt_pre'].sum().reset_index()
df_LR_test_plot = df_LR_test.groupby('charge_dt')['unt_pre'].sum().reset_index()

plt.figure(figsize=(18, 6))
plt.plot(df_LR_train_plot['charge_dt'], df_LR_train_plot['unt_pre'], label='Train', color='blue')
plt.plot(df_LR_test_plot['charge_dt'], df_LR_test_plot['unt_pre'], label='Validation', color='red')
plt.axvline(pd.to_datetime(split_date), linestyle='--', color='black', label='Split Date')
plt.legend()
plt.title('Sales Before and After Split Date')
plt.xlabel('Charge Date')
plt.ylabel('Sales')
plt.show()

In [0]:
#LR Fit

X_train = df_LR_train.drop(['unt_pre'], axis=1)
X_test = df_LR_test.drop(['unt_pre'], axis=1)
y_train = df_LR_train['unt_pre']
y_test = df_LR_test['unt_pre']

LR_model = LinearRegression()
LR_model.fit(X_train, y_train)
y_pred_LR = LR_model.predict(X_test)

rmse_LR = np.sqrt(mean_squared_error(y_test, y_pred_LR))
mae_LR = mean_absolute_error(y_test, y_pred_LR)
print(f'RMSE: {rmse_LR:.2f}')
print(f'MAE: {mae_LR:.2f}')

In [0]:
# Create a new DataFrame for easy plotting
results_df = pd.DataFrame({'actual': y_test, 'predicted': y_pred_LR, 'dtg': X_test['dtg']})
results_df['charge_dt'] = results_df.index.get_level_values('charge_dt')
aggregated_results = results_df.groupby(level='charge_dt')[['actual', 'predicted']].sum()
plt.figure(figsize=(15, 7))
plt.plot(aggregated_results.index, aggregated_results['actual'], label='Actual Values', marker='o', linestyle='-')
plt.plot(aggregated_results.index, aggregated_results['predicted'], label='Predicted Values', marker='x', linestyle='--')
plt.title('Actual vs. Predicted Values Aggregated by Charge Date', fontsize=16)
plt.xlabel('Charge Date', fontsize=12)
plt.ylabel('Average unt_pre', fontsize=12)
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout() 
plt.show()

In [0]:
# LR Results (dynamic)
results_df.drop(['charge_dt'], axis=1, inplace=True)
results_df.reset_index(inplace=True)
LR_agg_dynamic = results_df.groupby(['charge_dt', 'dtg'])[['actual', 'predicted']].sum().reset_index()
LR_agg_dynamic['charge_dt'] = LR_agg_dynamic['charge_dt'].astype(str)

actual = LR_agg_dynamic[['charge_dt','dtg','actual']].copy()
actual['sales_type'] = 'actual'
actual['sales_value'] = actual['actual']

pred = LR_agg_dynamic[['charge_dt','dtg','predicted']].copy()
pred['sales_type'] = 'predicted'
pred['sales_value'] = pred['predicted']

melted = pd.concat([actual, pred]).sort_values(['charge_dt','dtg','sales_type'])

fig = px.line(melted, x='dtg', y='sales_value', animation_frame='charge_dt', title='Sales by DTG with charge date variations')

fig = px.line(melted, x='dtg', y='sales_value', color='sales_type', animation_frame='charge_dt', title='Actual vs Predicted by DTG across Charge Dates', color_discrete_map={'unt_net':'red', 'prediction':'blue'})

#fig.update_xaxes(range=[0, 200])
#fig.update_yaxes(range=[0, 800])
fig.update_layout(xaxis_title='Days To Go', yaxis_title='Sales', legend_title='Charge Date', height=900, width=1400)

fig.show()

In [0]:
from sklearn.linear_model import Lasso
split_date = pd.to_datetime(spark.sql("SELECT current_date()").collect()[0][0]) - pd.DateOffset(days=168)
df_lasso_train = df[df.index.get_level_values('charge_dt') < split_date]
df_lasso_test = df[df.index.get_level_values('charge_dt') >= split_date]
X_train = df_lasso_train.drop(['unt_pre'], axis=1)
X_test = df_lasso_test.drop(['unt_pre'], axis=1)
y_train = df_lasso_train['unt_pre']
y_test = df_lasso_test['unt_pre']

lasso_model = Lasso()
lasso_model.fit(X_train, y_train)
y_pred_lasso = lasso_model.predict(X_test)

rmse_lasso = np.sqrt(mean_squared_error(y_test, y_pred_lasso))
mae_lasso = mean_absolute_error(y_test, y_pred_lasso)
print(f'RMSE: {rmse_lasso:.2f}')
print(f'MAE: {mae_lasso:.2f}')

In [0]:
#RidgeRegression

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
from sklearn.linear_model import Ridge

param_grid = {'alpha': [0.01, 0.1, 1, 10]}
rmse_scorer = make_scorer(mean_squared_error, squared=False)

outer_cv = TimeSeriesSplit(n_splits=5)
inner_cv = TimeSeriesSplit(n_splits=3)

outer_scores = []

for train_idx, test_idx in outer_cv.split(df):
    X_train_outer = df.iloc[train_idx].drop('unt_net', axis=1).copy()
    y_train_outer = df.iloc[train_idx]['unt_net'].copy()
    X_test_outer = df.iloc[test_idx].drop('unt_net', axis=1).copy()
    y_test_outer = df.iloc[test_idx]['unt_net'].copy()

    model = Ridge()
    grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=inner_cv, n_jobs=-1, verbose=1, scoring=rmse_scorer)

    grid_search.fit(X_train_processed, y_train_outer, eval_set=[(X_test_processed, y_test_outer)])

    best_model = grid_search.best_estimator_
    test_score = best_model.score(X_test_processed, y_test_outer)
    outer_scores.append(test_score)

    print(f'Outer fold score: {test_score}, best params: {grid_search.best_params_}')

In [0]:
# Prophet
df_prophet = df.copy()
df_prophet = df_prophet.reset_index()
df_prophet = df.groupby('charge_dt')[['unt_pre', 'ty_capacity', 'unt_pre_lag7', 'unt_pre_lag14', 'unt_pre_lag21', 'unt_pre_lag28']].sum().reset_index()

train_prophet = df_prophet.loc[df_prophet['charge_dt'] < split_date]
val_prophet = df_prophet.loc[df_prophet['charge_dt'] >= split_date]
train_prophet.rename(columns={'charge_dt': 'ds', 'unt_pre': 'y'}, inplace=True)
val_prophet.rename(columns={'charge_dt': 'ds'}, inplace=True)
train_prophet.sort_values('ds', inplace=True)
val_prophet.sort_values('ds', inplace=True)

prophet_model = Prophet(holidays=holidays_df)
regressors = ['ty_capacity', 'unt_pre_lag7', 'unt_pre_lag14', 'unt_pre_lag21', 'unt_pre_lag28']
for reg in regressors:
    prophet_model.add_regressor(reg)
prophet_model.fit(train_prophet)
prophet_forecast = prophet_model.predict(val_prophet)

In [0]:
# Prophet Results

plot_prophet = pd.concat([train_prophet, val_prophet])
plot_prophet = pd.merge(prophet_forecast, plot_prophet, on='ds', how='outer').sort_values('ds')
plot_prophet = plot_prophet[plot_prophet['ds'] >= '2025-01-01']

#plt.style.use('seaborn-v0_8-darkgrid')
fig, ax = plt.subplots(figsize=(18, 6))

plot_prophet_val = plot_prophet[plot_prophet['ds'] >= split_date]
plot_prophet_train = plot_prophet[plot_prophet['ds'] < split_date]

ax.plot(plot_prophet_val['ds'], plot_prophet_val['unt_pre'], 'k.', alpha=0.6, label='Actual')
ax.plot(plot_prophet_train['ds'], plot_prophet_train['y'], 'red', linewidth=1.5, label='Train') 

ax.plot(plot_prophet['ds'], plot_prophet['yhat'], color='blue', label='Forecast')
ax.fill_between(plot_prophet['ds'], plot_prophet['yhat_lower'], plot_prophet['yhat_upper'], color='grey', alpha=0.3, label='Confidence Interval')

ax.axvline(x=split_date, color='black', linestyle='--', lw=2, label='Forecast Start')
ax.set_title('Forecasted Demand with Prophet', fontsize=18, fontweight='bold')
ax.set_xlabel('Date', fontsize=14)
ax.set_ylabel('Sales', fontsize=14)
ax.legend(loc='upper left')
plt.tight_layout()
plt.show()

In [0]:
# Prophet Results pt. 2

plot_prophet = plot_prophet[plot_prophet['ds'] >= split_date]
plt.figure(figsize=(12, 6))
plt.plot(plot_prophet['ds'], plot_prophet['unt_pre'], label='Actual', marker='o')
plt.plot(plot_prophet['ds'], plot_prophet['yhat'], label='Forecast (Prophet)', marker='x')
plt.xlabel('Charge Date')
plt.ylabel('Total Sales Across All DTGs')
plt.title('Prophet Forecast vs Actual – Aggregated Over DTG')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

mse_prophet = np.sqrt(mean_squared_error(plot_prophet['unt_pre'], plot_prophet['yhat']))
print(f'MSE: {mse_prophet:.2f}')

In [0]:
# XGBoost Grid Search

'''

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer

param_grid = {
    'learning_rate': [0.005, 0.01, 0.1],
    #'subsample': [0.8, 1.0],
    #'colsample_bytree': [0.8, 1.0],
    #'lambda': [0.1, 1.0],
    #'alpha': [0, 0.1],
}

rmse_scorer = make_scorer(mean_squared_error, squared=False)

outer_cv = TimeSeriesSplit(n_splits=5)
inner_cv = TimeSeriesSplit(n_splits=3)

outer_scores = []

for train_idx, test_idx in outer_cv.split(df):
    X_train_outer = df.iloc[train_idx].drop('unt_net', axis=1).copy()
    y_train_outer = df.iloc[train_idx]['unt_net'].copy()
    X_test_outer = df.iloc[test_idx].drop('unt_net', axis=1).copy()
    y_test_outer = df.iloc[test_idx]['unt_net'].copy()

    X_train_processed = preprocessor.fit_preprocess(X_train_outer)
    X_test_processed = preprocessor.transform_preprocess(X_test_outer)

    model = XGBRegressor(objective='reg:pseudohubererror', base_score=0.5, boosting='gbtree', early_stopping_rounds=50, max_depth=3, n_estimators=500, learning_rate=0.01)
    grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=inner_cv, n_jobs=-1, verbose=1, scoring=rmse_scorer)

    grid_search.fit(X_train_processed, y_train_outer, eval_set=[(X_test_processed, y_test_outer)])

    best_model = grid_search.best_estimator_
    test_score = best_model.score(X_test_processed, y_test_outer)
    outer_scores.append(test_score)

    print(f'Outer fold score: {test_score}, best params: {grid_search.best_params_}')'''

In [0]:
df.head()

In [0]:
# XGBoost CV Fit

unique_dates = df.index.get_level_values('charge_dt').unique().sort_values()
tscv = TimeSeriesSplit(n_splits=5, test_size=168)

fig, axs = plt.subplots(5, 1, figsize=(20, 10), sharex=True)
plt.style.use('ggplot')
fold = 0
preds = []
scores = []

# Nested cross-validation
for train_index, val_index in tscv.split(unique_dates):
    train_dates = unique_dates[train_index]
    val_dates = unique_dates[val_index]

    train_mask = df.index.get_level_values('charge_dt').isin(train_dates)
    val_mask = df.index.get_level_values('charge_dt').isin(val_dates)

    train_data = df[train_mask]
    val_data = df[val_mask]

    total_sales_train = train_data.groupby('charge_dt')['unt_pre'].sum().reset_index()
    total_sales_val = val_data.groupby('charge_dt')['unt_pre'].sum().reset_index()
    total_sales_train.plot(ax=axs[fold], label='Train', x='charge_dt', y='unt_pre', style='-')
    total_sales_val.plot(ax=axs[fold], label='Val', x='charge_dt', y='unt_pre', style='-')
    axs[fold].axvline(val_data.index.get_level_values('charge_dt').min(), linestyle='--', color='black')
    axs[fold].set_title(f'Fold {fold+1}')
    fold += 1

    X_train = train_data.drop('unt_pre', axis=1)
    y_train = train_data['unt_pre']

    X_test = val_data.drop('unt_pre', axis=1)
    y_test = val_data['unt_pre']

    XGB_model = xgb.XGBRegressor(base_score=0.5,booster='gbtree',n_estimators=1500,early_stopping_rounds=50,max_depth=3,learning_rate=0.01,objective='reg:pseudohubererror',enable_categorical=True)
    XGB_model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=100)

    y_pred = XGB_model.predict(X_test)
    preds.extend(y_pred)
    score = np.sqrt(mean_squared_error(y_test, y_pred))
    scores.append(score)

In [0]:
# XGBoost Results

print(f'individual scores: {scores}')
print(f'combined score: {np.mean(scores)}')
print(f'std: {np.std(scores)}')      

In [0]:
y_pred_series = pd.Series(y_pred, index=X_test.index)

y_test_total = y_test.groupby(level='charge_dt').sum()
y_pred_total = y_pred_series.groupby(level='charge_dt').sum()

y_test_total, y_pred_total = y_test_total.align(y_pred_total, join='inner')

rmse_xgb = np.sqrt(mean_squared_error(y_test_total, y_pred_total))
print(f'RMSE on Daily Totals: {rmse_xgb:.2f}')

plt.figure(figsize=(15, 6))
plt.plot(y_test_total.index, y_test_total, label='Actual Daily Total')
plt.plot(y_pred_total.index, y_pred_total, label='Predicted Daily Total', linestyle='--')
plt.title('Actual vs. Predicted Daily Totals')
plt.legend()
plt.show()

In [0]:
df_results = pd.DataFrame({'actual': y_test, 'predicted': y_pred_series})
df_results = df_results.reset_index()
df_agg = df_results.groupby(['charge_dt', 'dtg'])[['actual', 'predicted']].sum().reset_index()
df_melted = df_agg.melt(id_vars=['charge_dt', 'dtg'], value_vars=['actual', 'predicted'], var_name='sales_type', value_name='sales_value')
df_melted = df_melted.sort_values(['charge_dt', 'dtg'])
df_melted['charge_dt'] = df_melted['charge_dt'].dt.strftime('%Y-%m-%d')

fig = px.line(df_melted, x='dtg', y='sales_value', color='sales_type', animation_frame='charge_dt', title='Actual vs Predicted Booking Curve', color_discrete_map={'actual': 'red', 'predicted': 'blue'})
fig.update_layout(legend_title='type', height=800, width=1600)
fig.show()