Libraries and Dependencies

In [None]:
import MyPipe as mp
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import xgboost
import optuna
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

Function and class definitions

In [None]:
class xgbReg():
    """Pipeline step wrapping an ``xgboost.XGBRegressor`` with default hyperparameters.

    Parameters
    ----------
    enable_categorical : bool, default True
        Forwarded unchanged to ``XGBRegressor``.
    random_state : int, default 42
        Forwarded unchanged to ``XGBRegressor``.
    """
    def __init__(self, enable_categorical=True, random_state=42):
        self.xgb = xgboost.XGBRegressor(enable_categorical=enable_categorical, random_state=random_state)

    def fit(self, x, y):
        """Fit the wrapped regressor on features ``x`` and target ``y``.

        Returns
        -------
        xgbReg
            ``self``, following the scikit-learn estimator convention so calls
            can be chained (``xgbReg().fit(x, y).predict(x)``).
        """
        self.xgb.fit(x, y)
        return self

    def predict(self, x):
        """Return the wrapped regressor's predictions for features ``x``."""
        return self.xgb.predict(x)
    
class optParam:
    """Pipeline step that tunes XGBoost hyperparameters with Optuna, then fits the tuned model.

    Parameters
    ----------
    objective : callable
        Passed as-is to ``study.optimize``, so Optuna calls it with the trial
        only. The ``x`` / ``y`` given to ``fit`` are NOT forwarded to it; the
        objective has to obtain its own data.
    enable_categorical : bool, default True
        Forwarded to the final ``XGBRegressor``.
    random_state : int, default 42
        Forwarded to the final ``XGBRegressor``.
    direction : str, default 'minimize'
        Optimisation direction handed to ``optuna.create_study``.
    study_name : str, default 'my_study'
        Study name handed to ``optuna.create_study``.
    n_trials : int, default 100
        Number of trials run by each call to ``fit``.
    """
    def __init__(self, objective, enable_categorical=True, random_state=42, direction='minimize', study_name='my_study', n_trials=100):
        self.study = optuna.create_study(direction=direction, study_name=study_name)
        self.objective = objective
        self.n_trials = n_trials
        self.xgb_opt = None  # tuned regressor; populated by fit()
        self.enable_categorical = enable_categorical
        self.random_state = random_state

    def fit(self, x, y):
        """Run the hyperparameter search, then fit a regressor with the best parameters on ``x``, ``y``.

        The study is created once in ``__init__``, so every call to ``fit``
        adds ``n_trials`` more trials to the same study and ``best_params``
        is the best over all trials run so far.

        Returns
        -------
        optParam
            ``self``, following the scikit-learn estimator convention.
        """
        self.study.optimize(self.objective, n_trials=self.n_trials)
        self.xgb_opt = xgboost.XGBRegressor(**self.study.best_params, enable_categorical=self.enable_categorical, random_state=self.random_state)
        self.xgb_opt.fit(x, y)
        return self

    def predict(self, x):
        """Return predictions of the tuned regressor for features ``x``.

        Raises
        ------
        AttributeError
            If ``fit`` has not been called yet (there is no tuned model).
        """
        if self.xgb_opt is None:
            # Same exception type as the bare None.predict failure, but with an actionable message.
            raise AttributeError("optParam has no fitted model yet: call fit(x, y) before predict(x).")
        return self.xgb_opt.predict(x)


def preprocess_data(raw_data: pd.DataFrame):
    """Clean the raw diamonds table and encode its quality columns as ordered categoricals.

    Rows are kept only when the product ``x * y * z`` is non-zero (no zero
    dimension) and ``price`` is strictly positive. The input frame is not
    modified; a filtered copy is returned.

    Parameters
    ----------
    raw_data : pd.DataFrame
        Must contain the columns ``x``, ``y``, ``z``, ``price``, ``cut``,
        ``color`` and ``clarity``.

    Returns
    -------
    pd.DataFrame
        Filtered copy with ``cut``, ``color`` and ``clarity`` converted to
        ordered ``pd.Categorical``. Labels outside the listed categories
        become NaN.
    """
    has_no_zero_dimension = raw_data.x * raw_data.y * raw_data.z != 0
    has_positive_price = raw_data.price > 0
    processed_data = raw_data.loc[has_no_zero_dimension & has_positive_price].copy()

    # Cut scale, lowest to highest quality: 'Premium' ranks below 'Ideal'.
    processed_data['cut'] = pd.Categorical(processed_data['cut'], categories=['Fair', 'Good', 'Very Good', 'Premium', 'Ideal'], ordered=True)
    processed_data['color'] = pd.Categorical(processed_data['color'], categories=['D', 'E', 'F', 'G', 'H', 'I', 'J'], ordered=True)
    processed_data['clarity'] = pd.Categorical(processed_data['clarity'], categories=['IF', 'VVS1', 'VVS2', 'VS1', 'VS2', 'SI1', 'SI2', 'I1'], ordered=True)
    return processed_data

