In [None]:
# Packages
import numpy as np
import scipy 
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pygam
import sklearn
import optuna
from sklearn.preprocessing import SplineTransformer
import warnings
warnings.filterwarnings('ignore') 
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [None]:
#linear
# Train linear regression model
ols = LinearRegression().fit(X_train, y_train)

# Make predictions and evaluate
ols_pred = ols.predict(X_valid)
ols_rmse = np.sqrt(mean_squared_error(y_valid, ols_pred))
ols_r2 = r2_score(y_valid, ols_pred)
ols_mae = mean_absolute_error(y_valid, ols_pred)

print(f"\nLinear Regression Performance:")
print(f"RMSE: {ols_rmse:.2f}")
print(f"R²: {ols_r2:.4f}")
print(f"MAE: {ols_mae:.2f}")

In [None]:
#linear spline

def fit_and_plot_spline(feature, n_knots, degree, y_train, train_data):
    """Fit a spline with specified parameters and plot the results."""
    # Create the spline transformer
    spline = SplineTransformer(n_knots=n_knots, degree=degree, knots='uniform', include_bias=False)
    X_spline = spline.fit_transform(train_data[[feature]].values)
    
    # Fit linear regression on spline features
    model = LinearRegression()
    model.fit(X_spline, y_train)
    
    # Create grid for visualization
    x_min = train_data[feature].min()
    x_max = train_data[feature].max()
    x_grid = np.linspace(x_min, x_max, 100).reshape(-1, 1)
    X_grid = spline.transform(x_grid)
    
    # Make predictions
    y_pred = model.predict(X_grid)
    
    # Plot
    plt.figure(figsize=(10, 6))
    plt.scatter(train_data[feature], y_train, alpha=0.4, label='Data')
    plt.plot(x_grid, y_pred, linewidth=2, 
             label=f'degree {degree} spline with {n_knots} knots')
    plt.xlabel(f'{feature} (standardized)')
    plt.ylabel('Customer Lifetime Value')
    plt.title(f'Regression Spline: Degree {degree} with {n_knots} knots')
    plt.legend()
    plt.show()
    
    return model

In [None]:
#Natural Cubic Splines

from patsy import dmatrix, build_design_matrices

# Create a natural cubic spline with 3 degrees of freedom (df parameter)
formula = "cr(x, df=4) - 1"  # df=4 gives 3 degrees of freedom after removing the intercept
x = train['Ret_Expense']
X_natural = dmatrix(formula, {"x": x})

# Fit model
model_natural = LinearRegression()
model_natural.fit(X_natural, y_train)

# Create a grid for visualization
x_grid = np.linspace(x_min, x_max, 100)
X_grid_natural = build_design_matrices([X_natural.design_info], {"x": x_grid})[0]

# Predict
y_pred_natural = model_natural.predict(X_grid_natural)

# Plot
plt.figure(figsize=(10, 6))
plt.scatter(x, y_train, alpha=0.4, label='Data')
plt.plot(x_grid, y_pred_natural, linewidth=2, label='Natural Cubic Spline')
plt.xlabel('Retention Expense (standardized)')
plt.ylabel('Customer Lifetime Value')
plt.title('Natural Cubic Spline Regression')
plt.legend()
plt.show()

In [None]:
#GAM
from pygam import LinearGAM, s, l

# Define the model: s() for smooth terms, l() for linear terms
gam = LinearGAM(
    s(0) +  # Acq_Expense: smooth term
    s(1) +  # Ret_Expense: smooth term 
    l(2) +  # First_Purchase: linear term
    l(3) +  # Revenue: linear term
    l(4) +  # Employees: linear term
    l(5) +  # Frequency: linear term
    l(6) +  # Crossbuy: linear term
    l(7)    # Industry: linear term
)

# Fit the model
gam.fit(X_train, y_train)

# Display model summary
print(gam.summary())

# Plotting the partial dependence for each feature
plt.figure(figsize=(16, 20))

# Track the plot position
plot_idx = 1

