In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
from catboost import CatBoostRegressor, Pool, MetricVisualizer
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from scipy.stats import boxcox
import warnings
import shap
import GPyOpt

In [None]:
# Apply seaborn's default styling to the matplotlib figures drawn below.
sns.set()
# Initialise SHAP's JavaScript support so its interactive plots can render in the notebook.
shap.initjs()

In [None]:
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
# Load and preprocess data
def load_data(file_path, max_quantity=55):
    """Read the transactions CSV, drop unusable rows and add revenue/calendar columns.

    Parameters
    ----------
    file_path : str or path-like
        CSV file read with ISO-8859-1 encoding. It must provide the columns
        ``InvoiceDate``, ``UnitPrice``, ``Quantity``, ``CustomerID`` and
        ``Description``.
    max_quantity : int, default 55
        Exclusive upper bound on ``Quantity``; rows at or above it are dropped.

    Returns
    -------
    pandas.DataFrame
        An independent copy containing only rows with ``UnitPrice > 0``,
        ``0 < Quantity < max_quantity``, a non-missing ``CustomerID`` and a
        non-empty, lower-cased ``Description``; plus the derived columns
        ``Revenue``, ``Year``, ``Quarter``, ``Month``, ``Week``, ``Weekday``,
        ``Day``, ``Dayofyear`` and ``Date`` (the invoice timestamp truncated to
        the day).
    """
    data = pd.read_csv(file_path, encoding="ISO-8859-1", dtype={'CustomerID': str})
    data["InvoiceDate"] = pd.to_datetime(data.InvoiceDate, cache=True)

    # Do all row filtering first so the derived columns are computed once, on
    # an owned copy rather than on a slice of the raw frame.
    data = data[(data.UnitPrice > 0) & (data.Quantity > 0) & (data.Quantity < max_quantity)]
    data = data.dropna(subset=["CustomerID", "Description"])

    descriptions = data["Description"].str.lower()
    # Only a description that *is* the literal placeholder "nan" is discarded.
    # A substring test would also throw away real products such as
    # "banana ..." or "pennant ...".
    has_real_text = (descriptions.str.strip() != "nan") & (descriptions.str.len() > 0)
    data = data[has_real_text].copy()
    data["Description"] = descriptions[has_real_text]

    data["Revenue"] = data.Quantity * data.UnitPrice
    data["Year"] = data.InvoiceDate.dt.year
    data["Quarter"] = data.InvoiceDate.dt.quarter
    data["Month"] = data.InvoiceDate.dt.month
    data["Week"] = data.InvoiceDate.dt.isocalendar().week
    data["Weekday"] = data.InvoiceDate.dt.weekday
    data["Day"] = data.InvoiceDate.dt.day
    data["Dayofyear"] = data.InvoiceDate.dt.dayofyear
    # Re-assemble the date from its parts to obtain a midnight timestamp per row.
    data["Date"] = pd.to_datetime(data[['Year', 'Month', 'Day']])
    return data

In [None]:
# Keep only rows whose stock code is exactly five digits
def analyze_stock_codes(data):
    """Keep only the rows whose ``StockCode`` is exactly five characters, all digits.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame with a string-valued ``StockCode`` column. It is not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with the matching rows and two helper columns,
        ``StockCodeLength`` and ``nNumericStockCode`` (both equal to 5 for
        every kept row). Rows with a missing stock code are dropped.
    """
    codes = data.StockCode
    code_length = codes.str.len()
    digit_count = codes.str.count(r"\d")
    # Missing codes yield missing comparisons; treat them as "does not match".
    keep = ((code_length == 5) & (digit_count == 5)).fillna(False).astype(bool)
    # assign() builds a new frame, so the caller's frame gains no columns and
    # later column assignments on the result do not hit a slice of it.
    return data[keep].assign(
        StockCodeLength=code_length[keep].astype(int),
        nNumericStockCode=digit_count[keep].astype(int),
    )

