<a href="https://colab.research.google.com/github/VortexOsxo/Chess/blob/master/God_Optuna.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Utils

In [64]:
from google.colab import drive

# Mount Google Drive so the dataset and output folders under `file_path` are reachable.
drive.mount('/content/drive')
# Project root on Drive. Every path in this notebook is built by concatenating
# onto this string, so the trailing slash must be kept.
file_path = '/content/drive/My Drive/Inf8245/'

# Seed passed to the CV splitter, SMOTE and the seeded estimators below.
RANDOM_STATE = 42
# Number of stratified folds used for cross-validation.
NUM_FOLDS = 10

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [65]:
import numpy as np
import pandas as pd

def load_meta_data():
    """Read the train and test metadata CSV files found under ``file_path``.

    Returns:
        tuple: ``(meta_train, meta_test)`` as pandas DataFrames.
    """
    csv_paths = (
        f'{file_path}Data/metadata_train.csv',
        f'{file_path}Data/metadata_test.csv',
    )
    frames = [pd.read_csv(csv_path) for csv_path in csv_paths]
    return frames[0], frames[1]

def load_data():
    """Load the training features/labels and the test features.

    Returns:
        tuple: ``(X_train, y_train, X_test)`` taken from the ``X_train`` and
        ``y_train`` entries of ``Data/train.npz`` and the ``X_test`` entry of
        ``Data/test.npz`` under ``file_path``.
    """
    # np.load on an .npz returns a lazy NpzFile that keeps the archive open;
    # the context managers close it as soon as the arrays have been read.
    with np.load(f'{file_path}Data/train.npz') as data_train:
        X_train, y_train = data_train["X_train"], data_train["y_train"]
    with np.load(f'{file_path}Data/test.npz') as data_test:
        X_test = data_test["X_test"]

    return X_train, y_train, X_test

def load_ids():
    """Load the sample identifiers stored alongside the train and test arrays.

    Returns:
        tuple: ``(train_ids, test_ids)`` read from the ``ids`` entry of
        ``Data/train.npz`` and ``Data/test.npz`` under ``file_path``.
    """
    # Read inside `with` so the NpzFile handles are closed instead of leaking.
    with np.load(f'{file_path}Data/train.npz') as data_train:
        train_ids = data_train["ids"]
    with np.load(f'{file_path}Data/test.npz') as data_test:
        test_ids = data_test["ids"]

    return train_ids, test_ids

def save_predictions(preds, filename):
    """Write test predictions to ``Predictions/<filename>.csv`` under ``file_path``.

    Args:
        preds: One prediction per test sample, in the order of the test ids.
        filename: Output file name without the ``.csv`` extension.

    Raises:
        ValueError: If the number of predictions differs from the number of
            test ids.
    """
    test_ids = load_ids()[1]

    if len(test_ids) != len(preds):
        raise ValueError("Length of test_ids and preds must be the same.")

    submission = pd.DataFrame({"id": test_ids.astype(str), "label": preds})
    submission.to_csv(f'{file_path}Predictions/{filename}.csv', index=False)

def merge_train_test(X_train, X_test, debug=False):
    """Stack the train rows on top of the test rows.

    Args:
        X_train: Array whose rows come first in the result.
        X_test: Array appended after ``X_train`` along axis 0.
        debug: When true, print the shape of the stacked array.

    Returns:
        The concatenation of ``X_train`` and ``X_test`` along axis 0.
    """
    stacked = np.concatenate((X_train, X_test), axis=0)
    if debug:
        print(stacked.shape)
    return stacked

def remove_null_variance_column(X_train, X_test, debug=False):
    """Drop the columns that are constant over the training rows.

    A column is kept when its minimum and maximum over ``X_train`` differ; the
    same column selection is then applied to ``X_test``.

    Args:
        X_train: 2-D training array used to decide which columns to keep.
        X_test: 2-D test array filtered with the same column mask.
        debug: When true, print the column count before and after filtering.

    Returns:
        tuple: ``(X_train, X_test)`` restricted to the non-constant columns.
    """
    lowest, highest = X_train.min(axis=0), X_train.max(axis=0)
    keep = lowest != highest

    if debug:
        print(f'Initial Column number: {X_train.shape[1]}')
    X_train, X_test = X_train[:, keep], X_test[:, keep]
    if debug:
        print(f'After removed null variance: {X_train.shape[1]}')

    return X_train, X_test

# Data Preprocessing

## PCA & SVD

In [66]:
def load_transform(transform, n_components, is_combined=True, tolerance=1e-5):
  """Load a precomputed projection of the train and test sets from disk.

  The files are looked up in ``Data/<transform>/`` under ``file_path``; their
  names encode the component count, the ``is_combined`` flag (lower-cased) and
  the tolerance, followed by ``train.npy`` / ``test.npy``.

  Returns:
    tuple: ``(X_train, X_test)`` arrays as stored in the two ``.npy`` files.
  """
  name = f'nc_{n_components}_combined_{str(is_combined).lower()}_tol_{tolerance}'
  return (
      np.load(f'{file_path}Data/{transform}/{name}train.npy'),
      np.load(f'{file_path}Data/{transform}/{name}test.npy'),
  )

