Load dataloaders

In [None]:
import numpy as np
import optuna
from aim.optuna import AimCallback
from sklearn.model_selection import StratifiedKFold
from torch import nn, tensor
import os
from pathlib import Path
from torch import optim
from torch.utils.data import DataLoader
import polars as pl
import torch
from sklearn.model_selection import train_test_split
from aim import Text

from src.cci.dataset.dataset import TransitionDataset
from src.cci.dataset.transforms import CropSample, RandomSample, ToTensor

# Prefer the GPU when one is visible, otherwise run on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# torch.cuda.get_device_name only accepts CUDA devices; querying it on a
# CPU-only machine raises, so only ask for the name when running on CUDA.
if DEVICE.type == "cuda":
    print(f"Running with device: {torch.cuda.get_device_name(DEVICE)}")
else:
    print("Running with device: cpu")

# Split train/test
# Read the cleaned table and attach an "index" column holding each row's
# position, so rows can be selected by the indices produced below.
clean_df = pl.read_csv("data/clean_df.csv").with_row_index()
(labels,) = clean_df.select("Class Label")
labels = labels.to_numpy()

# Stratified 90/10 split over row positions.
# NOTE: no random_state is passed, so the held-out rows differ between runs.
train_val_idx, test_idx = train_test_split(
    range(len(clean_df)),
    stratify=labels,
    test_size=0.1,
)
# train_test_split returns the indices in shuffled order, whereas the
# filter() calls below keep rows in their original order. Sort the indices
# so that labels[train_val_idx] lines up row-for-row with train_val_df.
train_val_idx = sorted(train_val_idx)
test_idx = sorted(test_idx)

train_val_df = clean_df.filter(pl.col("index").is_in(train_val_idx))
test_df = clean_df.filter(pl.col("index").is_in(test_idx))


# Reindex
# Replace the original positions with a fresh 0..n-1 "index" so that fold
# indices computed over train_val_df can be used to filter it directly.
train_val_df = train_val_df.drop("index").with_row_index()
train_val_labels = labels[train_val_idx]


# Length handed to the RandomSample / CropSample transforms.
sample_length = 1500
# Directory handed to TransitionDataset; read from the OOCHA_DIR environment
# variable (raises KeyError when the variable is not set).
root_dir = Path(os.environ["OOCHA_DIR"])


# Tuner
# Aim callback that records the two study metrics for every Optuna trial.
aim_callback = AimCallback(
    metric_name=["f1", "loss"],
    as_multirun=True,
    experiment_name="MLP_parameter_search",
)


def suggest_mlp(trial: optuna.Trial) -> list[nn.Module]:
    """Sample an MLP architecture from ``trial`` and return its layers.

    The stack is: ``Flatten`` -> ``Dropout`` -> ``LazyLinear`` -> ``ReLU``,
    followed by ``n_hidden_layers`` (1 to 3) repetitions of
    ``Dropout`` -> ``Linear`` -> ``ReLU``, and finally ``Dropout`` ->
    ``Linear`` with a single output unit.

    Sampled parameters are named ``n_hidden_layers``, ``linear_<k>`` (layer
    width, 1 to 1000) and ``dropout_<k>`` (dropout probability, 0.1 to 0.5),
    with ``k`` counting the linear stages from 1.

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        The layers in forward order, ready to be unpacked into
        ``nn.Sequential``.
    """

    def _dropout(stage: int) -> nn.Dropout:
        # Dropout layer whose probability is sampled for the given stage.
        return nn.Dropout(trial.suggest_float(f"dropout_{stage}", 0.1, 0.5))

    def _width(stage: int) -> int:
        # Output width sampled for the linear layer of the given stage.
        return trial.suggest_int(f"linear_{stage}", 1, 1000)

    depth = trial.suggest_int("n_hidden_layers", 1, 3)

    # Input stage: the input size is inferred lazily on the first forward pass.
    stack: list[nn.Module] = [nn.Flatten(), _dropout(1)]
    width = _width(1)
    stack += [nn.LazyLinear(width), nn.ReLU()]

    # Hidden stages 2 .. depth + 1; the width is drawn before the dropout rate.
    for stage in range(2, depth + 2):
        next_width = _width(stage)
        stack += [_dropout(stage), nn.Linear(width, next_width), nn.ReLU()]
        width = next_width

    # Output stage: one unit, no activation.
    stack += [_dropout(depth + 2), nn.Linear(width, 1)]
    return stack