In [None]:
# Cluster products
def cluster_products(data, n_clusters=30, random_state=0):
    """Group products into clusters from four per-product summary statistics.

    For every ``StockCode`` the median unit price, median quantity, number of
    distinct customers and median description length are computed. Each
    statistic is Box-Cox transformed, all four are standardised, and k-means
    assigns a cluster label per product.

    Parameters
    ----------
    data : pandas.DataFrame
        Needs the columns ``StockCode``, ``UnitPrice``, ``Quantity``,
        ``CustomerID`` and ``Description``. It is not modified. Box-Cox
        requires strictly positive, non-constant inputs, so every statistic
        must be > 0 for every product.
    n_clusters : int, default 30
        Number of k-means clusters.
    random_state : int or None, default 0
        Seed forwarded to ``KMeans`` so the labels are reproducible between
        runs; pass ``None`` for an unseeded fit.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        A copy of ``data`` with an added ``ProductType`` column, and the
        per-product table (transformed statistics plus ``cluster``) indexed by
        stock code.
    """
    # One grouping reused for all statistics; sort=False keeps products in
    # order of first appearance.
    per_product = data.groupby("StockCode", sort=False)
    description_length = data.Description.str.len()
    products = pd.DataFrame({
        "MedianPrice": per_product.UnitPrice.median(),
        "MedianQuantities": per_product.Quantity.median(),
        "Customers": per_product.CustomerID.nunique(),
        "DescriptionLength": description_length.groupby(data.StockCode, sort=False).median(),
    })

    for col in products.columns:
        # boxcox returns (transformed values, fitted lambda); only the values are kept.
        products[col] = boxcox(products[col].to_numpy(dtype=float))[0]

    scaler = StandardScaler()
    X = scaler.fit_transform(products.values)
    km = KMeans(n_clusters=n_clusters, random_state=random_state)
    products["cluster"] = km.fit_predict(X)

    # assign() returns a new frame instead of writing into the caller's one.
    data = data.assign(ProductType=data.StockCode.map(products.cluster))
    return data, products

In [None]:
# Aggregate daily data
def aggregate_daily_data(data):
    """Sum ``Quantity`` and ``Revenue`` per stock code and calendar day.

    The calendar columns are part of the grouping key, so they are carried
    through to the result as ordinary columns next to the two totals.
    """
    day_product_keys = [
        "Date", "Year", "Quarter", "Month", "Week",
        "Weekday", "Dayofyear", "Day", "StockCode",
    ]
    totals = data.groupby(day_product_keys).agg(
        Quantity=("Quantity", "sum"),
        Revenue=("Revenue", "sum"),
    )
    # Turn the grouping keys back from index levels into columns.
    return totals.reset_index()

In [None]:
# Build the feature matrix and the log-quantity target
def prepare_data(daily_data, week):
    """Separate the daily frame into model features and a log-scaled target.

    ``week`` is accepted for interface compatibility but is not used here.

    Returns
    -------
    (pandas.DataFrame, pandas.Series)
        The features (everything except ``Quantity``, ``Revenue`` and ``Date``)
        and the natural logarithm of ``Quantity``.
    """
    non_feature_columns = ["Quantity", "Revenue", "Date"]
    features = daily_data.drop(columns=non_feature_columns)
    log_quantity = np.log(daily_data.Quantity)
    return features, log_quantity

In [None]:
# Define the hyperparameter container used by the CatBoost model
class CatHyperparameter:
    """Plain value holder for the settings used to configure a CatBoost model.

    Every constructor argument is stored unchanged under an attribute of the
    same name: ``loss``, ``metric``, ``iterations``, ``max_depth``,
    ``l2_leaf_reg`` and ``seed``.
    """

    def __init__(self, loss="RMSE", metric="RMSE", iterations=1000, max_depth=4, l2_leaf_reg=3, seed=0):
        # Objective and reporting metric.
        self.loss, self.metric = loss, metric
        # Ensemble size, tree depth and leaf regularisation.
        self.iterations = iterations
        self.max_depth, self.l2_leaf_reg = max_depth, l2_leaf_reg
        # Random seed.
        self.seed = seed