def load_pca(n_components, is_combined=True, tolerance=1e-5):
  """Load the stored PCA projection; thin wrapper around ``load_transform``."""
  return load_transform(
      transform='PCA',
      n_components=n_components,
      is_combined=is_combined,
      tolerance=tolerance,
  )

def load_svd(n_components, is_combined=True, tolerance=1e-5):
  """Load the stored SVD projection; thin wrapper around ``load_transform``."""
  return load_transform(
      transform='SVD',
      n_components=n_components,
      is_combined=is_combined,
      tolerance=tolerance,
  )

## Chi2

In [67]:
from sklearn.feature_selection import VarianceThreshold, SelectKBest, chi2

def load_var_chi2(X_train, X_test, y_train, n_components, threshold):
  """Select features with a variance filter followed by a chi-squared top-k.

  Both selectors are fitted on the training data only and then applied to the
  test data.

  Args:
    X_train: Training features.
    X_test: Test features with the same columns as ``X_train``.
    y_train: Training labels used by the chi-squared scoring.
    n_components: Number of features kept by ``SelectKBest``.
    threshold: Variance threshold passed to ``VarianceThreshold``.

  Returns:
    tuple: ``(X_train, X_test)`` restricted to the selected features.
  """
  variance_filter = VarianceThreshold(threshold=threshold).fit(X_train)
  train_filtered = variance_filter.transform(X_train)
  test_filtered = variance_filter.transform(X_test)

  k_best = SelectKBest(chi2, k=n_components).fit(train_filtered, y_train)
  return k_best.transform(train_filtered), k_best.transform(test_filtered)

## Metadata


In [68]:
import joblib
from scipy import sparse
from sklearn.preprocessing import OneHotEncoder

def save_metadata(meta_train, meta_test, reduced=False):
  """Persist the encoded train/test metadata with joblib.

  Args:
    meta_train: Object dumped to ``Data/X_meta_train_sparse[_reduced].pkl``.
    meta_test: Object dumped to ``Data/X_meta_test_sparse[_reduced].pkl``.
    reduced: When true, the ``_reduced`` suffix is added to both file names.
  """
  # Build the suffix outside the f-string: reusing the enclosing quote
  # character inside a replacement field is a SyntaxError before Python 3.12.
  suffix = "_reduced" if reduced else ""
  joblib.dump(meta_train, f"{file_path}Data/X_meta_train_sparse{suffix}.pkl")
  joblib.dump(meta_test,  f"{file_path}Data/X_meta_test_sparse{suffix}.pkl")

def load_meta_onehot(reduced=False, debug=False):
  """Load the stored metadata encodings and return them as dense arrays.

  Args:
    reduced: When true, read the files carrying the ``_reduced`` suffix.
    debug: Accepted for signature compatibility; not used.

  Returns:
    tuple: ``(X_meta_train, X_meta_test)``; scipy sparse matrices are
    converted with ``toarray()``, anything else is returned as loaded.
  """
  # Build the suffix outside the f-string: reusing the enclosing quote
  # character inside a replacement field is a SyntaxError before Python 3.12.
  suffix = "_reduced" if reduced else ""
  # NOTE: joblib.load unpickles the file, so only load files you produced yourself.
  X_meta_train = joblib.load(f"{file_path}Data/X_meta_train_sparse{suffix}.pkl")
  X_meta_test  = joblib.load(f"{file_path}Data/X_meta_test_sparse{suffix}.pkl")

  X_meta_train_dense = X_meta_train.toarray() if sparse.issparse(X_meta_train) else X_meta_train
  X_meta_test_dense  = X_meta_test.toarray()  if sparse.issparse(X_meta_test)  else X_meta_test

  return X_meta_train_dense, X_meta_test_dense

def append_meta_to_data(X_train, X_test, reduced=False, debug=False):
  """Prepend the stored metadata encoding to the feature matrices.

  Args:
    X_train: Training features; metadata columns are placed to their left.
    X_test: Test features; metadata columns are placed to their left.
    reduced: Forwarded to ``load_meta_onehot`` to pick the reduced encoding.
    debug: Accepted for signature compatibility; not used.

  Returns:
    tuple: ``(X_train_combined, X_test_combined)``.
  """
  meta_blocks = load_meta_onehot(reduced)
  feature_blocks = (X_train, X_test)
  return tuple(
      np.hstack([meta, features])
      for meta, features in zip(meta_blocks, feature_blocks)
  )

def build_categorical_features(meta_train, meta_test, selected_columns, min_count=15):
  """One-hot encode the selected metadata columns, pooling rare categories.

  For each selected column, categories seen fewer than ``min_count`` times in
  the training metadata are replaced by the literal ``"Other"`` in both train
  and test before encoding. The encoder is fitted on the training frame only
  and ignores categories it has not seen.

  Args:
    meta_train: Training metadata DataFrame.
    meta_test: Test metadata DataFrame.
    selected_columns: Column names to encode; may be empty.
    min_count: Minimum training frequency for a category to keep its own
      one-hot column (default 15, the previously hard-coded value).

  Returns:
    tuple: ``(train_encoded, test_encoded)`` dense arrays. When no column is
    selected, both have zero columns so they can still be ``hstack``-ed.
  """
  if len(selected_columns) == 0:
    # No metadata selected by Optuna
    n_train = meta_train.shape[0]
    n_test = meta_test.shape[0]
    return np.zeros((n_train, 0)), np.zeros((n_test, 0))

  train_cat = meta_train[selected_columns].copy()
  test_cat = meta_test[selected_columns].copy()

  for col in selected_columns:
    # Frequencies come from the training split only.
    counts = train_cat[col].value_counts(dropna=False)
    keep_categories = set(counts[counts >= min_count].index)

    # Bind the current set as a default so the mapper does not depend on the
    # loop variable still holding this column's categories.
    def map_value(v, keep=keep_categories):
      return v if v in keep else "Other"

    train_cat[col] = train_cat[col].map(map_value)
    test_cat[col] = test_cat[col].map(map_value)

  encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
  train_encoded = encoder.fit_transform(train_cat)
  test_encoded = encoder.transform(test_cat)

  return train_encoded, test_encoded