@aim_callback.track_in_aim()
def objective(trial: optuna.Trial):
    """Optuna objective: sample hyperparameters and set up 5-fold CV.

    Samples the optimizer, learning rate, batch size and MLP architecture
    from ``trial``, then builds datasets, loaders, a model, an optimizer and
    loss functions for each stratified fold of ``train_val_df``. Dataset
    metadata and the model architecture are logged to the Aim run.

    Args:
        trial: Optuna trial used to draw the hyperparameters.

    Returns:
        A ``(f1, loss)`` tuple. The training step is not implemented yet, so
        placeholder constants are returned.
    """
    aim_callback.experiment["dataset"] = {
        # Log the sample length actually passed to the transforms below.
        "samples": sample_length,
        "preprocessing": {},
        "set": "data/clean_df.csv",
        "test_set": {"augmentation": "random_shift"},
    }

    optimizer_name = trial.suggest_categorical("Optimizer", ["Adam", "RMSprop", "SGD"])
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [4, 8, 16, 32])

    # Take the stratification labels from the frame that is being split, so
    # that they are guaranteed to be in the same row order as the fold indices.
    fold_labels = train_val_df["Class Label"].to_numpy()

    fold = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    for fold_idx, (train_idx, val_idx) in enumerate(fold.split(np.zeros(len(train_val_df)), fold_labels)):
        train_df = train_val_df.filter(pl.col("index").is_in(train_idx))
        val_df = train_val_df.filter(pl.col("index").is_in(val_idx))
        train_dataset = TransitionDataset(
            train_df,
            root_dir,
            transforms=[
                RandomSample(sample_length),
                ToTensor(),
            ],
        )
        val_dataset = TransitionDataset(
            val_df,
            root_dir,
            transforms=[
                CropSample(sample_length),
                ToTensor(),
            ],
        )
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
        )
        # Evaluation does not benefit from shuffling; keep a fixed order.
        val_loader = DataLoader(
            val_dataset,
            batch_size=batch_size,
            shuffle=False,
        )

        # Build fresh layers for every fold. Reusing one list of layer
        # instances would carry trained weights from one fold into the next.
        # Repeated suggest_* calls with the same name inside a trial return
        # the already-sampled value, so the architecture is identical.
        model_layers = suggest_mlp(trial)
        model = nn.Sequential(*model_layers).to(DEVICE)
        if fold_idx == 0:
            aim_callback.experiment.track(Text(f"{model}"), "Model Architecture")
        opt = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

        # pos_weight must live on the same device as the model outputs.
        loss_fn = nn.BCEWithLogitsLoss(
            pos_weight=tensor(train_dataset.get_pos_weight(), device=DEVICE),
        )
        val_loss_fn = nn.BCEWithLogitsLoss()

        # Fit
        # TODO: train on train_loader, evaluate on val_loader and aggregate
        # the per-fold f1 / loss instead of returning the placeholders below.

    return 2, 3


# Two-objective study: the first objective value is maximized and the second
# minimized, and they are reported under the names "f1" and "loss".
study = optuna.create_study(
    directions=["maximize", "minimize"],
)
study.set_metric_names(["f1", "loss"])


# Run the search; every finished trial is forwarded to the Aim callback.
study.optimize(
    objective,
    n_trials=10,
    callbacks=[aim_callback],
)