def split_data(data: pd.DataFrame, test_size=0.2, random_state=42, apply_ylog = False):
    """Split ``data`` into train/test features and ``price`` targets.

    Parameters
    ----------
    data : pd.DataFrame
        Table containing a ``price`` column; all other columns are features.
    test_size, random_state
        Forwarded to ``train_test_split``.
    apply_ylog : bool, default False
        When true, both target splits are returned as ``np.log`` of the price.

    Returns
    -------
    tuple
        ``(x_train, x_test, y_train, y_test)``.
    """
    target = data['price']
    features = data.drop(columns='price')
    x_train, x_test, y_train, y_test = train_test_split(
        features, target, test_size=test_size, random_state=random_state
    )
    if not apply_ylog:
        return x_train, x_test, y_train, y_test
    return x_train, x_test, np.log(y_train), np.log(y_test)


    
def plot_gof(y_true: pd.Series, y_pred: pd.Series):
    """Show a goodness-of-fit plot: predicted against actual values, with the identity line.

    Points are drawn as dots; the black line plots ``y_true`` against itself,
    so points on it are exact predictions.
    """
    axes = plt.gca()
    axes.plot(y_true, y_pred, '.')
    axes.plot(y_true, y_true, c='black', linewidth=3)
    axes.set_xlabel('Actual')
    axes.set_ylabel('Predicted')
    plt.show()


C2.1 Example of how to use the MyPipe class with the whole dataset (no data acquisition) and the custom transformer xgbReg

In [None]:
# Source of the raw diamonds table
DIAMONDS_CSV_URL = "https://raw.githubusercontent.com/xtreamsrl/xtream-ai-assignment-engineer/main/datasets/diamonds/diamonds.csv"
diamonds = pd.read_csv(DIAMONDS_CSV_URL)

# Clean the raw table, then hold out a test split with split_data's defaults
data = preprocess_data(diamonds)
x_train, x_test, y_train, y_test = split_data(data)

# Single-step pipeline: XGBoost regressor with default hyperparameters
regression_step = ('regression', xgbReg())
my_steps = [regression_step]

In [None]:
# Build the pipeline from the configured steps and hand it the full cleaned dataset.
# NOTE(review): define_data presumably stores the frame on the pipeline (C2.3 reads
# my_pipeline.data) -- confirm in MyPipe.
my_pipeline = mp.MyPipe(steps=my_steps)
my_pipeline.define_data(data)
my_pipeline.fit(x_train,y_train)

# Score the fitted pipeline on the held-out split.
pred = my_pipeline.predict(x_test)
performance = my_pipeline.evaluate_performance(y_test,pred)

In [None]:
# Display the value returned by evaluate_performance for the default-parameter model
performance

In [None]:
# Predicted vs. actual prices on the held-out split
plot_gof(y_test, pred)

C2.2 Example of how to include the Optuna hyperparameter optimization

