In [None]:
import warnings

import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold, train_test_split

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

In [206]:
# Load the meat and milk purchase tables, both indexed by their purchase date.
X_meat, X_milk = (
    pd.read_csv(csv_path, index_col='PURCHASE_DATE')
    for csv_path in ('./data/meat_df.csv', './data/milk_df.csv')
)
print(*(frame.shape for frame in (X_meat, X_milk)))
# The "problem 1" table is read with the default integer index.
problem_1 = pd.read_csv('./data/problem 1.csv')

(108195, 4) (108195, 4)


In [207]:
from sklearn.preprocessing import LabelEncoder

# encode MAGIC_KEY

# Fit ONE encoder on the union of keys from all three tables and then only
# `transform` each table. Calling `fit_transform` per table refits the encoder
# every time, so the same MAGIC_KEY string could receive a different integer in
# each table (and `le` would only remember the mapping of the last table).
le = LabelEncoder()
le.fit(
    pd.concat(
        [X_meat['MAGIC_KEY'], X_milk['MAGIC_KEY'], problem_1['MAGIC_KEY']],
        ignore_index=True,
    )
)
X_meat['MAGIC_KEY'] = le.transform(X_meat['MAGIC_KEY'])
X_milk['MAGIC_KEY'] = le.transform(X_milk['MAGIC_KEY'])
problem_1['MAGIC_KEY'] = le.transform(problem_1['MAGIC_KEY'])

In [208]:
# Peek at the first five original key strings stored by the encoder;
# a string's position in `classes_` is the integer it is encoded as.
le.classes_[:5]

array(['2498CA210F2', '249BD4201E6', '249C911B64A', '249D123A385',
       '249DA1DC108'], dtype=object)

In [209]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows of each table as a test set. The target column
# ('MEAT' / 'MILK') is separated from the remaining feature columns, and the
# fixed seed keeps the split repeatable.
X_train_meat, X_test_meat, y_train_meat, y_test_meat = train_test_split(
    X_meat.drop(columns='MEAT'),
    X_meat['MEAT'],
    test_size=0.2,
    random_state=42,
)
X_train_milk, X_test_milk, y_train_milk, y_test_milk = train_test_split(
    X_milk.drop(columns='MILK'),
    X_milk['MILK'],
    test_size=0.2,
    random_state=42,
)

In [ ]:
#OPTUNA
import optuna

class Optuna:
    """Optuna-driven hyperparameter search for a boosted-tree style regressor.

    Each trial builds ``model(**params)``, fits it on a training portion of
    ``(X, y)`` and is scored by mean squared error on a held-out validation
    portion. Scoring on the rows the model was fitted on would reward
    overfitting, so the data is split once and every trial sees the same split.

    Parameters
    ----------
    X, y : features and target accepted by ``train_test_split`` and by the
        model's ``fit``.
    model : callable (typically an estimator class such as ``XGBRegressor``)
        that accepts the keyword arguments built in ``objective`` and returns
        an object with ``fit`` and ``predict``.
    n_trials : int, number of Optuna trials to run.
    val_size : float, fraction of rows held out for validation.
    random_state : int or None, seed of the train/validation split.

    Attributes
    ----------
    best_params : dict or None, suggested parameters of the best trial
        (``n_jobs`` is fixed, not suggested, so it is not included).
    best_score : float or None, objective value of the best trial.
    """

    def __init__(self, X, y, model, n_trials=100, val_size=0.2, random_state=42):
        self.X = X
        self.y = y
        self.model = model
        self.n_trials = n_trials
        self.val_size = val_size
        self.random_state = random_state
        self.best_params = None
        self.best_score = None
        # (X_fit, X_val, y_fit, y_val); created on first use and then reused.
        self._split = None

    def _holdout(self):
        """Return the cached train/validation split, creating it on first call."""
        if self._split is None:
            self._split = train_test_split(
                self.X,
                self.y,
                test_size=self.val_size,
                random_state=self.random_state,
            )
        return self._split

    def objective(self, trial):
        """Sample one parameter set and return its validation MSE."""
        # suggest_float replaces suggest_uniform / suggest_loguniform, which
        # are deprecated in current Optuna releases.
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 1e-3, 1e-1, log=True),
            'subsample': trial.suggest_float('subsample', 0.5, 1),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 1e3, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 1e3, log=True),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 300),
            'gamma': trial.suggest_float('gamma', 1e-3, 1e3, log=True),
            'n_jobs': -1
        }

        X_fit, X_val, y_fit, y_val = self._holdout()

        model = self.model(**params)
        model.fit(X_fit, y_fit)
        preds = model.predict(X_val)

        return mean_squared_error(y_val, preds)

    def optimize(self, maximize=False):
        """Run the study and store ``best_params`` / ``best_score``.

        The objective is an error (MSE), so the default ``maximize=False`` is
        the meaningful direction; ``maximize=True`` is kept for compatibility.
        """
        direction = 'maximize' if maximize else 'minimize'
        study = optuna.create_study(direction=direction)

        study.optimize(self.objective, n_trials=self.n_trials)

        self.best_params = study.best_params
        self.best_score = study.best_value