# Optuna

### Imports

In [69]:
!pip install optuna

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
import optuna



### Load Data

In [70]:
# Component counts for which stored PCA/SVD projections are loaded below; also
# the choices offered to Optuna for the `input_size_pca_svd` parameter.
SIZES = [64, 96, 128, 192, 256, 512]


class MyDataLoader():
    """Holds the raw data, the metadata and the cached PCA/SVD training projections.

    Attributes set by ``__init__``:
        meta_train, meta_test: metadata frames from ``load_meta_data``.
        X_train, y_train, X_test: raw arrays from the module-level ``load_data``.
        data_dict: ``{'PCA': {size: X_train_proj}, 'SVD': {...}}`` for every
            size in ``SIZES``.
    """

    def __init__(self):
        self.meta_train, self.meta_test = load_meta_data()
        # This is the module-level load_data(), not the method of the same name.
        self.X_train, self.y_train, self.X_test = load_data()
        self.data_dict = {
            'PCA': {}, 'SVD':{},
        }
        self.load_data()


    def load_data(self):
        """Cache the training part of every stored PCA/SVD projection in ``data_dict``."""
        for size in SIZES:
            X_tr, _ = load_svd(n_components=size)
            self.data_dict['SVD'][size] = X_tr

            X_tr, _ = load_pca(n_components=size)
            self.data_dict['PCA'][size] = X_tr
            print(f"  [OK] Loaded input_size={size:<3} | Shape: {X_tr.shape}")


    def get_combined_data(self, config, train, test=None):
        """Prepend the encoded metadata selected in ``config`` to the features.

        Returns the combined training array alone when ``test`` is None,
        otherwise the tuple ``(train_combined, test_combined)``.
        """
        meta_train, meta_test = build_categorical_features(
            self.meta_train, self.meta_test,
            config['selected_columns'],
        )
        if test is None:
            return np.hstack([meta_train, train])
        return np.hstack([meta_train, train]), np.hstack([meta_test, test])


    def get_val_data(self, type, config):
        """Return the training matrix (metadata + features) used for cross-validation.

        Args:
            type: ``'PCA'``, ``'SVD'`` or ``'CHI2'``.
            config: Trial configuration; reads ``input_size``,
                ``selected_columns`` and, for CHI2, ``chi2_threshold``.

        Raises:
            ValueError: If ``type`` is not one of the supported names.
        """
        if type in ['PCA', 'SVD']:
            X_train = self.data_dict[type][config['input_size']]
        elif type == 'CHI2':
            X_train, _ = load_var_chi2(self.X_train, self.X_test, self.y_train, config['input_size'], config['chi2_threshold'])
        else:
            # Previously fell through and failed later with UnboundLocalError.
            raise ValueError(f"Unknown data type: {type!r}")
        return self.get_combined_data(config, X_train)


    def get_test_data(self, type, config):
        """Return ``(X_train, X_test)`` (metadata + features) for the final fit.

        Args:
            type: ``'PCA'``, ``'SVD'`` or ``'CHI2'``.
            config: Trial configuration; same keys as for ``get_val_data``.

        Raises:
            ValueError: If ``type`` is not one of the supported names.
        """
        if type == 'SVD':
            X_train, X_test = load_svd(n_components=config['input_size'])
        elif type == 'PCA':
            X_train, X_test = load_pca(n_components=config['input_size'])
        elif type == 'CHI2':
            X_train, X_test = load_var_chi2(self.X_train, self.X_test, self.y_train, config['input_size'], config['chi2_threshold'])
        else:
            # Previously fell through and failed later with UnboundLocalError.
            raise ValueError(f"Unknown data type: {type!r}")
        return self.get_combined_data(config, X_train, X_test)


# Shared loader instance; the model classes below read their data through this global.
data_loader = MyDataLoader()

  [OK] Loaded input_size=64  | Shape: (1939, 64)
  [OK] Loaded input_size=96  | Shape: (1939, 96)
  [OK] Loaded input_size=128 | Shape: (1939, 128)
  [OK] Loaded input_size=192 | Shape: (1939, 192)
  [OK] Loaded input_size=256 | Shape: (1939, 256)
  [OK] Loaded input_size=512 | Shape: (1939, 512)


# Hyperparameter Search

## BaseModelClass

In [71]:
from abc import abstractclassmethod
from imblearn.over_sampling import SMOTE
import datetime

