Define settings

In [1]:
from mltrainer import ReportTypes, Trainer, TrainerSettings, metrics
from pathlib import Path
from loguru import logger

batch_size = 64
# batch_size = 32


Get dataset

In [2]:
from mads_datasets import DatasetFactoryProvider, DatasetType
from mltrainer.preprocessors import BasePreprocessor
from typing import Iterator

def get_streamers(batchsize: int) -> tuple[Iterator, Iterator]:
    fashionfactory = DatasetFactoryProvider.create_factory(DatasetType.FASHION)
    preprocessor = BasePreprocessor()
    streamers = fashionfactory.create_datastreamer(
        batchsize=batchsize, preprocessor=preprocessor
    )
    train = streamers["train"]
    valid = streamers["valid"]
    trainstreamer = train.stream()
    validstreamer = valid.stream()
    output_size = 10
    return trainstreamer, validstreamer, output_size

# def get_streamers(batchsize: int) -> tuple[Iterator, Iterator]:
#     flowersfactory = DatasetFactoryProvider.create_factory(DatasetType.FLOWERS)
#     preprocessor = BasePreprocessor()
#     streamers = flowersfactory.create_datastreamer(
#         batchsize=batchsize, preprocessor=preprocessor
#     )
#     train = streamers["train"]
#     valid = streamers["valid"]
#     trainstreamer = train.stream()
#     validstreamer = valid.stream()
#     output_size = 5
#     return trainstreamer, validstreamer, output_size

In [3]:
trainstreamer, validstreamer, output_size = get_streamers(batch_size)

x, y = next(trainstreamer)

logger.info(f"Fashion images shape: {x.shape}, labels shape: {y.shape}")

