In [0]:
%pip install linearmodels

In [0]:
dbutils.library.restartPython()

In [0]:
# Import Libraries
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import weekofyear
from pyspark.sql.window import Window
from datetime import datetime

# Import Data

# Sales table: keep only the columns used below; charge_dt is cast to a date.
sales_history = (
    spark.read.table('data_experience_commercial.cbt_1423_rtsuite.master_uat')
    .select('flightkey', F.col('charge_dt').cast('date'), 'unt_net', 'chargeproduct', 'dtg')
)
# Flight dimensions: route, on-sale date, capacity and flight date per flightkey.
dimensions_history = (
    spark.read.table('data_experience_commercial.cbt_0923_segmentfinder.dimensions_history')
    .select('flightkey', 'route', 'onsale_dt', 'ty_capacity', F.col('flight_dt').cast('date'))
)

# Filter Data

# Ticket charges with non-negative dtg
# (author's note: ticket only + eliminate covid, ss, and not yet flown).
filtered_sales = (
    sales_history
    .filter(F.col('chargeproduct') == 'Ticket')
    .filter(F.col('dtg') >= 0)
)
# Flights on sale for at least 168 days before departure (author's note: eliminate
# late top-ups), with flight dates from 2024-07-01 to 2024-09-30 inclusive.
filtered_dimensions = (
    dimensions_history
    .filter(F.datediff(F.col('flight_dt'), F.col('onsale_dt')) >= 168)
    .filter(F.col('flight_dt').between('2024-07-01', '2024-09-30'))
)

# Preprocessing - filling missing charge dates

# Attach the flight dimensions to every retained sale.
dsh = filtered_sales.join(filtered_dimensions, on='flightkey', how='inner')

# Collapse to one row per flight and charge date (daily net units).
dshsmooth = (
    dsh
    .groupby('flightkey', 'charge_dt')
    .agg(
        F.sum('unt_net').alias('unt_net'),
        F.first('onsale_dt').alias('onsale_dt'),
        F.first('flight_dt').alias('flight_dt'),
    )
)

# On-sale window per flight: on-sale date up to the earlier of flight date and today.
date_range = (
    dshsmooth
    .groupBy('flightkey')
    .agg(
        F.min('onsale_dt').alias('start_date'),
        F.least(F.first('flight_dt'), F.lit(datetime.now().date())).alias('end_date'),
    )
)

# One row per flight per calendar day inside the on-sale window.
index = (
    date_range
    .withColumn('charge_dt_ts', F.explode(F.sequence(F.col('start_date'), F.col('end_date'))))
    .withColumn('charge_dt', F.col('charge_dt_ts').cast('date'))
)

# Left-join the daily sales onto the full calendar; days without sales become 0.
dshjoin = (
    index
    .join(dshsmooth, on=['flightkey', 'charge_dt'], how='left')
    .drop('start_date', 'end_date', 'onsale_dt', 'flight_dt', 'charge_dt_ts')
    .fillna(0)
)

# Running total of net units per flight, ordered by charge date
# (author's note: currently neglects cancellations).
window_spec = Window.partitionBy('flightkey').orderBy(F.col('charge_dt'))
dsh_pax = dshjoin.withColumn('pax_net', F.sum('unt_net').over(window_spec))

# Preprocessing - creating LF by dtg progress remaining curves for each route

# Re-attach the flight dimensions to the cumulative daily series.
final_dsh = (
    dsh_pax
    .join(filtered_dimensions, on='flightkey', how='left')
    .drop('unt_net')
)

# Length of the on-sale period and days-to-go for every calendar day.
curves = (
    final_dsh
    .withColumn('total_booking_days', F.datediff(F.col('flight_dt'), F.col('onsale_dt')))
    .withColumn('dtg', F.datediff(F.col('flight_dt'), F.col('charge_dt')))
)

# dtg as a fraction of the on-sale period, then floored into integer percent buckets.
normal_curves = curves.withColumn('dtg_pr', F.col('dtg') / F.col('total_booking_days'))
normal_curve_buckets = normal_curves.withColumn(
    'dtg_bucket', F.floor(F.col('dtg_pr') * 100).cast('int')
)