class ModelClass():
  """Base class tying an Optuna search space to a cross-validated evaluation.

  Subclasses provide ``update_data_config`` (model-specific hyperparameters)
  and ``train_predict`` (fit on one split, return positive-class scores for
  another). Every method is a classmethod; the class is used without
  instantiation.
  """

  @classmethod
  def objective(cls, trial):
    """Optuna objective: sample a config from ``trial`` and return its mean CV macro-F1."""
    config = cls.create_data_config(trial)
    return cls.evaluate_model(config)

  @abstractclassmethod
  def update_data_config(cls, config, trial):
    """Subclass hook: add model hyperparameters to ``config`` and return it."""
    pass

  @classmethod
  def create_data_config(cls, trial):
    """Sample the data-related part of the configuration, then the model part.

    Returns whatever ``cls.update_data_config(config, trial)`` returns, i.e.
    the completed config dict for a concrete subclass.
    """
    config = {
        'data_type':    trial.suggest_categorical("data_type", ['PCA', 'SVD', 'CHI2']),
        'use_smote':    trial.suggest_categorical("use_smote", [True, False]),
        'threshold':    trial.suggest_float("threshold", 0.1, 0.7),
    }

    # Metadata columns to one-hot encode; each is toggled by its own parameter.
    config['selected_columns'] = [
        col for col in ['Isolation type', 'Location', 'Isolation source', 'Testing standard']
        if trial.suggest_categorical(f"use_{col}", [True, False])
    ]

    # Conditional parameters: the input size is sampled under a different name
    # for CHI2 than for PCA/SVD.
    config['chi2_threshold'] = None
    if config['data_type'] == 'CHI2':
      config['input_size'] = trial.suggest_int("input_size_chi2", 64, 1024, log=True)
      config['chi2_threshold'] = trial.suggest_float("chi2_threshold", 0.001, 0.1)
    else:
      config['input_size'] = trial.suggest_categorical("input_size_pca_svd", SIZES)

    # SMOTE parameters are only sampled when oversampling is enabled.
    config['smote_k'] = trial.suggest_int("smote_k", 1, 10) if config['use_smote'] else None
    config['smote_ratio'] = trial.suggest_float("smote_ratio", 0.3, 1.0) if config['use_smote'] else None
    return cls.update_data_config(config, trial)

  @abstractclassmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Subclass hook: fit on ``(X_tr, y_tr)`` and return positive-class scores for ``X_val``."""
    pass

  @classmethod
  def _apply_smote(cls, X, y, config, failure_message):
    """Oversample ``(X, y)`` with SMOTE when ``config['use_smote']`` is set.

    If SMOTE raises ``ValueError``, ``failure_message`` is printed and the
    data is returned unchanged, so the caller continues without oversampling.
    """
    if not config['use_smote']:
      return X, y
    try:
      smote = SMOTE(k_neighbors=config['smote_k'], sampling_strategy=config['smote_ratio'], random_state=RANDOM_STATE)
      return smote.fit_resample(X, y)
    except ValueError:
      print(failure_message)
      return X, y

  @classmethod
  def evaluate_model(cls, config):
    """Return the mean macro-F1 of ``config`` over ``NUM_FOLDS`` stratified folds.

    SMOTE, when enabled, is applied to the training part of each fold only.
    Scores from ``train_predict`` are binarised with ``config['threshold']``.
    """
    kf = StratifiedKFold(n_splits=NUM_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    f1_scores = []

    X_source = data_loader.get_val_data(config['data_type'], config)
    y_source = data_loader.y_train

    for train_idx, val_idx in kf.split(X_source, y_source):
      X_tr, X_val = X_source[train_idx], X_source[val_idx]
      y_tr, y_val = y_source[train_idx], y_source[val_idx]

      X_tr, y_tr = cls._apply_smote(X_tr, y_tr, config, 'Smote Failed...')

      val_probs = cls.train_predict(X_tr, X_val, y_tr, config)

      val_preds = (val_probs > config['threshold']).astype(int)
      f1_scores.append(f1_score(y_val, val_preds, average='macro'))

    return np.mean(f1_scores)

  @classmethod
  def predict(cls, config, proba=False, save=False):
    """Fit on the full training set and predict the test set.

    Args:
      config: Complete configuration as produced by ``create_data_config``.
      proba: Return raw scores instead of thresholded 0/1 labels.
      save: Also write the result through ``save_predictions`` under a
        timestamped ``Submission_<Model>_<date>_<PROBS|PREDS>`` name.

    Returns:
      1-D array of scores or labels, one per test sample.
    """
    X_train_full, X_test_final = data_loader.get_test_data(config['data_type'], config)
    y_train_full = data_loader.y_train

    if config['use_smote']:
      print("Applying SMOTE to full training set...")
    X_train_full, y_train_full = cls._apply_smote(
        X_train_full, y_train_full, config,
        'Smote Failed on full set, proceeding without oversampling.',
    )

    probs = cls.train_predict(X_train_full, X_test_final, y_train_full, config)
    results = probs if proba else (probs > config['threshold']).astype(int)

    if len(results.shape) > 1:
      results = results.flatten()
    if save:
      model_name = cls.__name__.replace('Model', '')
      date_str = datetime.datetime.now().strftime("%Y%m%d_%H%M")
      pred_type = "PROBS" if proba else "PREDS"

      filename = f"Submission_{model_name}_{date_str}_{pred_type}"
      save_predictions(results, filename)
      print(f"Saved submission to {filename}.csv")
    return results

## Logistic regression

In [72]:
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

class LogisticRegressionModel(ModelClass):
  """Logistic regression with the saga solver, plugged into the shared pipeline."""

  @classmethod
  def update_data_config(cls, config, trial):
    """Sample C, penalty, max_iter and the scaling flag; the solver is fixed to saga."""
    # Dict literals evaluate left to right, so the trial is queried in the
    # order C, penalty, max_iter, scale_data.
    config.update({
        'C': trial.suggest_float("C", 1e-4, 1e2, log=True),
        'penalty': trial.suggest_categorical("penalty", ['l1', 'l2']),
        # saga handles both the l1 and the l2 penalty
        'solver': 'saga',
        'max_iter': trial.suggest_int("max_iter", 5000, 10000),
        'scale_data': trial.suggest_categorical("scale_data", [True, False]),
    })
    return config

  @classmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Fit on ``(X_tr, y_tr)`` and return P(class 1) for each row of ``X_val``."""
    features_tr, features_val = X_tr, X_val
    if config['scale_data']:
      # Scaling statistics come from the training split only.
      scaler = StandardScaler().fit(X_tr)
      features_tr = scaler.transform(X_tr)
      features_val = scaler.transform(X_val)

    estimator_params = {
        name: config[name] for name in ('C', 'penalty', 'solver', 'max_iter')
    }
    classifier = LogisticRegression(random_state=RANDOM_STATE, **estimator_params)
    classifier.fit(features_tr, y_tr)
    return classifier.predict_proba(features_val)[:, 1]