for i, term in enumerate(gam.terms):
    if term.isintercept:
        continue
        
    # Create subplot (2 plots per row)
    plt.subplot(4, 2, plot_idx)
    plot_idx += 1
    
    # Generate points for plotting
    X_grid = gam.generate_X_grid(term=i)
    
    # Get the partial dependence and confidence intervals
    pdep, confi = gam.partial_dependence(term=i, X= X_grid, width=0.95)
    
    # Plot the function
    plt.plot(X_grid[:, term.feature], pdep)
    plt.plot( X_grid[:, term.feature], confi, c='grey', ls='--', alpha=0.5)
    
    # Label the plot
    plt.title(predictors[i])
    plt.grid(True, alpha=0.2)

plt.tight_layout()
plt.show()

import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING) # Swithing off ouput, you want to remove this line to see more details

# Define a simple objective function to minimize
def objective(trial):
    # Parameter to optimize
    x = trial.suggest_float('x', -10, 10)
    
    # The function to minimize: (x-2)²
    return (x - 2) ** 2

# Create a study object and optimize
sampler = optuna.samplers.TPESampler(seed=42)  # For reproducibility
study = optuna.create_study(direction='minimize', sampler=sampler)
study.optimize(objective, n_trials=100)

# Print results
print(f"Best value: {study.best_value:.4f}")
print(f"Best parameters: {study.best_params}")



def gam_objective(trial):
    # Separate lambda parameters for different term types
    lambda_spline_acq = trial.suggest_float('lambda_spline_acq', 1e-4, 1e4, log=True)
    lambda_spline_ret = trial.suggest_float('lambda_spline_ret', 1e-4, 1e4, log=True)
    lambda_linear = trial.suggest_float('lambda_linear', 1e-4, 1e4, log=True)
    
    # Create lambda list with appropriate values for each term
    lambdas = [
        lambda_spline_acq,  # Acq_Expense (spline)
        lambda_spline_ret,  # Ret_Expense (spline)
        lambda_linear,   # First_Purchase (linear)
        lambda_linear,   # Revenue (linear)
        lambda_linear,   # Employees (linear)
        lambda_linear,   # Frequency (linear)
        lambda_linear,   # Crossbuy (linear)
        lambda_linear    # Industry (linear)
    ]
    
    # Create and fit the GAM with these lambda values
    model = LinearGAM(s(0) + s(1) + l(2) + l(3) + l(4) + l(5) + l(6) + l(7), 
                      lam=lambdas)
    model.fit(X_train, y_train)
    
    # Return the GCV score
    return model.statistics_['GCV']

sampler = optuna.samplers.TPESampler(seed=42)  # makes the sampler behave in a deterministic way

study = optuna.create_study(direction='minimize', sampler=sampler) # TPE is a method for Bayesian optimisation
study.optimize(gam_objective, n_trials=10000, timeout=120) 

best_lambdas = list(study.best_params.values())

lambda_spline_acq = best_lambdas[0]
lambda_spline_ret = best_lambdas[1]
lambda_linear = best_lambdas[2]
    
# Create lambda list with appropriate values for each term
lambdas = [
    lambda_spline_acq,  # Acq_Expense (spline)
    lambda_spline_ret,  # Ret_Expense (spline)
    lambda_linear,   # First_Purchase (linear)
    lambda_linear,   # Revenue (linear)
    lambda_linear,   # Employees (linear)
    lambda_linear,   # Frequency (linear)
    lambda_linear,   # Crossbuy (linear)
    lambda_linear    # Industry (linear)
]


optimized_gam = LinearGAM(s(0) + s(1) + l(2) + l(3) + l(4) + l(5) + l(6) + l(7), lam = lambdas)
optimized_gam.fit(X_train, y_train)

In [None]:
#KNN

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Select most important features
subset_indices = [0, 1, 5]  # Income, Limit, Student

# Create a column selector using ColumnTransformer
column_selector = ColumnTransformer(
    [("selector", "passthrough", subset_indices)],
    remainder="drop"
)

# Create the pipeline
knn_pipeline = Pipeline([
    ('selector', column_selector),
    ('knn', KNeighborsRegressor(metric='mahalanobis', 
                              metric_params={'V': train.iloc[:, subset_indices].cov()}))
])

# Grid search for optimal number of neighbors
param_grid = {'knn__n_neighbors': np.arange(1, 51)}