# Sum capacity and cumulative pax per route and bucket.
aggregated_normal_curves = (
    normal_curve_buckets
    .groupby('route', 'dtg_bucket')
    .agg(F.sum('ty_capacity').alias('ty_capacity'), F.sum('pax_net').alias('pax_net'))
    .orderBy('dtg_bucket')
)

# Load factor = summed pax / summed capacity; collected to pandas for clustering.
df = (
    aggregated_normal_curves
    .withColumn('load_factor', F.col('pax_net') / F.col('ty_capacity'))
    .drop('ty_capacity', 'pax_net')
    .toPandas()
)

# storing original dataframe

df.info()
df_original = df.copy()
df = df_original.copy()

In [0]:
# Wide table: one row per route, one column per dtg bucket; missing buckets become 0.
df_pivot = df.pivot_table(
    index='route',
    columns='dtg_bucket',
    values='load_factor',
    aggfunc='mean',
).fillna(0)
# Keep routes with a positive load factor in bucket 1.
# NOTE(review): bucket 1 (not bucket 0) is used as the "final" bucket throughout - confirm this is intended.
has_final_lf = df_pivot[1] > 0
df_cluster = df_pivot[has_final_lf]
df_cluster.head()

In [0]:
# Keep only routes whose load factor in bucket 1 exceeds 60%

sold_mask = df_cluster[1] > 0.6
df_sold = df_cluster[sold_mask]
df_sold.info()

In [0]:
# Express every bucket's load factor as a fraction of the bucket-1 load factor (row-wise)
df_sold = df_sold.divide(df_sold[1], axis='index')

In [0]:
# Min-max scale each route's curve so its lowest value is 0 and its highest is 1.
# A route whose curve is completely flat (max == min) would produce NaN here.

row_min = df_sold.min(axis='columns')
row_max = df_sold.max(axis='columns')

df_sold = df_sold.sub(row_min, axis='index').div(row_max - row_min, axis='index')
df_sold.head()

In [0]:
import mlflow
mlflow.autolog(disable=True)

from sklearn.cluster import KMeans
import matplotlib.pyplot as plt

# Elbow method: fit KMeans for k = 2..10 and record the within-cluster sum of squares.
k_range = range(2, 11)
wcss = []
for i in k_range:
    # A fixed random_state makes the inertia curve identical on every run;
    # without it each execution draws different initial centroids.
    kmeans = KMeans(n_clusters=i, init='k-means++', n_init=10, random_state=42)
    kmeans.fit(df_sold)
    wcss.append(kmeans.inertia_)

plt.figure(figsize=(10, 6))
plt.plot(k_range, wcss, marker='o', linestyle='--')
plt.title('Elbow Method')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('WCSS (Inertia)')
plt.xticks(k_range)
plt.grid(True)
plt.show()

In [0]:
df_sold_original = df_sold.copy()

In [0]:
df_sold = df_sold_original.copy()

In [0]:
# Cluster on the curve columns only. Label columns left behind by a previous run of
# this cell (or the relabelling cell) are excluded so re-running never fits on labels.
curve_features = df_sold.drop(columns=['cluster_label', 'new_cluster_label'], errors='ignore')

# random_state pins the centroid initialisation: the cluster labels are merged into the
# modelling frame later, so they must be reproducible from run to run.
kmeans = KMeans(n_clusters=5, init='k-means++', n_init=10, random_state=42)
kmeans.fit(curve_features)
df_sold['cluster_label'] = kmeans.labels_

# Mean curve per cluster (only the curve columns are averaged).
curves_clusters = (
    df_sold
    .drop(columns=['new_cluster_label'], errors='ignore')
    .groupby('cluster_label')
    .mean()
)

curves_clusters.T.plot(figsize=(20, 8))

plt.xlabel('% DTG remaining')
plt.ylabel('LF / LF_final')
plt.title('Isolated booking curve shape clustering "Banana split"')
plt.grid(True, linestyle='--', alpha=0.6)
plt.legend(title=' # Cluster')
plt.show()

In [0]:
# Rank the clusters by their mean value in dtg bucket 40 (ascending) and renumber
# them 1..k in that order, so the label carries an ordering.
col40 = df_sold.groupby('cluster_label')[40].mean()
sorted_clusters = col40.sort_values()