## Random Forest

In [73]:
from sklearn.ensemble import RandomForestClassifier

class RandomForestModel(ModelClass):
  """Random forest classifier plugged into the shared Optuna pipeline."""

  @classmethod
  def update_data_config(cls, config, trial):
    """Sample the forest hyperparameters into ``config`` and return it."""
    # Dict literals evaluate left to right, which keeps the sampling order.
    config.update({
        'n_estimators': trial.suggest_int("n_estimators", 50, 500, step=50),
        'max_depth': trial.suggest_int("max_depth", 5, 30, log=True),
        'criterion': trial.suggest_categorical("criterion", ['gini', 'entropy', 'log_loss']),
        'min_samples_split': trial.suggest_int("min_samples_split", 2, 20),
        'min_samples_leaf': trial.suggest_int("min_samples_leaf", 1, 10),
        'max_features': trial.suggest_categorical("max_features", ['sqrt', 'log2', 0.5, 0.7, 1.0, None]),
        'bootstrap': trial.suggest_categorical("bootstrap", [True]),
        'class_weight': trial.suggest_categorical("class_weight", [None, "balanced", "balanced_subsample"]),
        'max_samples': trial.suggest_categorical("max_samples", [None, 0.5, 0.7, 0.9]),
    })
    return config

  @classmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Fit on ``(X_tr, y_tr)`` and return P(class 1) for each row of ``X_val``."""
    forest_params = {
        name: config[name]
        for name in (
            'n_estimators', 'max_depth', 'criterion', 'min_samples_split',
            'min_samples_leaf', 'max_features', 'bootstrap', 'class_weight',
            'max_samples',
        )
    }
    forest = RandomForestClassifier(random_state=RANDOM_STATE, n_jobs=-1, **forest_params)
    forest.fit(X_tr, y_tr)
    return forest.predict_proba(X_val)[:, 1]

## Extremely Randomized Trees (Extra Trees)


In [82]:
from sklearn.ensemble import ExtraTreesClassifier
# Note: You can reuse the Random Forest's update_data_config method
# or copy it here, as the parameters are nearly identical.

class ExtraTreesModel(ModelClass):
  """Extra-trees classifier plugged into the shared Optuna pipeline."""

  @classmethod
  def update_data_config(cls, config, trial):
    """Sample the tree-ensemble hyperparameters into ``config`` and return it.

    The search space mirrors the random-forest one, plus
    ``min_impurity_decrease``.
    """
    # Dict literals evaluate left to right, which keeps the sampling order.
    config.update({
        'n_estimators': trial.suggest_int("n_estimators", 50, 500, step=50),
        'max_depth': trial.suggest_int("max_depth", 5, 30, log=True),
        'criterion': trial.suggest_categorical("criterion", ['gini', 'entropy', 'log_loss']),
        'min_samples_split': trial.suggest_int("min_samples_split", 2, 20),
        'min_samples_leaf': trial.suggest_int("min_samples_leaf", 1, 10),
        'max_features': trial.suggest_categorical("max_features", ['sqrt', 'log2', 0.5, 0.7, 1.0, None]),
        'bootstrap': trial.suggest_categorical("bootstrap", [True]),
        'class_weight': trial.suggest_categorical("class_weight", [None, "balanced", "balanced_subsample"]),
        'max_samples': trial.suggest_categorical("max_samples", [None, 0.5, 0.7, 0.9]),
        # Additional split-quality threshold tuned for this model.
        'min_impurity_decrease': trial.suggest_float("min_impurity_decrease", 0.0, 0.1),
    })
    return config

  @classmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Fit on ``(X_tr, y_tr)`` and return P(class 1) for each row of ``X_val``."""
    tree_params = {
        name: config[name]
        for name in (
            'n_estimators', 'max_depth', 'criterion', 'min_samples_split',
            'min_samples_leaf', 'max_features', 'bootstrap', 'class_weight',
            'max_samples', 'min_impurity_decrease',
        )
    }
    ensemble = ExtraTreesClassifier(random_state=RANDOM_STATE, n_jobs=-1, **tree_params)
    ensemble.fit(X_tr, y_tr)
    return ensemble.predict_proba(X_val)[:, 1]