In [None]:
# Define the CatBoost model wrapper (training, evaluation and SHAP plots)
class Catmodel:
    """Wrapper around a ``CatBoostRegressor`` with a week-based hold-out split.

    Typical use: ``set_data`` -> ``learn`` -> ``score`` / ``show_val_results``
    / ``show_importances``.

    Attributes set along the way:
        x_val, y_val         hold-out features / target (``set_data``)
        train_pool, val_pool CatBoost pools (``set_data``)
        model                the fitted regressor (``learn``)
        results              per-row validation errors (``get_val_results``)
    """

    def __init__(self, name, params):
        """Store the model name (also used as CatBoost ``train_dir``) and its hyperparameters."""
        self.name = name
        self.params = params

    def set_data(self, X, y, week):
        """Split ``X``/``y`` on the ``Week`` column and build the CatBoost pools.

        Rows with ``Week < week`` form the training set, rows with
        ``Week >= week`` the validation set.
        """
        # Every column that is not float-typed is handed to CatBoost as categorical.
        cat_features_idx = np.where(X.dtypes != float)[0]
        # NOTE(review): the split compares week numbers only. If the data spans
        # more than one calendar year, equal week numbers from different years
        # end up in the same partition - confirm this is intended.
        in_train = X.Week < week
        in_val = X.Week >= week
        x_train, self.x_val = X[in_train], X[in_val]
        y_train, self.y_val = y[in_train], y[in_val]
        self.train_pool = Pool(x_train, y_train, cat_features=cat_features_idx)
        self.val_pool = Pool(self.x_val, self.y_val, cat_features=cat_features_idx)

    def prepare_model(self):
        """Create a fresh, unfitted regressor from ``self.params``."""
        self.model = CatBoostRegressor(
            loss_function=self.params.loss,
            # Forward the configured metric; previously it was stored on the
            # params object but never reached CatBoost. None leaves the
            # library default in place.
            eval_metric=getattr(self.params, "metric", None),
            random_seed=self.params.seed,
            logging_level='Silent',
            iterations=self.params.iterations,
            max_depth=self.params.max_depth,
            l2_leaf_reg=self.params.l2_leaf_reg,
            # Overfitting detector: iteration-based with a patience of 40.
            od_type='Iter',
            od_wait=40,
            train_dir=self.name,
            has_time=True
        )

    def learn(self, plot=False):
        """Rebuild the regressor and fit it on the training pool, evaluating on the validation pool."""
        self.prepare_model()
        self.model.fit(self.train_pool, eval_set=self.val_pool, plot=plot)
        print(f"{self.name}, early-stopped model tree count {self.model.tree_count_}")

    def score(self):
        """Return CatBoost's own ``score`` of the fitted model on the validation pool.

        NOTE(review): current CatBoost documentation describes the regressor's
        ``score`` as R^2 (higher is better), not RMSE - confirm for the
        installed version before treating this value as a loss.
        """
        return self.model.score(self.val_pool)

    def _shap_values(self):
        """Compute SHAP values of the fitted model for the validation pool."""
        explainer = shap.TreeExplainer(self.model)
        return explainer.shap_values(self.val_pool)

    def show_importances(self, kind="bar"):
        """Draw a SHAP summary plot: a bar chart for ``kind="bar"``, the default SHAP plot otherwise."""
        shap_values = self._shap_values()
        if kind == "bar":
            return shap.summary_plot(shap_values, self.x_val, plot_type="bar")
        return shap.summary_plot(shap_values, self.x_val)

    def get_val_results(self):
        """Build ``self.results`` with target, prediction, absolute/squared error and month per validation row."""
        self.results = pd.DataFrame(self.y_val)
        target_col = self.results.columns[0]
        self.results["prediction"] = self.predict(self.x_val)
        self.results["error"] = np.abs(self.results[target_col].values - self.results.prediction)
        self.results["Month"] = self.x_val.Month
        self.results["SquaredError"] = self.results.error ** 2

    def show_val_results(self):
        """Plot the absolute-error distribution and prediction-vs-target scatter; return the two axes.

        Also stores and prints ``self.median_absolute_error``.
        """
        self.get_val_results()
        fig, ax = plt.subplots(1, 2, figsize=(20, 5))
        # NOTE(review): seaborn marks distplot as deprecated in favour of
        # histplot/displot; kept as-is because the installed seaborn version
        # cannot be determined from this notebook.
        sns.distplot(self.results.error, ax=ax[0])
        ax[0].set_xlabel("Single absolute error")
        ax[0].set_ylabel("Density")
        self.median_absolute_error = np.median(self.results.error)
        print(f"Median absolute error: {self.median_absolute_error}")
        ax[0].axvline(self.median_absolute_error, c="black")
        ax[1].scatter(self.results.prediction.values, self.results[self.results.columns[0]].values, c=self.results.error, cmap="RdYlBu_r", s=1)
        ax[1].set_xlabel("Prediction")
        ax[1].set_ylabel("Target")
        return ax

    def get_monthly_RMSE(self):
        """Return the validation RMSE per ``Month`` as a Series indexed by month."""
        # Compute the validation table on demand so this method also works
        # when neither get_val_results nor show_val_results was called before.
        if not hasattr(self, "results"):
            self.get_val_results()
        return np.sqrt(self.results.groupby("Month").SquaredError.mean())

    def predict(self, x):
        """Return the fitted model's predictions for ``x``."""
        return self.model.predict(x)

    def get_dependence_plot(self, feature1, feature2=None):
        """Draw a SHAP dependence plot for ``feature1``, optionally coloured by ``feature2``."""
        shap_values = self._shap_values()
        if feature2 is None:
            return shap.dependence_plot(feature1, shap_values, self.x_val)
        return shap.dependence_plot(feature1, shap_values, self.x_val, interaction_index=feature2)