auto_mapping = {
    old_label: new_rank
    for new_rank, old_label in enumerate(sorted_clusters.index, start=1)
}

df_sold['new_cluster_label'] = df_sold['cluster_label'].map(auto_mapping)

# Mean curve per renumbered cluster.
new_curves_clusters = (
    df_sold
    .drop(columns=['cluster_label'])
    .groupby('new_cluster_label')
    .mean()
)

ax = new_curves_clusters.T.plot(figsize=(20, 8))
ax.set_xlabel('% DTG remaining')
ax.set_ylabel('LF / LF_final')
ax.set_title('Isolated booking curve shape clustering "Banana split"')
ax.grid(True, linestyle='--', alpha=0.6)
ax.legend(title=' # Cluster')
plt.show()

In [0]:
# Routes per cluster, keyed by the renumbered label (the one shown in the plot above
# and merged into the modelling data), listed in label order.
df_sold['new_cluster_label'].value_counts().sort_index()

In [0]:
# Load the FY25Q3 extract and order it by flight_time.
FY25Q3 = (
    pd.read_csv('/Workspace/Users/barney.hodge@easyjet.com/FY25Q3.csv')
    .sort_values(by='flight_time')
)
FY25Q3.head()

In [0]:
# Load the FY25Q4 extract and order it by flight_time.
FY25Q4 = (
    pd.read_csv('/Workspace/Users/barney.hodge@easyjet.com/FY25Q4.csv')
    .sort_values(by='flight_time')
)
FY25Q4.head()

In [0]:
df_internal = pd.concat([FY25Q3, FY25Q4], ignore_index=True)
df_internal.info()

In [0]:
# Attach each route's renumbered cluster label. df_sold is indexed by route, so the
# label Series is matched on that index level; routes without a label get NaN.
route_labels = df_sold['new_cluster_label']
cluster_df = df_internal.merge(route_labels, on='route', how='left')
cluster_df.head()

In [0]:
df_external = pd.read_csv('/Workspace/Users/barney.hodge@easyjet.com/wealth statistics.csv')
df_external.head()


In [0]:

# Source GDP columns in df_external and the short prefix each one is renamed to.
gdp_source_cols = [
    'National (Nominal) GDP Per Capita ($) (worldbank 2024)',
    'National (PPP) GDP Per Capita ($) (worldbank 2024)',
    'Local GDP Per Capita ($)',
]
gdp_prefixes = ['Ngdppc', 'Pgdppc', 'Lgdppc']
airport_gdp = df_external[['Airport'] + gdp_source_cols]

# Look the GDP figures up twice: once for the base airport, once for the destination.
df_initial = pd.merge(
    cluster_df, airport_gdp, left_on='base', right_on='Airport', how='left'
).rename(columns={src: prefix + '_base' for src, prefix in zip(gdp_source_cols, gdp_prefixes)})
df = pd.merge(
    df_initial, airport_gdp, left_on='dest', right_on='Airport', how='left'
).rename(columns={src: prefix + '_dest' for src, prefix in zip(gdp_source_cols, gdp_prefixes)})

# Blend base and destination GDP using the prop_from_base / prop_from_dest weights.
for wealth_col, prefix in [
    ('local_wealth', 'Lgdppc'),
    ('national_wealth_nominal', 'Ngdppc'),
    ('national_wealth_ppp', 'Pgdppc'),
]:
    df[wealth_col] = (
        (df['prop_from_base'] * df[prefix + '_base'])
        + (df['prop_from_dest'] * df[prefix + '_dest'])
    )

# Drop the lookup keys, weights and per-airport GDP columns now that they are blended.
df = df.drop(
    columns=[
        'base', 'dest', 'prop_from_base', 'prop_from_dest', 'Airport_x', 'Airport_y',
        'Ngdppc_base', 'Ngdppc_dest', 'Pgdppc_base', 'Pgdppc_dest', 'Lgdppc_base', 'Lgdppc_dest',
    ]
)
df.head()

In [0]:
df.info()

In [0]:
# Discard rows with any missing value, then rows with negative cumulative sales.
df = df.dropna()
non_negative_sales = df['cumulative_sales'] >= 0
df = df[non_negative_sales]
df.info()

In [0]:
import holidays