## XGBoost

In [74]:
from xgboost import XGBClassifier

class XGBoostModel(ModelClass):
  """XGBoost classifier plugged into the shared Optuna pipeline."""

  @classmethod
  def update_data_config(cls, config, trial):
    """Sample the XGBoost hyperparameters into ``config`` and return it."""
    config['n_estimators']      = trial.suggest_int("n_estimators", 50, 1000, step=50)
    config['learning_rate']     = trial.suggest_float("learning_rate", 0.01, 0.3, log=True)
    config['max_depth']         = trial.suggest_int("max_depth", 3, 15)
    config['min_child_weight']  = trial.suggest_int("min_child_weight", 1, 10)
    config['subsample']         = trial.suggest_float("subsample", 0.5, 1.0)
    config['colsample_bytree']  = trial.suggest_float("colsample_bytree", 0.5, 1.0)
    config['gamma']             = trial.suggest_float("gamma", 0, 5)
    config['reg_alpha']         = trial.suggest_float("reg_alpha", 1e-8, 1.0, log=True)
    config['reg_lambda']        = trial.suggest_float("reg_lambda", 1e-8, 1.0, log=True)
    config['balance_strategy']  = trial.suggest_categorical("balance_strategy", ["None", "Balanced"])
    return config

  @classmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Fit on ``(X_tr, y_tr)`` and return P(class 1) for each row of ``X_val``.

    With ``balance_strategy == "Balanced"`` the positive class is weighted by
    the negative/positive ratio of ``y_tr`` (labels are summed, so they are
    expected to be 0/1).
    """
    scale_pos_weight = 1.0
    if config['balance_strategy'] == "Balanced":
      n_pos = np.sum(y_tr)
      # Without positives the ratio would divide by zero; keep the neutral
      # weight in that case instead of passing inf/nan to XGBoost.
      if n_pos > 0:
        scale_pos_weight = (len(y_tr) - n_pos) / n_pos

    model = XGBClassifier(
      n_estimators=config['n_estimators'],
      learning_rate=config['learning_rate'],
      max_depth=config['max_depth'],
      min_child_weight=config['min_child_weight'],
      subsample=config['subsample'],
      colsample_bytree=config['colsample_bytree'],
      gamma=config['gamma'],
      reg_alpha=config['reg_alpha'],
      reg_lambda=config['reg_lambda'],
      scale_pos_weight=scale_pos_weight,
      random_state=RANDOM_STATE,
      n_jobs=-1,
      tree_method='hist',
      objective='binary:logistic',
      eval_metric='logloss'
    )

    model.fit(X_tr, y_tr)
    return model.predict_proba(X_val)[:, 1]

## KNN

In [75]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

class KnnModel(ModelClass):
  """k-nearest-neighbours classifier plugged into the shared Optuna pipeline."""

  @classmethod
  def update_data_config(cls, config, trial):
    """Sample the KNN hyperparameters into ``config`` and return it."""
    # Dict literals evaluate left to right, which keeps the sampling order.
    config.update({
        'n_neighbors': trial.suggest_int("n_neighbors", 3, 100),
        'weights': trial.suggest_categorical("weights", ['uniform', 'distance']),
        'metric': trial.suggest_categorical("metric", ['euclidean', 'manhattan', 'cosine', 'minkowski']),
        'algorithm': 'auto',
        'scale_data': trial.suggest_categorical("scale_data", [True, False]),
    })

    # The Minkowski exponent is only searched when that metric is selected.
    if config['metric'] == 'minkowski':
      config['p'] = trial.suggest_int("p", 1, 5)
    else:
      config['p'] = 2
    return config

  @classmethod
  def train_predict(cls, X_tr, X_val, y_tr, config):
    """Fit on ``(X_tr, y_tr)`` and return P(class 1) for each row of ``X_val``."""
    features_tr, features_val = X_tr, X_val
    if config['scale_data']:
      # Scaling statistics come from the training split only.
      scaler = StandardScaler().fit(X_tr)
      features_tr = scaler.transform(X_tr)
      features_val = scaler.transform(X_val)

    knn_params = {
        name: config[name]
        for name in ('n_neighbors', 'weights', 'metric', 'p', 'algorithm')
    }
    neighbours = KNeighborsClassifier(n_jobs=-1, **knn_params)
    neighbours.fit(features_tr, y_tr)
    return neighbours.predict_proba(features_val)[:, 1]


## MLP

In [76]:
# Number of training epochs for each MLP fit.
EPOCHS = 15
# Fall back to the CPU when no GPU is visible, so the MLP code also runs on
# CPU-only runtimes without the Search cell having been executed first.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

def create_model(config, input_size):
    """Build a feed-forward network ending in a single-logit linear layer.

    Each hidden layer is ``Linear`` -> optional ``LayerNorm`` -> activation ->
    optional ``Dropout``.

    Args:
        config: Reads ``activation`` (name of a class in ``torch.nn``),
            ``layer_sizes`` (hidden widths), ``layer_norm`` (bool) and
            ``dropout`` (probability; dropout layers are added only if > 0).
        input_size: Number of input features.

    Returns:
        ``nn.Sequential`` mapping ``input_size`` features to one output.
    """
    activation_cls = getattr(nn, config['activation'])
    widths = [input_size, *config['layer_sizes']]

    modules = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
        modules.append(nn.Linear(fan_in, fan_out))
        if config['layer_norm']:
            modules.append(nn.LayerNorm(fan_out))
        modules.append(activation_cls())
        if config['dropout'] > 0:
            modules.append(nn.Dropout(config['dropout']))

    # Output head: one logit per sample.
    modules.append(nn.Linear(widths[-1], 1))
    return nn.Sequential(*modules)


class MlpModel(ModelClass):
    """Feed-forward network (see ``create_model``) plugged into the shared pipeline.

    ``train_predict`` follows the base-class contract
    ``(X_tr, X_val, y_tr, config) -> scores``. Cross-validation, SMOTE and
    thresholding are therefore handled by ``ModelClass.evaluate_model`` and
    ``ModelClass.predict``, exactly as for the other models. The previous
    version took only ``config``, ran its own CV loop and called a
    non-existent ``data_loader.get_data``, so it could not be used through
    ``objective`` or ``predict``.
    """

    @classmethod
    def update_data_config(cls, config, trial):
        """Sample architecture and training hyperparameters into ``config``."""
        # --- Architecture Params ---
        config['n_layers'] = trial.suggest_int("n_layers", 1, 3)
        config['layer_norm'] = trial.suggest_categorical("layer_norm", [True, False])
        config['activation'] = trial.suggest_categorical("activation", [
            "ReLU", "LeakyReLU", "GELU", "SiLU", "Mish", "Hardswish", "ELU", "Tanh", "Sigmoid"
        ])
        config['dropout'] = trial.suggest_float("dropout", 0.0, 0.5)

        # --- Training Params ---
        # `threshold` is already sampled by create_data_config under the same
        # name and range, so it is not sampled again here.
        config['lr'] = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
        config['batch_size'] = trial.suggest_categorical("batch_size", [32, 64])
        config['weight_decay'] = trial.suggest_float("weight_decay", 1e-8, 1e-1, log=True)

        # --- Dynamic Layer Sizes ---
        config['layer_sizes'] = [
            trial.suggest_int(f"n_units_l{i}", 64, 512, step=64)
            for i in range(config['n_layers'])
        ]
        return config

    @classmethod
    def evaluate_model_intern(cls, X_tr, X_val, y_tr, config):
        pass  # Unused placeholder, kept so existing references keep resolving.

    @classmethod
    def train_predict(cls, X_tr, X_val, y_tr, config):
        """Train for ``EPOCHS`` epochs on ``(X_tr, y_tr)`` and score ``X_val``.

        Args:
            X_tr: Training features, shape ``(n_train, n_features)``.
            X_val: Features to score, same number of columns as ``X_tr``.
            y_tr: Binary training labels, one per row of ``X_tr``.
            config: Configuration from ``create_data_config``.

        Returns:
            1-D numpy array with the sigmoid output for each row of ``X_val``.
        """
        # Seed torch like the other models seed their estimators.
        torch.manual_seed(RANDOM_STATE)

        X_tr_t = torch.tensor(np.asarray(X_tr), dtype=torch.float32).to(device)
        y_tr_t = torch.tensor(np.asarray(y_tr).reshape(-1, 1), dtype=torch.float32).to(device)
        X_val_t = torch.tensor(np.asarray(X_val), dtype=torch.float32).to(device)

        train_ds = TensorDataset(X_tr_t, y_tr_t)
        train_dl = DataLoader(train_ds, batch_size=config['batch_size'], shuffle=True)

        model = create_model(config, input_size=X_tr_t.shape[1]).to(device)
        loss_fn = nn.BCEWithLogitsLoss()

        optimizer = optim.Adam(model.parameters(), lr=config['lr'], weight_decay=config['weight_decay'])
        # Validation labels are not available under this contract, so the
        # scheduler follows the mean training loss (lower is better).
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=3
        )

        for epoch in range(EPOCHS):
            model.train()
            running_loss = 0.0
            for X_b, y_b in train_dl:
                optimizer.zero_grad()
                loss = loss_fn(model(X_b), y_b)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * len(X_b)
            scheduler.step(running_loss / len(train_ds))

        model.eval()
        with torch.no_grad():
            val_probs = torch.sigmoid(model(X_val_t)).cpu().numpy()

        # Flatten (n, 1) -> (n,) so thresholding and f1_score see 1-D scores.
        return val_probs.ravel()

## Search

In [84]:
if __name__ == "__main__":
    # Use the GPU when one is available; this rebinds the module-level `device`.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Short key of the model family tuned in this run.
    model = 'ext'
    # Each model family has its own SQLite file, so studies can be resumed.
    DB_FILE = f'sqlite:///{file_path}/Optuna/optuna_{model}.db'

    print(f"Starting Optimization for model: {model}")

    # Objective callable per model key.
    objectives = dict([
        ('knn', KnnModel.objective),
        ('mlp', MlpModel.objective),
        ('reg', LogisticRegressionModel.objective),
        ('xgb', XGBoostModel.objective),
        ('for', RandomForestModel.objective),
        ('ext', ExtraTreesModel.objective),
    ])

    # Study name per model key (also read by the later cells).
    study_names = {
        'knn': 'knn_study',
        'mlp': 'mlp_study',
        'reg': 'reg_study',
        'xgb': 'xgb_study',
        'for': 'for_study',
        'ext': 'ext_study',
    }

    # load_if_exists=True resumes the stored study instead of failing.
    study = optuna.create_study(
        direction="maximize",
        storage=DB_FILE,
        study_name=study_names[model],
        load_if_exists=True,
    )
    study.optimize(objectives[model], n_trials=10)

Starting Optimization for model: ext


[I 2025-12-03 01:44:57,125] A new study created in RDB with name: ext_study
[I 2025-12-03 01:45:10,691] Trial 0 finished with value: 0.5864794953608744 and parameters: {'data_type': 'SVD', 'use_smote': False, 'threshold': 0.23906287362135661, 'use_Isolation type': True, 'use_Location': False, 'use_Isolation source': True, 'use_Testing standard': True, 'input_size_pca_svd': 64, 'n_estimators': 250, 'max_depth': 28, 'criterion': 'entropy', 'min_samples_split': 6, 'min_samples_leaf': 3, 'max_features': 1.0, 'bootstrap': True, 'class_weight': 'balanced_subsample', 'max_samples': 0.7, 'min_impurity_decrease': 0.013988513867563102}. Best is trial 0 with value: 0.5864794953608744.
[I 2025-12-03 01:46:16,230] Trial 1 finished with value: 0.5995485345325254 and parameters: {'data_type': 'CHI2', 'use_smote': False, 'threshold': 0.45779835358098153, 'use_Isolation type': True, 'use_Location': False, 'use_Isolation source': False, 'use_Testing standard': False, 'input_size_chi2': 130, 'chi2_thresh

## Visualization

In [78]:
!pip install plotly



In [79]:
import optuna
from optuna.visualization import (
    plot_optimization_history,
    plot_param_importances,
    plot_slice,
    plot_contour,
    plot_parallel_coordinate
)

# Relies on `study_names`, `model` and `DB_FILE` left in the kernel by the
# Search cell; run that cell first.
print(study_names[model])
study = optuna.load_study(study_name=study_names[model], storage=DB_FILE)

# Summarise the best trial of the loaded study.
print(f"\nBest F1 Score (Macro): {study.best_value:.4f}")
print("Best Config Found:")
for key, value in study.best_params.items():
    print(f"  {key}: {value}")

# fig1 = plot_optimization_history(study)
# fig1.show()

# fig2 = plot_param_importances(study)
# fig2.show()

# fig3 = plot_slice(study)
# fig3.show()

# fig4 = plot_parallel_coordinate(study)
# fig4.show()

knn_study

Best F1 Score (Macro): 0.8140
Best Config Found:
  data_type: SVD
  use_smote: True
  threshold: 0.5229996009368388
  use_Isolation type: False
  use_Location: True
  use_Isolation source: True
  use_Testing standard: True
  input_size_pca_svd: 128
  smote_k: 3
  smote_ratio: 0.3345491559902423
  n_neighbors: 50
  weights: distance
  metric: cosine
  scale_data: True


# Predictions

In [81]:
if __name__ == '__main__':
    # Short key of the model family to refit; must have a stored study.
    model_name = 'xgb'

    # Keep in sync with the `objectives` mapping of the Search cell. 'ext' was
    # missing here, so an ExtraTrees study could be tuned but not predicted.
    model_classes = {
      'knn': KnnModel,
      'mlp': MlpModel,
      'reg': LogisticRegressionModel,
      'xgb': XGBoostModel,
      'for': RandomForestModel,
      'ext': ExtraTreesModel,
    }

    TargetClass = model_classes[model_name]

    db_file = f'sqlite:///{file_path}/Optuna/optuna_{model_name}.db'
    # `study_names` is defined by the Search cell, which must have run first.
    study_name = study_names[model_name]

    print(f"--- Loading Study: {study_name} ---")
    study = optuna.load_study(study_name=study_name, storage=db_file)
    best_trial = study.best_trial

    print(f"Best CV F1 Score found: {best_trial.value:.4f}")

    # Replay the best parameters through create_data_config with a FixedTrial
    # to rebuild the full config dict, including derived and constant keys.
    fixed_trial = optuna.trial.FixedTrial(best_trial.params)
    best_config = TargetClass.create_data_config(fixed_trial)

    print(f"\n--- Training {TargetClass.__name__} on Full Data and Predicting ---")

    # `predict` prepares the data, fits on the full training set and, with
    # save=True, writes the submission file.
    final_preds = TargetClass.predict(best_config, proba=False, save=True)

    print(f"\nPrediction complete. Total test samples predicted: {len(final_preds)}")

--- Loading Study: xgb_study ---
Best CV F1 Score found: 0.8294

--- Training XGBoostModel on Full Data and Predicting ---
Saved submission to Submission_XGBoost_20251203_0135_PREDS.csv

Prediction complete. Total test samples predicted: 1092