In [None]:
#Define the objective function
def objective(trial: optuna.trial.Trial) -> float:
    """Optuna objective: validation MAE of an XGBoost regressor for one sampled configuration.

    Reads the module-level ``x_train_opt`` / ``y_train_opt`` (they must be
    defined before the study runs), holds out 20% of them for validation,
    trains an ``XGBRegressor`` with the sampled hyperparameters and returns
    its mean absolute error on the held-out part.
    """
    # Hyperparameters sampled by the trial
    sampled = {
        'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True),
        'colsample_bytree': trial.suggest_categorical('colsample_bytree', [0.3, 0.4, 0.5, 0.7]),
        'subsample': trial.suggest_categorical('subsample', [0.5, 0.6, 0.7, 0.8, 0.9, 1.0]),
        'learning_rate': trial.suggest_float('learning_rate', 1e-8, 1.0, log=True),
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
        'max_depth': trial.suggest_int('max_depth', 3, 9),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
    }
    # Settings held constant across trials
    fixed = {'random_state': 42, 'enable_categorical': True}

    # Fixed-seed inner split: every trial is scored on the same validation rows
    fit_x, val_x, fit_y, val_y = train_test_split(
        x_train_opt, y_train_opt, test_size=0.2, random_state=42
    )

    regressor = xgboost.XGBRegressor(**sampled, **fixed)
    regressor.fit(fit_x, fit_y)

    return mean_absolute_error(val_y, regressor.predict(val_x))

In [None]:
# `objective` reads x_train_opt / y_train_opt as module-level globals, so they
# must be defined before the pipeline is fitted.
x_train_opt, x_test_opt, y_train_opt, y_test_opt = split_data(data)

# Single-step pipeline: 100-trial Optuna search, then a fit with the best parameters found
my_steps = [('parameters optimization', optParam(objective, n_trials=100))]

In [None]:
# Rebuild the pipeline around the Optuna step; this replaces the C2.1 pipeline object.
my_pipeline = mp.MyPipe(steps=my_steps)
my_pipeline.define_data(data)
my_pipeline.fit(x_train_opt,y_train_opt)

# Score the tuned model on the held-out split.
pred_opt = my_pipeline.predict(x_test_opt)
performance = my_pipeline.evaluate_performance(y_test_opt,pred_opt)

C2.3 Simulate data acquisition

In [None]:
# Simulate a data stream: the first half of the rows is known up front, the
# rest arrives in fixed-size batches (the last batch may be smaller).
half = len(data.index) // 2
data0 = data.iloc[:half] # Suppose we know half of the data at the beginning of the procedure
batch_size = 300 # Number of new diamonds in each batch of new data
# List of the data coming in batch at every update
data_new = [data.iloc[start:start + batch_size] for start in range(half, len(data.index), batch_size)]

# Define the first model with half of the data
x_train, x_test, y_train, y_test = split_data(data0, apply_ylog=False)
# `objective` reads x_train_opt / y_train_opt as module-level globals. Point them
# at the current training split; otherwise the search would keep tuning on the
# full-dataset split from C2.2, i.e. on rows that have not "arrived" yet and on
# rows that belong to the current test split.
x_train_opt, y_train_opt = x_train, y_train

my_steps = [('parameters optimization', optParam(objective, n_trials=20))]

my_pipeline = mp.MyPipe(steps=my_steps)
my_pipeline.define_data(data0)
my_pipeline.fit(x_train,y_train)

pred = my_pipeline.predict(x_test)
performance = my_pipeline.evaluate_performance(y_test, pred)
my_pipeline.dump('../data/models_history/xgb_opt_model/xgb_opt_0.pkl') # save the pipeline to file

# Retrain after each incoming batch.
# NOTE(review): presumably the pipeline reuses the same optParam instance, so each
# fit adds 20 more trials to the same study and best_params may come from a trial
# scored on an earlier, smaller training set -- confirm this is intended.
for n, current_data in enumerate(data_new):
    print(f'Batch {n+1} of {len(data_new)}')
    my_pipeline.augment_data(current_data)
    x_train, x_test, y_train, y_test = split_data(my_pipeline.data, apply_ylog=False)
    x_train_opt, y_train_opt = x_train, y_train # keep the objective on the current training split
    my_pipeline.fit(x_train, y_train)
    pred = my_pipeline.predict(x_test)
    performance = my_pipeline.evaluate_performance(y_test, pred)
    my_pipeline.dump(f'../data/models_history/xgb_opt_model/xgb_opt_{n+1}.pkl') # save the pipeline to file

In [None]:
# NOTE(review): presumably plots the performance recorded across the retraining
# steps above, with a fitted trend line -- confirm in MyPipe.plot_history.
my_pipeline.plot_history(trendline=True)