# UK holidays for 2024 and 2025 as a (date, name) table ordered by date.
uk_holidays = holidays.UK(years=range(2024, 2026))
holidays_df = (
    pd.DataFrame(list(uk_holidays.items()), columns=['ds', 'holiday'])
    .sort_values(by='ds')
)
holidays_df

In [0]:
# Holidays added by hand on top of the library calendar.
additional_holidays = pd.DataFrame([
    {'ds': '2024-04-01', 'holiday': 'Easter Monday'},
    {'ds': '2024-08-26', 'holiday': 'Summer Bank Holiday'},
    {'ds': '2025-04-21', 'holiday': 'Easter Monday'},
    {'ds': '2025-08-25', 'holiday': 'Summer Bank Holiday'},
])
holidays_df = pd.concat([holidays_df, additional_holidays], ignore_index=True)

# Normalise the dates BEFORE de-duplicating: the library rows hold date objects and the
# manual rows hold strings, so a row-level comparison on the raw values never matches.
holidays_df['ds'] = pd.to_datetime(holidays_df['ds'])

# Keep exactly one row per date (the first name wins). A date that appears twice would
# duplicate every matching row when this table is merged onto the sales data.
holidays_df = (
    holidays_df
    .drop_duplicates(subset='ds', keep='first')
    .sort_values(by='ds')
    .reset_index(drop=True)
)
holidays_df

In [0]:
# Work on a fresh frame with a clean RangeIndex (the merge-based version also
# produced a new RangeIndex, so downstream positional behaviour is unchanged).
df = df.reset_index(drop=True)
df["charge_dt"] = pd.to_datetime(df["charge_dt"])
df["flight_dt"] = pd.to_datetime(df["flight_dt"])
holidays_df["ds"] = pd.to_datetime(holidays_df["ds"])

# Flag holiday dates with a membership test instead of a left merge: a merge repeats a
# row once per matching holiday entry, so any date listed more than once in holidays_df
# would silently duplicate observations. `isin` can never change the row count.
holiday_dates = holidays_df["ds"]
df['is_charge_date_holiday'] = df['charge_dt'].isin(holiday_dates).astype(int)
df['is_flight_date_holiday'] = df['flight_dt'].isin(holiday_dates).astype(int)
df.head()

In [0]:
import numpy as np

def profile(x100, x60, x20, x00, dtg, capacity):
    """Evaluate a target booking profile at `dtg`, scaled by `capacity`.

    Positions are first normalised to u = (x - x100) / (x00 - x100). The curve is
    y(u) = exp(-rate * u**shape) * (1 - u**4), with `rate` and `shape` solved so
    that y equals 0.6 at the normalised x60 and 0.2 at the normalised x20.
    By construction y is 1 at u = 0 and 0 at u = 1.

    Args:
        x100, x60, x20, x00: anchor positions of the curve (scalars or aligned arrays/Series).
        dtg: position(s) at which to evaluate the curve.
        capacity: multiplier applied to the curve value.

    Returns:
        The curve value at `dtg` multiplied by `capacity`.
    """
    span = x00 - x100
    u60 = (x60 - x100) / span
    u20 = (x20 - x100) / span

    # Logs of the two anchor constraints; both the shape and the rate reuse them.
    log_anchor_60 = np.log((1 - (u60**4)) / 0.6)
    log_anchor_20 = np.log((1 - (u20**4)) / 0.2)
    shape = np.log(log_anchor_60 / log_anchor_20) / np.log(u60 / u20)
    rate = log_anchor_60 / (u60**shape)

    u = (dtg - x100) / span
    return (np.exp(-rate * (u**shape))) * (1 - (u**4)) * capacity

In [0]:
# Profile target per row: profile() returns the curve value at `dtg` multiplied by `ty_capacity`.
df['profile_tgt'] = profile(df['x100'], df['x60'], df['x20'], df['x00'], df['dtg'], df['ty_capacity'])

In [0]:
# Signed gap between cumulative sales and the profile target (positive = sales above target).
df['distance_to_profile'] = df['cumulative_sales'] - df['profile_tgt'] 