In [210]:
import torch
import torch.nn as nn
import torch.optim as optim


from sklearn.ensemble import HistGradientBoostingRegressor, GradientBoostingClassifier
from xgboost import XGBRegressor, XGBClassifier
from lightgbm import LGBMRegressor, LGBMClassifier
from catboost import CatBoostRegressor, CatBoostClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.ensemble import AdaBoostClassifier


# model

class Regressor_torch(nn.Module):
    """Fully connected regression network: input_dim -> 128 -> 64 -> 32 -> output_dim.

    ReLU follows every hidden layer; the final layer is linear (no activation).

    Parameters
    ----------
    input_dim : int, number of input features.
    output_dim : int, number of regression outputs.
    """

    def __init__(self, input_dim, output_dim):
        # nn.Module must be initialised before any submodule is assigned,
        # otherwise the assignment below raises AttributeError.
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, output_dim)
        )

    def forward(self, x):
        """Apply the network to ``x`` (last dimension must be ``input_dim``)."""
        return self.model(x)

class Classification_torch(nn.Module):
    """Fully connected classifier: input_dim -> 128 -> 64 -> 32 -> output_dim.

    ReLU follows every hidden layer and a softmax over the last dimension
    turns the final layer's output into class probabilities.

    NOTE(review): because the output is already softmax-normalised, pairing
    this module with ``nn.CrossEntropyLoss`` (which expects raw logits) would
    apply softmax twice - confirm which loss is used for training.

    Parameters
    ----------
    input_dim : int, number of input features.
    output_dim : int, number of classes.
    """

    def __init__(self, input_dim, output_dim):
        # nn.Module must be initialised before any submodule is assigned,
        # otherwise the assignment below raises AttributeError.
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, output_dim),
            # Explicit dim: the implicit choice is deprecated and depends on
            # the input's rank; the class axis is always the last one here.
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        """Return class probabilities for ``x`` (last dimension ``input_dim``)."""
        return self.model(x)

class RegressorV1:
    """Bundle of four default-configured gradient-boosting regressors.

    The estimators live in ``self.models``, keyed by their class name, and are
    always fitted together on the same data.
    """

    def __init__(self):
        self.models = dict(
            HistGradientBoostingRegressor=HistGradientBoostingRegressor(),
            XGBRegressor=XGBRegressor(),
            LGBMRegressor=LGBMRegressor(),
            CatBoostRegressor=CatBoostRegressor(),
        )

    def fit(self, X, y):
        """Fit every estimator in the bundle on ``(X, y)``."""
        for key in self.models:
            self.models[key].fit(X, y)

    def predict(self, X, model=None):
        """Predict with one estimator or with all of them.

        With a (truthy) ``model`` key, return that estimator's predictions;
        otherwise return a dict mapping each key to its predictions.
        """
        if not model:
            predictions = {}
            for key, estimator in self.models.items():
                predictions[key] = estimator.predict(X)
            return predictions

        chosen = self.models[model]
        return chosen.predict(X)
    
class ClassifierV1:
    """Bundle of eight default-configured classifiers behind short keys.

    The estimators live in ``self.models`` and are always fitted together on
    the same data. ``best_model`` starts as ``None``; nothing in this class
    assigns it.
    """

    def __init__(self):
        self.models = dict(
            GBC=GradientBoostingClassifier(),
            ABC=AdaBoostClassifier(),
            SVC=SVC(),
            KNC=KNeighborsClassifier(),
            GPC=GaussianProcessClassifier(),
            XGBC=XGBClassifier(),
            LGBC=LGBMClassifier(),
            CBC=CatBoostClassifier(),
        )

        self.best_model = None

    def fit(self, X, y):
        """Fit every classifier in the bundle on ``(X, y)``."""
        for key in self.models:
            self.models[key].fit(X, y)

    def predict(self, X, model=None):
        """Predict with one classifier or with all of them.

        With a (truthy) ``model`` key, return that classifier's predictions;
        otherwise return a dict mapping each key to its predictions.
        """
        if not model:
            predictions = {}
            for key, estimator in self.models.items():
                predictions[key] = estimator.predict(X)
            return predictions

        chosen = self.models[model]
        return chosen.predict(X)
    
    

class ClassifierV2:
    """Bundle of eight classifiers, each built from caller-supplied kwargs.

    ``params`` must contain an entry for every key ('GBC', 'ABC', 'SVC',
    'KNC', 'GPC', 'XGBC', 'LGBC', 'CBC'); each entry is unpacked as keyword
    arguments into the matching estimator's constructor.
    """

    def __init__(self, params: dict):
        estimator_classes = {
            'GBC': GradientBoostingClassifier,
            'ABC': AdaBoostClassifier,
            'SVC': SVC,
            'KNC': KNeighborsClassifier,
            'GPC': GaussianProcessClassifier,
            'XGBC': XGBClassifier,
            'LGBC': LGBMClassifier,
            'CBC': CatBoostClassifier,
        }
        self.models = {
            key: estimator_cls(**params[key])
            for key, estimator_cls in estimator_classes.items()
        }

    def fit(self, X, y):
        """Fit every classifier in the bundle on ``(X, y)``."""
        for key in self.models:
            self.models[key].fit(X, y)

    def predict(self, X, model=None):
        """Predict with one classifier or with all of them.

        With a (truthy) ``model`` key, return that classifier's predictions;
        otherwise return a dict mapping each key to its predictions.
        """
        if not model:
            predictions = {}
            for key, estimator in self.models.items():
                predictions[key] = estimator.predict(X)
            return predictions

        chosen = self.models[model]
        return chosen.predict(X)