In [None]:
# Define hyperparameter tuning class
class Hypertuner:
    """Bayesian search (GPyOpt) over tree depth and L2 leaf regularisation.

    ``model`` must offer ``params``, ``learn(plot=...)``, ``predict(x)`` and
    the validation attributes ``x_val`` / ``y_val``. The search minimises the
    validation RMSE of the retrained model.
    """

    def __init__(self, model, max_iter=10, max_time=10, max_depth=6, max_l2_leaf_reg=20):
        """Set up the search space ``1..max_depth`` x ``1..max_l2_leaf_reg`` and the budgets."""
        # A GPyOpt 'discrete' domain lists the admissible values themselves, so
        # every integer of the range is enumerated. A two-element tuple
        # (1, max) would restrict the search to just those two values.
        self.bounds = [{'name': 'depth', 'type': 'discrete', 'domain': tuple(range(1, int(max_depth) + 1))},
                       {'name': 'l2_leaf_reg', 'type': 'discrete', 'domain': tuple(range(1, int(max_l2_leaf_reg) + 1))}]
        self.model = model
        self.max_iter = max_iter
        self.max_time = max_time
        self.best_depth = None
        self.best_l2_leaf_reg = None

    @staticmethod
    def _rmse(y_true, y_pred):
        """Root mean squared error between two equally long sequences."""
        residual = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
        return float(np.sqrt(np.mean(residual ** 2)))

    def _trial_params(self, depth, l2_leaf_reg):
        """Copy the model's current hyperparameters with the two tuned values replaced."""
        import copy  # local import: not among the notebook's top-level imports

        # Copy, so loss / iterations / seed chosen by the caller are kept for
        # every trial and the caller's own params object is left untouched.
        trial = copy.copy(self.model.params)
        # The optimizer hands candidates over as floats; store integers.
        trial.max_depth = int(round(float(depth)))
        trial.l2_leaf_reg = int(round(float(l2_leaf_reg)))
        return trial

    def objective(self, params):
        """Retrain the model for one candidate and return its validation RMSE.

        ``params`` is a 2-D array whose first row is ``[depth, l2_leaf_reg]``.
        """
        candidate = np.atleast_2d(params)[0]
        self.model.params = self._trial_params(candidate[0], candidate[1])
        self.model.learn()
        # Compute RMSE explicitly instead of relying on model.score(): the
        # result is minimised and reported as RMSE, so it has to be a loss.
        predictions = self.model.predict(self.model.x_val)
        return self._rmse(self.model.y_val, predictions)

    def learn(self):
        """Run the optimisation and store the best depth / l2_leaf_reg found."""
        np.random.seed(777)
        # exact_feval: the objective is treated as noise-free.
        # NOTE(review): the previous spelling `exact_eval` does not match
        # GPyOpt's documented `exact_feval` keyword - confirm for the
        # installed version. `acquisition_par` is passed through unchanged.
        optimizer = GPyOpt.methods.BayesianOptimization(f=self.objective, domain=self.bounds, acquisition_type='EI', acquisition_par=0.2, exact_feval=True)
        optimizer.run_optimization(self.max_iter, self.max_time)
        optimizer.plot_convergence()
        best = optimizer.X[np.argmin(optimizer.Y)]
        self.best_depth = int(round(float(best[0])))
        self.best_l2_leaf_reg = int(round(float(best[1])))
        print(f"Optimal depth is {self.best_depth} and optimal l2-leaf-reg is {self.best_l2_leaf_reg}")
        print('Optimal RMSE:', np.min(optimizer.Y))

    def retrain_catmodel(self):
        """Refit the model with the best parameters found by ``learn`` and return it.

        Raises
        ------
        RuntimeError
            If ``learn`` has not been run yet, i.e. no best parameters exist.
        """
        if self.best_depth is None or self.best_l2_leaf_reg is None:
            raise RuntimeError("No tuned parameters available: call learn() before retrain_catmodel().")
        self.model.params = self._trial_params(self.best_depth, self.best_l2_leaf_reg)
        self.model.learn(plot=True)
        return self.model