In [0]:
# Bin the signed distance to profile into 10-wide buckets with open-ended tails.
distance_edges = [-np.inf, -50, -40, -30, -20, -10, 0, 10, 20, 30, 40, 50, np.inf]
distance_labels = [
    '-50+', '-40 to -50', '-30 to -40', '-20 to -30', '-10 to -20', '-10 to 0',
    '0 to 10', '10 to 20', '20 to 30', '30 to 40', '40 to 50', '50+',
]
df['profile_distance_bucket'] = pd.cut(
    df['distance_to_profile'], bins=distance_edges, labels=distance_labels
)
df.head()

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Distribution of treatment_delta within each profile-distance bucket.
fig, ax = plt.subplots(figsize=(12, 6))
sns.boxplot(data=df, x='profile_distance_bucket', y='treatment_delta', ax=ax)
ax.set_title('Treatment Delta by Profile Distance Bucket (unt_net)')
ax.set_xlabel('Profile Distance Bucket')
ax.set_ylabel('Treatment Delta')
plt.show()

In [0]:
# Instrument and first-stage controls, derived from existing columns.
df = df.assign(
    Z_instrument=df['distance_to_profile'],
    control_ROS=df['log_sales_pre'],
    control_loadfactor=np.log(df['Loadfactor'] + 0.01),  # +0.01 keeps log finite at zero load factor
    control_dtg=df['dtg'],
)
df = df.dropna()
df.info()

In [0]:
import statsmodels.api as sm

# First stage: regress the treatment on the instrument plus controls (with intercept).
X_first_stage = df[['Z_instrument', 'control_ROS', 'control_loadfactor', 'control_dtg']]
y_first_stage = df['treatment_delta']

X_first_stage = sm.add_constant(X_first_stage)

model_first_stage = sm.OLS(y_first_stage, X_first_stage).fit()

print(model_first_stage.summary())

# F-test that the instrument's coefficient is zero.
f_test = model_first_stage.f_test("Z_instrument = 0")
# Depending on the statsmodels version, fvalue is a float or a 1x1 array;
# coerce to a plain float so both the printout and the comparison are unambiguous.
f_stat = float(np.squeeze(f_test.fvalue))

print(f"Instrument F-Statistic: {f_stat}")

# Rule-of-thumb threshold of 10 for instrument strength.
if f_stat > 10:
    print("PASS: Instrument is Strong. (F > 10)")
else:
    print("FAIL: Instrument is Weak. (F <= 10)")

In [0]:
cyclic_cols = ['flight_dow', 'charge_dow', 'flight_dom', 'charge_dom', 'flight_mth', 'charge_mth']

def encode_cyclic_features(df, cols, periods=None):
    """Replace each cyclic column with a sine/cosine pair, in place.

    The cycle length must be the true period of the feature, not the largest value
    present in the data: a frame that only contains months 7-9 would otherwise be
    wrapped with period 9, and a 0-based day-of-week (0..6) with period 6, making
    the first and last values of the cycle indistinguishable.

    Args:
        df: DataFrame holding the columns; it is modified in place and returned.
        cols: names of the cyclic columns to encode.
        periods: optional mapping {column name: cycle length}. Columns not listed
            fall back to the name suffix ('_dow' -> 7, '_dom' -> 31, '_mth' -> 12),
            and finally to the observed maximum for any other column.

    Returns:
        The same DataFrame with `<col>_sin` / `<col>_cos` (float16) added and the
        original columns dropped.
    """
    suffix_periods = {'dow': 7, 'dom': 31, 'mth': 12}
    periods = periods or {}
    for col in cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')
        period = periods.get(col)
        if period is None:
            period = suffix_periods.get(col.rsplit('_', 1)[-1])
        if period is None:
            # Unknown feature: keep the previous data-driven behaviour.
            period = df[col].max()
        angle = 2 * np.pi * df[col] / period
        df[col + '_sin'] = np.sin(angle).astype('float16')
        df[col + '_cos'] = np.cos(angle).astype('float16')
        df.drop(col, axis=1, inplace=True)
    return df

df = encode_cyclic_features(df, cyclic_cols)

In [0]:
# Chronological split: order by flight_time, first 80% of rows train, the rest test.
df = df.sort_values(by='flight_time')
train_size = int(len(df) * 0.8)

df_train, df_test = df.iloc[:train_size], df.iloc[train_size:]