# Perform grid search
knn = GridSearchCV(knn_pipeline, param_grid, cv=rkf, scoring='neg_mean_squared_error', verbose=1)
knn.fit(X_train, y_train)

# Display best number of neighbours
print(f'\nBest hyperparameters: {knn.best_params_}')

# Use the best model
knn = knn.best_estimator_

# Make predictions and evaluate
knn_pred = knn.predict(X_valid)
knn_rmse = np.sqrt(mean_squared_error(y_valid, knn_pred))
knn_r2 = r2_score(y_valid, knn_pred)
knn_mae = mean_absolute_error(y_valid, knn_pred)

print(f"\nKNN Performance:")
print(f"RMSE: {knn_rmse:.2f}")
print(f"R²: {knn_r2:.4f}")
print(f"MAE: {knn_mae:.2f}")

In [None]:
#decision tree
model = DecisionTreeClassifier(criterion='entropy', min_samples_leaf=5)

search_space = {
    'ccp_alpha': alphas,
}

# Select alpha by grid search
tree_search = GridSearchCV(model, search_space, cv = 5 , scoring='neg_log_loss')
tree_search.fit(X_train, y_train)
tree = tree_search.best_estimator_

print('Best parameters found by grid search:', tree_search.best_params_, '\n')

from sklearn.ensemble import BaggingClassifier

bag = BaggingClassifier(DecisionTreeClassifier(criterion='entropy'), n_estimators=1000, random_state=42)
bag.fit(X_train, y_train)

In [None]:
#random forest

from sklearn.ensemble import RandomForestClassifier

rf =  RandomForestClassifier(n_estimators=100,  criterion='entropy',  max_features = 3, min_samples_leaf= 5)
rf.fit(X_train, y_train)

import optuna
from optuna.samplers import TPESampler
from sklearn.model_selection import cross_val_score

optuna.logging.set_verbosity(optuna.logging.WARNING) # Swithing off ouput, remove this line to see more details

def objective(trial):
    
    criterion = trial.suggest_categorical('criterion', ['entropy', 'gini'])
    min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 50)
    max_features = trial.suggest_int('max_features', 1, 24)
    
    
    model = RandomForestClassifier(n_estimators = 100,  
                                  criterion = criterion,  
                                  max_features = max_features, 
                                  min_samples_leaf= min_samples_leaf,
                                  random_state = 1)
    
    scores = cross_val_score(model, X_train, y_train, cv = 5, scoring = 'neg_log_loss')
    loss = - np.mean(scores)
    
    return loss

sampler = TPESampler(seed=42) 
study = optuna.create_study(direction='minimize', sampler=sampler)
study.optimize(objective, n_trials = 100, timeout = 120)  # use a higher timeout when trying to maximize accuracy, as opposed to prototyping

In [None]:
#LightGBM (Gradient Boosting)

import lightgbm as lgb

params = {
 'objective': 'regression',
 'boosting_type': 'gbdt',
 'learning_rate': 0.01,
 'num_leaves': 45,
 'lambda_l1': 0.027109795180157614,
 'lambda_l2': 1.3211122996540583e-06,
 'bagging_fraction': 0.9303390497096511,
 'bagging_freq': 8,
 'feature_fraction': 0.6954068017048125,
 'min_data_in_leaf': 2}

final_lgbm = lgb.train(params, train_data, num_boost_round = 3996)

# Make final predictions
y_pred_final = final_lgbm.predict(X_valid)

# Evaluate final performance
rmse_final = np.sqrt(mean_squared_error(y_valid, y_pred_final))
r2_final = r2_score(y_valid, y_pred_final)
mae_final = mean_absolute_error(y_valid, y_pred_final)

print(f"Final LightGBM Performance:")
print(f"RMSE: {rmse_final:.4f}")
print(f"R²: {r2_final:.4f}")
print(f"MAE: {mae_final:.4f}")

In [None]:
#stacking
from sklearn.ensemble import StackingRegressor

# Create a stacking ensemble with linear regression and KNN as base models
stack = StackingRegressor(
   estimators=[('Linear Regression', ols), ('KNN', knn)], #other models
   final_estimator=LinearRegression(), 
   cv=5
)

# Train the stacking ensemble
stack.fit(X_train, y_train)