In [2]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import mean_squared_error, f1_score, log_loss, accuracy_score

class TrainClassifier:
    """Stratified k-fold cross-validation of a single classifier.

    Parameters
    ----------
    model : estimator with ``fit(X, y)`` and ``predict(X)``. When it also
        exposes ``predict_proba(X)``, those probabilities feed the log loss.
    X : pandas DataFrame of features (rows are selected with ``.iloc``).
    y : pandas Series of class labels aligned with ``X``.
    n_splits : int, number of folds.
    average : str, forwarded to ``f1_score``. The default ``'binary'`` only
        suits two-class targets; pass e.g. ``'macro'`` for multiclass.
    random_state : int or None, seed for fold shuffling; ``None`` keeps the
        shuffling unseeded.

    Per-fold results are stored in ``f1_scores``, ``log_losses`` and
    ``accuracy_scores`` after ``train()`` completes.
    """

    def __init__(self, model, X, y, n_splits=5, average='binary', random_state=None):
        self.model = model
        self.X = X
        self.y = y
        self.n_splits = n_splits
        self.average = average
        self.random_state = random_state
        self.f1_scores = []
        self.log_losses = []
        self.accuracy_scores = []

    def train(self):
        """Fit and score the model on every fold, then store the fold metrics."""
        kf = StratifiedKFold(
            n_splits=self.n_splits, shuffle=True, random_state=self.random_state
        )
        f1_scores = []
        log_losses = []
        accuracy_scores = []

        # Stratification needs the labels: split() must receive y as well.
        for train_index, test_index in kf.split(self.X, self.y):
            X_train, X_test = self.X.iloc[train_index], self.X.iloc[test_index]
            y_train, y_test = self.y.iloc[train_index], self.y.iloc[test_index]

            self.model.fit(X_train, y_train)
            preds = self.model.predict(X_test)

            f1_scores.append(f1_score(y_test, preds, average=self.average))
            accuracy_scores.append(accuracy_score(y_test, preds))

            # Log loss is defined on predicted probabilities, not hard labels.
            if hasattr(self.model, 'predict_proba'):
                proba = self.model.predict_proba(X_test)
                labels = getattr(self.model, 'classes_', None)
                log_losses.append(log_loss(y_test, proba, labels=labels))
            else:
                # No probabilities available: record NaN instead of a
                # misleading value computed from hard labels.
                log_losses.append(float('nan'))

        # Only publish results once all folds finished successfully.
        self.f1_scores = f1_scores
        self.log_losses = log_losses
        self.accuracy_scores = accuracy_scores

    def get_scores(self):
        """Return the per-fold metric lists keyed by metric name."""
        return {
            'f1_scores': self.f1_scores,
            'log_losses': self.log_losses,
            'accuracy_scores': self.accuracy_scores
        }

class TrainRegressor:
    """K-fold cross-validation of a single regressor, scored by MSE.

    Parameters
    ----------
    model : estimator with ``fit(X, y)`` and ``predict(X)``.
    X : pandas DataFrame of features (rows are selected with ``.iloc``).
    y : pandas Series of targets aligned with ``X``.
    n_splits : int, number of folds.
    random_state : int or None, seed for fold shuffling; ``None`` keeps the
        shuffling unseeded.

    Per-fold errors are stored in ``mean_squared_errors`` after ``train()``
    completes.
    """

    def __init__(self, model, X, y, n_splits=5, random_state=None):
        self.model = model
        self.X = X
        self.y = y
        self.n_splits = n_splits
        self.random_state = random_state
        self.mean_squared_errors = []

    def train(self):
        """Fit and score the model on every fold, then store the fold errors."""
        # Plain KFold: stratified splitting needs class labels and rejects
        # continuous regression targets.
        kf = KFold(
            n_splits=self.n_splits, shuffle=True, random_state=self.random_state
        )
        mean_squared_errors = []

        for train_index, test_index in kf.split(self.X, self.y):
            X_train, X_test = self.X.iloc[train_index], self.X.iloc[test_index]
            y_train, y_test = self.y.iloc[train_index], self.y.iloc[test_index]

            self.model.fit(X_train, y_train)
            preds = self.model.predict(X_test)

            mean_squared_errors.append(mean_squared_error(y_test, preds))

        # Only publish results once all folds finished successfully.
        self.mean_squared_errors = mean_squared_errors

    def get_scores(self):
        """Return the per-fold MSE list keyed by metric name."""
        return {
            'mean_squared_errors': self.mean_squared_errors
        }