In [0]:
from sklearn import set_config
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer


# Transformers return DataFrames so column names survive the pipeline.
set_config(transform_output="pandas")

cols_to_scale = [
    'dtg', 'total_optionality_score', 'combined_bp', 'sale_length',
    'lag_sales_7', 'lag_sales_14', 'lag_sales_21', 'lag_sales_28',
    'Loadfactor', 'log_sales_pre', 'sale_period_progress', 'new_cluster_label',
    'time_quality_score', 'ty_capacity', 'local_wealth',
]

# Standardise the listed columns; every other column passes through untouched.
preprocessor = ColumnTransformer(
    [('scaler', StandardScaler(), cols_to_scale)],
    remainder='passthrough',
    verbose_feature_names_out=False,
)

pipeline = Pipeline([('preprocessor', preprocessor)])

# Scaler statistics come from the training rows only and are reused on the test rows.
df_train_scaled = pipeline.fit_transform(df_train)
df_test_scaled = pipeline.transform(df_test)

# Quadratic term built from the already-scaled dtg.
df_train_scaled['dtg_sq_scaled'] = df_train_scaled['dtg'] ** 2
df_test_scaled['dtg_sq_scaled'] = df_test_scaled['dtg'] ** 2
df_train_scaled.head()

In [0]:
from sklearn.linear_model import Ridge

# Outcome, treatment and instrument as 1-D arrays.
Y = df_train_scaled['outcome_delta'].to_numpy()
T = df_train_scaled['treatment_delta'].to_numpy()
Z = df_train_scaled['Z_instrument'].to_numpy()

# Controls that Y, T and Z are residualised against.
W = df_train_scaled[[
    'flight_dow_sin', 'flight_dow_cos', 'charge_dow_sin', 'charge_dow_cos',
    'flight_dom_sin', 'flight_dom_cos', 'charge_dom_sin', 'charge_dom_cos',
    'flight_mth_sin', 'flight_mth_cos', 'charge_mth_sin', 'charge_mth_cos',
    'dtg', 'dtg_sq_scaled',
    'lag_sales_7', 'lag_sales_14', 'lag_sales_21', 'lag_sales_28',
    'Loadfactor', 'log_sales_pre', 'sale_length', 'sale_period_progress',
    'new_cluster_label', 'time_quality_score', 'ty_capacity',
    'is_charge_date_holiday', 'is_flight_date_holiday',
]].to_numpy()

# Features that the treatment effect is allowed to vary with.
X_names = ['dtg', 'dtg_sq_scaled', 'total_optionality_score', 'combined_bp', 'local_wealth']
X = df_train_scaled[X_names].to_numpy()

print(f"Running Manual DML on {len(Y)} rows...")
print("Step 1: Cleaning Data (Orthogonalization)...")

def get_residuals(target, controls):
    """Return `target` minus its in-sample Ridge (alpha=1.0) prediction from `controls`.

    The model is fitted and evaluated on the same rows.
    """
    fitted = Ridge(alpha=1.0).fit(controls, target)
    predicted = fitted.predict(controls)
    return target - predicted

# Partial the controls out of outcome, treatment and instrument.
y_res = get_residuals(Y, W)
t_res = get_residuals(T, W)
z_res = get_residuals(Z, W)

print("Step 2: Creating Interactions...")

# Residualised treatment and its interaction with every X column (same for the instrument).
T_interaction = t_res[:, np.newaxis] * X
T_final = np.column_stack((t_res, T_interaction))

Z_interaction = z_res[:, np.newaxis] * X
Z_final = np.column_stack((z_res, Z_interaction))

print("Step 3: Running Final IV Regression...")

from linearmodels.iv import IV2SLS

# 'Price' is the base treatment effect; 'Price_x_<feature>' are the interaction effects.
interaction_names = ["Price_x_" + col for col in X_names]
exog_names = ['Price'] + interaction_names

df_Y_res = pd.DataFrame(y_res, columns=['Sales_Res'])
df_T_final = pd.DataFrame(T_final, columns=exog_names)
df_Z_final = pd.DataFrame(
    Z_final,
    columns=["Instr_" + str(i) for i in range(Z_final.shape[1])],
)