[32m2025-09-25 08:05:23.524[0m | [1mINFO    [0m | [36mmads_datasets.base[0m:[36mdownload_data[0m:[36m121[0m - [1mFolder already exists at /home/KIEI/.cache/mads_datasets/fashionmnist[0m
[32m2025-09-25 08:05:23.525[0m | [1mINFO    [0m | [36mmads_datasets.base[0m:[36mdownload_data[0m:[36m124[0m - [1mFile already exists at /home/KIEI/.cache/mads_datasets/fashionmnist/fashionmnist.pt[0m
[32m2025-09-25 08:05:23.525[0m | [1mINFO    [0m | [36mmads_datasets.base[0m:[36mdownload_data[0m:[36m124[0m - [1mFile already exists at /home/KIEI/.cache/mads_datasets/fashionmnist/fashionmnist.pt[0m
[32m2025-09-25 08:05:23.574[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m5[0m - [1mFashion images shape: torch.Size([64, 1, 28, 28]), labels shape: torch.Size([64])[0m
[32m2025-09-25 08:05:23.574[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m5[0m - [1mFashion images shape: torch.Size([64, 1, 28, 28]), labels shape: torch.Size([6

Set machine type

In [4]:
import torch

def get_device() -> str:
    if torch.backends.mps.is_available() and torch.backends.mps.is_built():
        return "mps"
    elif torch.cuda.is_available():
        return "cuda:0"
    else:
        return "cpu"

device = get_device()
device

'cpu'

Setup MLFlow & work directory

In [5]:
import mlflow
from pathlib import Path

def setup_mlflow() -> None:
    mlflow.set_tracking_uri("sqlite:///mlflow.db")

setup_mlflow()

def set_model_dir(model_dir: str) -> None:
    modeldir = Path(model_dir).resolve()
    if not modeldir.exists():
        modeldir.mkdir(parents=True)
        logger.info(f"Created {modeldir}")
    return modeldir

model_dir = set_model_dir(model_dir="models")

Setup model

In [None]:
from torch import nn

class CustomCNN(nn.Module):
    def __init__(
        self,
        filters,
        units1=128,
        units2=64,
        output_size=10,
        kernel_size=3,
        stride=1,
        pooling_layer=nn.MaxPool2d,
        padding="valid",
        input_size=(batch_size, 1, 28, 28),
        dropout1=0.5,
        dropout2=0.3,
        use_batchnorm=True,
    ):
        super().__init__()
        self.in_channels = input_size[1]
        self.input_size = input_size
        self.filters = filters
        self.units1 = units1
        self.units2 = units2

        pad = self._get_pad(padding, kernel_size)
        layers = []
        # First conv block
        layers.append(nn.Conv2d(self.in_channels, filters, kernel_size=kernel_size, stride=stride, padding=pad))
        if use_batchnorm:
            layers.append(nn.BatchNorm2d(filters))
        layers.append(nn.ReLU())
        layers.append(pooling_layer(kernel_size=2))
        # Second conv block
        layers.append(nn.Conv2d(filters, filters, kernel_size=kernel_size, stride=stride, padding=pad))
        if use_batchnorm:
            layers.append(nn.BatchNorm2d(filters))
        layers.append(nn.ReLU())
        layers.append(pooling_layer(kernel_size=2))
        # Third conv block
        layers.append(nn.Conv2d(filters, filters, kernel_size=kernel_size, stride=stride, padding=pad))
        if use_batchnorm:
            layers.append(nn.BatchNorm2d(filters))
        layers.append(nn.ReLU())
        layers.append(pooling_layer(kernel_size=2))
        self.convolutions = nn.Sequential(*layers)

        activation_map_size = self._conv_test(input_size)
        logger.info(f"Aggregating activation map with size {activation_map_size}")
        self.agg = nn.AvgPool2d(activation_map_size)

        dense_layers = []
        dense_layers.append(nn.Flatten()),

        dense_layers.append(nn.Dropout(p=dropout1)),
        dense_layers.append(nn.Linear(filters, units1)),
        if use_batchnorm:
            dense_layers.append(nn.BatchNorm1d(units1))
        dense_layers.append(nn.ReLU())
        dense_layers.append(nn.Dropout(p=dropout2))
        dense_layers.append(nn.Linear(units1, units2))
        if use_batchnorm:
            dense_layers.append(nn.BatchNorm1d(units2))
        dense_layers.append(nn.ReLU())
        dense_layers.append(nn.Linear(units2, output_size))
        self.dense = nn.Sequential(*dense_layers)

    def _get_pad(self, padding, kernel_size):
        if isinstance(padding, str):
            if padding == "same":
                return (kernel_size - 1) // 2
            elif padding == "valid":
                return 0
            else:
                raise ValueError(f"Unknown padding: {padding}")
        return padding  # If already int

    def _conv_test(self, input_size=(batch_size, 1, 28, 28)):
        x = torch.ones(input_size)
        x = self.convolutions(x)
        return x.shape[-2:]

    def forward(self, x):
        x = self.convolutions(x)
        x = self.agg(x)
        logits = self.dense(x)
        return logits


Setup training

In [9]:
from hyperopt import fmin, STATUS_OK, tpe, Trials
from datetime import datetime
from torch import optim, nn
from hyperopt import hp


def objective(params):
    # End any previous MLflow run if still active
    if mlflow.active_run() is not None:
        mlflow.end_run()
    try:
        with mlflow.start_run():
            mlflow.set_experiment(experiment_name)
            mlflow.set_tag("model", "CNN")
            # Log parameters specific to this model
            mlflow.log_params({
                "batch_size": batch_size,
                "epochs": params["epochs"],
                "filters": params["filters"],
                "padding": params["padding"],
                "kernel_size": params["kernel_size"],
                "stride": params["stride"],
                "pooling_method": params["pooling_method"],
                "units1": params["units1"],
                "units2": params["units2"],
                "dropout1": params["dropout1"],
                "dropout2": params["dropout2"],
                "use_batchnorm": params["use_batchnorm"],
            })
            
            filters = params["filters"]
            kernel_size = params["kernel_size"]
            stride = params["stride"]
            pooling_layer = params["pooling_method"]
            units1 = params["units1"]
            units2 = params["units2"]
            epochs = params["epochs"]
            padding = params["padding"]
            dropout1 = params.get("dropout1", 0.5)
            dropout2 = params.get("dropout2", 0.3)

            model = CustomCNN(
                # input_size=(batch_size, 1, 28, 28),
                input_size=x.shape,
                filters=filters,
                units1=units1,
                units2=units2,
                output_size=output_size,
                kernel_size=kernel_size,
                stride=stride,
                pooling_layer=pooling_layer,
                padding=padding,
                dropout1=dropout1,
                dropout2=dropout2,
                use_batchnorm=True,
            ).to(device)

            # Print model summary
            logger.info(model)
            logger.info(f"Input size: {x.shape}")
            with torch.no_grad():
                x_input = x.to(device)
                if x_input.dim() == 2:
                    x_input = x_input.unsqueeze(1)
                for i, layer in enumerate(model.convolutions):
                    x_input = layer(x_input)
                logger.info(f"After convolutions[{i}] ({layer.__class__.__name__}): {x_input.shape}")
                x_input = model.agg(x_input)
                logger.info(f"After agg ({model.agg.__class__.__name__}): {x_input.shape}")
                x_input = model.dense[0](x_input)
                logger.info(f"After dense[0] (Flatten): {x_input.shape}")
                x_input = model.dense[1](x_input)
                logger.info(f"After dense[1] (Linear): {x_input.shape}")
                x_input = model.dense[2](x_input)
                logger.info(f"After dense[2] (ReLU): {x_input.shape}")
                x_input = model.dense[3](x_input)
                logger.info(f"After dense[3] (Linear): {x_input.shape}")
                x_input = model.dense[4](x_input)
                logger.info(f"After dense[4] (ReLU): {x_input.shape}")
                x_input = model.dense[5](x_input)
                logger.info(f"After dense[5] (Linear): {x_input.shape}")

            train_settings = TrainerSettings(
                epochs=epochs,
                reporttypes=[ReportTypes.MLFLOW, ReportTypes.TOML],
                metrics=[metrics.Accuracy()],
                logdir=model_dir,
                train_steps=100,
                valid_steps=100,
            )

            trainer = Trainer(
                model=model,
                optimizer=optim.Adam,
                loss_fn=torch.nn.CrossEntropyLoss(),
                scheduler=optim.lr_scheduler.ReduceLROnPlateau,
                traindataloader=trainstreamer,
                validdataloader=validstreamer,
                settings=train_settings,
                device=device,
            )
            trainer.loop()

            tag = datetime.now().strftime("%Y%m%d-%H%M")
            modelpath = model_dir / (tag + "model.pt")
            logger.info(f"Saving model to {modelpath}")
            torch.save(model, modelpath)

            mlflow.log_artifact(local_path=str(modelpath), artifact_path="pytorch_models")
            return {"loss": trainer.test_loss, "status": STATUS_OK}
    except Exception as e:
        logger.warning(f"Training failed due to error: {e}")
        return {"loss": 9999, "status": STATUS_OK}

# search_space = {
#     "filters": hp.choice("filters", [8, 64, 128]),
#     "kernel_size": hp.choice("kernel_size", [1, 2, 3]),
#     "stride": hp.choice("stride", [1, 2]),
#     "pooling_method": hp.choice("pooling_method", [nn.MaxPool2d, nn.AvgPool2d]),
#     "padding": hp.choice("padding", ["same", "valid"]),
#     "units1": hp.choice("units1", [128]),
#     "units2": hp.choice("units2", [64]),
#     "epochs": hp.choice("epochs", [10]),
# }

# search_space = {
#     "filters": hp.choice("filters", [64, 128, 256]),
#     "kernel_size": hp.choice("kernel_size", [1, 2, 3]),
#     "stride": hp.choice("stride", [1, 2, 3]),
#     "pooling_method": hp.choice("pooling_method", [nn.MaxPool2d, nn.AvgPool2d]),
#     # "padding": hp.choice("padding", ["same", "valid"]),
#     "padding": hp.choice("padding", ["valid"]),
#     "units1": hp.choice("units1", [128]),
#     "units2": hp.choice("units2", [64]),
#     "epochs": hp.choice("epochs", [5]),
# }

search_space = {
    "filters": hp.choice("filters", [175]),
    "kernel_size": hp.choice("kernel_size", [2]),
    "stride": hp.choice("stride", [1]),
    "pooling_method": hp.choice("pooling_method", [nn.MaxPool2d]),
    "padding": hp.choice("padding", ["same"]),
    "units1": hp.choice("units1", [128]),
    "units2": hp.choice("units2", [64]),
    "epochs": hp.choice("epochs", [50]),
    # "dropout1": hp.choice("dropout1", [0, 0.3, 0.5]),
    "dropout1": hp.choice("dropout1", [0.6]),
    # "dropout2": hp.choice("dropout2", [0, 0.3, 0.5]),
    "dropout2": hp.choice("dropout2", [0.6]),
    "use_batchnorm": hp.choice("use_batchnorm", [True]),
}


experiment_name = "Experiment gridsearch CNN improvements"
mlflow.set_experiment(experiment_name)

best_result = fmin(
    fn=objective,
    space=search_space,
    max_evals=1,
    algo=tpe.suggest,
    trials=Trials()
)

logger.info(f"Best result: {best_result}")

  0%|          | 0/1 [00:00<?, ?trial/s, best loss=?]

[32m2025-09-25 11:16:49.472[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m49[0m - [1mAggregating activation map with size torch.Size([2, 2])[0m
[32m2025-09-25 11:16:49.474[0m | [1mINFO    [0m | [36m__main__[0m:[36mobjective[0m:[36m59[0m - [1mCustomCNN(
  (convolutions): Sequential(
    (0): Conv2d(1, 175, kernel_size=(2, 2), stride=(1, 1))
    (1): BatchNorm2d(175, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(175, 175, kernel_size=(2, 2), stride=(1, 1))
    (5): BatchNorm2d(175, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(175, 175, kernel_size=(2, 2), stride=(1, 1))
    (9): BatchNorm2d(175, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): MaxP

100%|██████████| 1/1 [05:51<00:00, 351.74s/trial, best loss: 0.37647830694913864]

[32m2025-09-25 11:22:40.961[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m166[0m - [1mBest result: {'dropout1': np.int64(0), 'dropout2': np.int64(0), 'epochs': np.int64(0), 'filters': np.int64(0), 'kernel_size': np.int64(0), 'padding': np.int64(0), 'pooling_method': np.int64(0), 'stride': np.int64(0), 'units1': np.int64(0), 'units2': np.int64(0), 'use_batchnorm': np.int64(0)}[0m