In [None]:
# Define main function to run the workflow
def main(file_path="data.csv"):
    """Run the whole workflow: load, filter, cluster, aggregate, fit, inspect, tune.

    Parameters
    ----------
    file_path : str, default "data.csv"
        Location of the raw transactions CSV passed to ``load_data``.
    """
    data = load_data(file_path)
    data = analyze_stock_codes(data)
    data, _ = cluster_products(data)
    daily_data = aggregate_daily_data(data)
    # Validation covers the three highest week numbers (max-2, max-1, max).
    week = daily_data.Week.max() - 2
    X, y = prepare_data(daily_data, week)

    params = CatHyperparameter()
    model = Catmodel("baseline", params)
    model.set_data(X, y, week)
    model.learn(plot=True)
    # Inside a function a bare expression is not displayed, so results are
    # printed explicitly.
    print(f"Baseline validation score: {model.score()}")
    model.show_val_results()
    model.show_importances()
    model.show_importances(kind=None)

    # The target is log(quantity); exponentiate to report errors in units sold.
    unit_errors = np.abs(np.exp(model.results.prediction) - np.exp(model.results.Quantity))
    print(f"Mean absolute error in units: {np.mean(unit_errors)}")
    print(f"Median absolute error in units: {np.median(unit_errors)}")

    search = Hypertuner(model)
    search.learn()
    search.retrain_catmodel()

In [None]:
# Start the workflow only when this code runs as the top-level module
# (a script or a notebook cell), not when it is imported.
if __name__ == "__main__":
    main()