# No exogenous block and no constant: every regressor is endogenous and instrumented.
model = IV2SLS(dependent=df_Y_res, exog=None, endog=df_T_final, instruments=df_Z_final)

results = model.fit()
print(results)

In [0]:
import pandas as pd
import numpy as np
from linearmodels.iv import IV2SLS
import matplotlib.pyplot as plt

# ---------------------------------------------------------
# 1. PREDICT ELASTICITY ON TEST SET
# ---------------------------------------------------------
# The coefficients are read straight from the fitted IV model (`results`) rather than
# pasted by hand, so they always match the last fit and none can be left blank.
# Parameter names follow the endog column names given to IV2SLS:
# 'Price' for the base effect and 'Price_x_<feature>' for each interaction.
BETA_INTERCEPT = float(results.params['Price'])
BETA_DTG = float(results.params['Price_x_dtg'])
BETA_DTG2 = float(results.params['Price_x_dtg_sq_scaled'])
BETA_OPT = float(results.params['Price_x_total_optionality_score'])
BETA_BP = float(results.params['Price_x_combined_bp'])
BETA_WEALTH = float(results.params['Price_x_local_wealth'])

def predict_elasticity(row):
    """Predicted elasticity for one scaled observation.

    Evaluates the fitted linear form: base effect plus each interaction
    coefficient times its (scaled) feature, including the squared-dtg term.

    Args:
        row: mapping/Series with 'dtg', 'dtg_sq_scaled', 'combined_bp',
            'local_wealth' and 'total_optionality_score'.

    Returns:
        The predicted elasticity for that row.
    """
    e = BETA_INTERCEPT
    e += BETA_DTG * row['dtg']                      # Scaled DTG
    e += BETA_DTG2 * row['dtg_sq_scaled']           # Square of scaled DTG
    e += BETA_BP * row['combined_bp']               # Scaled BP
    e += BETA_WEALTH * row['local_wealth']          # Scaled Wealth
    e += BETA_OPT * row['total_optionality_score']  # Scaled Opt
    return e

# Score every row of the scaled test set.
df_test_scaled['predicted_elasticity'] = df_test_scaled.apply(
    lambda row: predict_elasticity(row), axis=1
)

# ---------------------------------------------------------
# 2. CREATE BUCKETS (QUINTILES)
# ---------------------------------------------------------
# Five equal-sized groups by predicted elasticity; label 1 holds the lowest values.
df_test_scaled['elasticity_bin'] = pd.qcut(
    df_test_scaled['predicted_elasticity'], q=5, labels=list(range(1, 6))
)

print("Buckets created. Calculating ACTUAL elasticity per bucket...")

# ---------------------------------------------------------
# 3. VERIFY EACH BUCKET (THE TRUTH TEST)
# ---------------------------------------------------------
# One small IV regression per bin: outcome_delta on treatment_delta, instrumented by
# Z_instrument, on the raw (non-residualised) columns and without a constant.

bucket_results = []
bucket_labels = []

for bin_num in range(1, 6):
    in_bin = df_test_scaled['elasticity_bin'] == bin_num
    subset = df_test_scaled[in_bin]

    iv_mod = IV2SLS(
        dependent=subset['outcome_delta'],
        exog=None,
        endog=subset['treatment_delta'],
        instruments=subset['Z_instrument'],
    ).fit()

    # Slope on treatment_delta = estimated elasticity for this bin.
    real_elasticity = iv_mod.params['treatment_delta']
    bucket_results.append(real_elasticity)
    bucket_labels.append(f"Bin {bin_num}")

    print(f"Bin {bin_num} (Model pred: {subset['predicted_elasticity'].mean():.2f}) -> Actual Slope: {real_elasticity:.2f}")

# ---------------------------------------------------------
# 4. VISUALIZE THE SUCCESS
# ---------------------------------------------------------
# One bar per bin showing the IV slope estimated inside that bin.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(bucket_labels, bucket_results, color='skyblue', edgecolor='black')
ax.set_ylabel('Actual Observed Elasticity (IV Slope)')
ax.set_xlabel('Model Predicted Sensitivity (Bin 1 = Most Elastic)')
ax.set_title('Model Validation: Did we correctly sort the flights?')
ax.axhline(0, color='black', linewidth=1)
plt.show()