# Example - MNIST optimization with Pytorch

This example shows how to optimize the hyper-parameters of a model built with PyTorch on the popular MNIST dataset.


## Imports

We start by importing some useful stuff.

In [1]:
# Some useful packages
from typing import Union, Tuple, Dict
import numpy as np
import pandas as pd
import pprint
import os
import enum
import tqdm


# Pytorch
import torch
from torch import nn, optim
from torch.utils.data import Subset, Dataset, DataLoader, TensorDataset
from torchvision.datasets.mnist import MNIST
from torchvision import transforms
from torchvision.transforms import ToTensor, ConvertImageDtype, Compose

# Importing the HPOptimizer and the RandomHpSearch from the AutoMLpy package.
from AutoMLpy import HpOptimizer, RandomHpSearch

## Dataset

Now we load the MNIST dataset in the pytorch way.

In [2]:
# Root directory under which the datasets of this example are downloaded and stored.
BASE_PATH = '~/examples/pytorch_datasets/'

def get_torch_MNIST_datasets(seed: int = 42, path=os.path.join(BASE_PATH, 'mnist'), **kwargs):
    """
    Download MNIST and split it into train, validation and test datasets.

    The official training set is shuffled with numpy's global random generator
    (seeded with ``seed``) and split 80% / 20% into the train and validation
    subsets. The official test set is returned as is.

    Parameters
    ----------
    seed: The seed given to ``np.random.seed`` before the indices are shuffled.
    path: The root directory given to the torchvision ``MNIST`` dataset.
    kwargs: Accepted but not used by this function.

    Returns
    -------
    A dictionary with the keys {train, valid, test} mapping to the datasets.
    """
    train_split_ratio = 0.8

    # Seeds numpy's *global* generator, so the shuffle below is reproducible.
    np.random.seed(seed)

    image_pipeline = Compose([
        ToTensor(),
        ConvertImageDtype(torch.float),
        transforms.Lambda(lambda x: x/1.),
    ])

    print("Downloading MNIST dataset...")
    full_train_dataset = MNIST(path, train=True, download=True, transform=image_pipeline)
    test_dataset = MNIST(path, train=False, download=True, transform=image_pipeline)
    print("Downloading MNIST dataset --> Done")

    nb_samples = len(full_train_dataset)
    shuffled_indices = list(range(nb_samples))
    np.random.shuffle(shuffled_indices)

    # The first 80% of the shuffled indices go to train, the remaining ones to validation.
    split_index = int(np.floor(train_split_ratio * nb_samples))
    train_dataset = Subset(full_train_dataset, shuffled_indices[:split_index])
    valid_dataset = Subset(full_train_dataset, shuffled_indices[split_index:])

    return {"train": train_dataset, "valid": valid_dataset, "test": test_dataset}


def get_torch_MNIST_X_y(**kwargs):
    """
    Load every MNIST split in memory as a pair of tensors.

    Parameters
    ----------
    kwargs: Forwarded to ``get_torch_MNIST_datasets``.

    Returns
    -------
    A dictionary mapping each phase returned by ``get_torch_MNIST_datasets`` to a
    dictionary with the keys {x, y}: "x" holds the samples stacked along a new
    first dimension and "y" holds the labels as a ``torch.LongTensor``.
    """
    X_y_dict = {}
    for phase, dataset in get_torch_MNIST_datasets(**kwargs).items():
        samples, labels = [], []
        # Iterating the dataset applies its transforms sample by sample.
        for sample, label in dataset:
            samples.append(sample)
            labels.append(label)
        X_y_dict[phase] = dict(
            x=torch.stack(samples, dim=0),
            y=torch.LongTensor(labels),
        )
    return X_y_dict

## Torch Model

Now we define a class that builds the torch.nn model that will be trained with a given set of hyper-parameters (hp).


In [3]:
class MnistNet(nn.Module):
    """
    Small convolutional classifier made of two conv/ReLU/max-pool stages
    followed by a two-layer fully-connected head that outputs 10 logits.
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: 1 -> 10 -> 20 channels, each stage ends with a 2x2 max-pooling.
        feature_layers = [
            nn.Conv2d(1, 10, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
            nn.Conv2d(10, 20, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
        ]
        self.backbone = nn.Sequential(*feature_layers)

        # Classifier head: it expects 20 feature maps of size 7x7 once flattened.
        head_layers = [
            nn.Flatten(),
            nn.Linear(20 * 7 * 7, 84),
            nn.ReLU(),
            nn.Linear(84, 10),
        ]
        self.clf = nn.Sequential(*head_layers)

    def forward(self, x):
        """
        Compute the class logits of a batch.

        A 3-dimensional input gets a channel axis inserted at position 1
        before being given to the convolutions.
        """
        if x.dim() == 3:
            x = x.unsqueeze(1)
        return self.clf(self.backbone(x))

## The training functions


In [4]:
@enum.unique
class PhaseType(enum.Enum):
    """Identifies the kind of pass made over a data loader: training, validation or test."""
    train = 0
    val = 1
    test = 2


def train_pytorch_network(
        network,
        loaders,
        verbose: bool = False,
        **training_kwargs,
):
    """
    Fit the given network with the given training data.

    Parameters
    ----------
    network: The neural network to fit.
    loaders: The data loaders as a dictionary with keys: {train, valid}. The "valid" loader is optional.
    verbose: True to show some training stats else False.
    training_kwargs:
        optimizer (torch.optim.Optimizer): The optimizer used to make the weights updates.
            If not given, a SGD optimizer is built from lr, momentum and nesterov.
        lr (float): The learning rate of the optimizer if the optimizer is not given. Default: 1e-3.
        momentum (float): The momentum of the optimizer if the optimizer is not given. Default: 0.9.
        nesterov (bool): The nesterov of the optimizer if the optimizer is not given. Default: True.
        criterion: The loss function. Default: torch.nn.CrossEntropyLoss().
        epochs (int): The number of epochs. Default: 5.
        use_cuda (bool): True to use cuda device else False.
        All the entries are forwarded to execute_phase.

    Returns
    -------
    The training history: a list with one dictionary per epoch holding the "train" logs and,
    when a "valid" loader is given, the "val" logs.
    """

    # Build the default optimizer only when none is supplied. Building it unconditionally would
    # waste work and could raise for hyper-parameters meant for the caller's own optimizer
    # (e.g. momentum=0 is rejected by SGD together with the nesterov=True default).
    if "optimizer" not in training_kwargs:
        training_kwargs["optimizer"] = torch.optim.SGD(
            (p for p in network.parameters() if p.requires_grad),
            lr=training_kwargs.get("lr", 1e-3),
            momentum=training_kwargs.get("momentum", 0.9),
            nesterov=training_kwargs.get("nesterov", True),
        )

    if "criterion" not in training_kwargs:
        training_kwargs["criterion"] = torch.nn.CrossEntropyLoss()

    history = []
    nb_epochs = training_kwargs.get("epochs", 5)

    for epoch in range(nb_epochs):
        epoch_logs = {}
        train_logs = execute_phase(network, loaders["train"], PhaseType.train, verbose, **training_kwargs)
        epoch_logs["train"] = train_logs

        # The validation phase is optional.
        if "valid" in loaders:
            val_logs = execute_phase(network, loaders["valid"], PhaseType.val, verbose, **training_kwargs)
            epoch_logs["val"] = val_logs

        history.append(epoch_logs)

    return history


def execute_phase(
    network: nn.Module,
    data_loader: DataLoader,
    phase_type: PhaseType = PhaseType.train,
    verbose: bool = False,
    **kwargs
) -> Dict[str, float]:
    """
    Execute a training phase on a network. The possible phase are {train, val, test}.

    Parameters
    ----------
    network: The model to fit.
    data_loader: The data loader used to make the current training phase.
    phase_type: The phase type in {train, val, test}.
    verbose: True to show some training stats else False.
    kwargs:
        use_cuda (bool): True to use cuda device (when available) else False. Default: True.
        scheduler: Optional learning rate scheduler. It is stepped once at the end of a train phase.
        All the entries are forwarded to execute_batch_training.

    Returns
    -------
    The phase logs: the mean over the batches of the "loss" and "acc" metrics.
    """
    is_training = phase_type == PhaseType.train
    network.train(is_training)

    # Fall back on the cpu when cuda is requested but not available.
    device = "cuda" if kwargs.get("use_cuda", True) and torch.cuda.is_available() else "cpu"
    network.to(device)

    phase_logs = {"loss": 0, "acc": 0}

    phase_progress = None
    if verbose:
        phase_progress = tqdm.tqdm(range(len(data_loader)), unit="batch")
        phase_progress.set_description_str(f"Phase: {phase_type.name}")

    try:
        # Gradients are only needed when the weights are updated; disabling autograd in the
        # val/test phases avoids building a graph that is never used.
        with torch.set_grad_enabled(is_training):
            for j, (inputs, targets) in enumerate(data_loader):
                if device == "cuda":
                    inputs = inputs.float().to(device)
                    targets = targets.to(device)

                batch_logs = execute_batch_training(network, inputs, targets, phase_type, verbose, **kwargs)
                for metric_name, metric in batch_logs.items():
                    # Running mean of the metric over the batches seen so far.
                    phase_logs[metric_name] = (j * phase_logs[metric_name] + metric) / (j + 1)

                if phase_progress is not None:
                    phase_progress.update()
                    phase_progress.set_postfix_str(' '.join([str(_m)+': '+str(f"{_v:.5f}")
                                                             for _m, _v in phase_logs.items()]))
    finally:
        # Always release the progress bar, even if a batch raised.
        if phase_progress is not None:
            phase_progress.close()

    # Step the scheduler once per training epoch, after the optimizer updates. Stepping it in the
    # val/test phases, or before the first optimizer step, would make the learning rate decay
    # faster than the schedule describes.
    scheduler = kwargs.get("scheduler")
    if is_training and scheduler is not None:
        scheduler.step()

    return phase_logs


def execute_batch_training(
    network: nn.Module,
    inputs,
    targets,
    phase_type: PhaseType = PhaseType.train,
    verbose: bool = False,
    **kwargs
) -> Dict[str, float]:
    """
    Run one batch through the network and, in the train phase, update the weights.

    Parameters
    ----------
    network: The model to fit.
    inputs: The inputs of the model.
    targets: The targets of the model.
    phase_type: The phase type in {train, val, test}. The weights are updated only in the train phase.
    verbose: Not used by this function.
    kwargs:
        criterion: The loss function, called as criterion(output, targets).
        optimizer (torch.optim): The optimizer used to make the weights updates (train phase only).

    Returns
    -------
    Batch logs as dict with the keys {loss, acc}.
    """
    network.zero_grad()
    predictions = network(inputs)
    loss = kwargs["criterion"](predictions, targets)

    if phase_type == PhaseType.train:
        loss.backward()
        kwargs["optimizer"].step()

    # A prediction is correct when the index of the largest logit equals the target.
    is_correct = predictions.argmax(dim=-1) == targets

    return dict(
        loss=loss.cpu().detach().numpy(),
        acc=np.mean(is_correct.cpu().detach().numpy()),
    )

## The Optimizer Model

It's time to implement the optimizer model. You just have to implement the following methods: "build_model",
"fit_model_" and "score". Those methods must respect their signature and output type. The objective here is to make the building, training and scoring phases depend on some hyper-parameters, so that the optimizer can use them to find the best set of hp.


In [5]:
class TorchMNISTHpOptimizer(HpOptimizer):
    """
    Hyper-parameters optimizer of a MnistNet.

    The hyper-parameters read in this class are: learning_rate, momentum, nesterov, batch_size and
    pre_normalized. The whole set of hp is also forwarded to train_pytorch_network.
    """

    def __init__(self, use_cuda: bool = True):
        # NOTE(review): HpOptimizer.__init__ is not called here - confirm the base class needs no setup.
        self.use_cuda = use_cuda

    @staticmethod
    def _as_torch_tensor(data, dtype: torch.dtype) -> torch.Tensor:
        """Convert a tensor, a numpy array or a pandas object to a tensor of the given dtype."""
        if isinstance(data, (pd.DataFrame, pd.Series)):
            data = data.to_numpy()
        return torch.as_tensor(data, dtype=dtype)

    @staticmethod
    def _scale_by_max(X: torch.Tensor) -> torch.Tensor:
        """Divide X by its maximum value; X is returned untouched when that maximum is zero."""
        max_value = torch.max(X)
        # Dividing by a zero maximum would fill the data with NaN/inf.
        if max_value == 0:
            return X
        return X / max_value

    def build_model(self, **hp) -> torch.nn.Module:
        """Build a new MnistNet, moved on the cuda device when it is requested and available."""
        model = MnistNet()
        if torch.cuda.is_available() and self.use_cuda:
            model.cuda()
        return model

    def fit_model_(
            self,
            model: torch.nn.Module,
            X: Union[np.ndarray, pd.DataFrame, torch.Tensor],
            y: Union[np.ndarray, torch.Tensor],
            verbose=False,
            **hp
    ) -> object:
        """
        Train the model on (X, y) with a SGD optimizer parametrized by the hyper-parameters.

        Parameters
        ----------
        model: The model to fit.
        X: The training inputs.
        y: The training targets.
        verbose: True to show some training stats else False.
        hp:
            pre_normalized (bool): True to divide X by its maximum value. Default: True.
            learning_rate (float): The learning rate of the SGD optimizer. Default: 1e-3.
            momentum (float): The momentum of the SGD optimizer. Default: 0.9.
            nesterov (bool): The nesterov of the SGD optimizer. Default: True.
            batch_size (int): The batch size of the training loader. Default: 32.
            All the hp are also forwarded to train_pytorch_network.

        Returns
        -------
        The fitted model (the same object as the given one).
        """
        # Convert first, so numpy arrays and DataFrames are handled like tensors.
        X = self._as_torch_tensor(X, torch.float)
        y = self._as_torch_tensor(y, torch.long)
        if hp.get("pre_normalized", True):
            X = self._scale_by_max(X)

        optimizer = optim.SGD(model.parameters(),
                              lr=hp.get("learning_rate", 1e-3),
                              momentum=hp.get("momentum", 0.9),
                              nesterov=hp.get("nesterov", True))

        train_loader = DataLoader(
            TensorDataset(X, y),
            batch_size=hp.get("batch_size", 32),
            num_workers=2,
            shuffle=True
        )

        train_pytorch_network(
            model,
            loaders=dict(train=train_loader),
            verbose=verbose,
            optimizer=optimizer,
            use_cuda=self.use_cuda,
            **hp
        )

        return model

    def score(
            self,
            model: torch.nn.Module,
            X: Union[np.ndarray, pd.DataFrame, torch.Tensor],
            y: Union[np.ndarray, torch.Tensor],
            **hp
    ) -> float:
        """
        Compute the accuracy of the model on (X, y).

        Parameters
        ----------
        model: The model to evaluate.
        X: The inputs.
        y: The targets.
        hp:
            pre_normalized (bool): True to divide X by its maximum value. Default: True.

        Returns
        -------
        The fraction of samples whose predicted class equals the target.
        """
        X = self._as_torch_tensor(X, torch.float)
        y = self._as_torch_tensor(y, torch.long)
        if hp.get("pre_normalized", True):
            X = self._scale_by_max(X)

        # Evaluate on the device where the model lives.
        model_device = next(model.parameters()).device
        X = X.to(model_device)
        y = y.to(model_device)

        # Evaluate in eval mode without autograd: no graph is kept for the whole dataset.
        # The previous train/eval mode of the model is restored afterwards.
        was_training = model.training
        model.eval()
        try:
            with torch.no_grad():
                predictions = torch.argmax(model(X), dim=-1)
        finally:
            model.train(was_training)

        test_acc = float(np.mean((predictions == y).cpu().numpy()))
        return test_acc


## Execution & Optimization

First thing after creating our classes is to load the dataset in memory.


In [6]:
# Load every MNIST split in memory as tensors.
mnist_X_y_dict = get_torch_MNIST_X_y()
# The optimizer is asked not to use cuda (use_cuda=False).
mnist_hp_optimizer = TorchMNISTHpOptimizer(use_cuda=False)

Downloading MNIST dataset...
Downloading MNIST dataset --> Done


Next, define your hyper-parameter space with a dictionary like this.

In [7]:
# Search space: each key is a hyper-parameter name mapped to its candidate values.
hp_space = {
    "epochs": list(range(1, 16)),
    "batch_size": [16, 32],
    "learning_rate": np.linspace(1e-4, 1e-1, 50),
    "nesterov": [True, False],
    "momentum": np.linspace(0.01, 0.99, 50),
    "pre_normalized": [False, True],
}

It's time to define your hp search algorithm and give it your budget in time and iterations.

In [8]:
# Random search over hp_space, given a budget of 60*5 seconds and 1000 iterations.
param_gen = RandomHpSearch(hp_space, max_seconds=60*5, max_itr=1_000)

Finally, you start the optimization by giving your parameter generator to the optimize method. Note that the "stop_criterion" argument is to stop the optimization when the given score is reached. It's really useful to save some time.

In [9]:
# Saving options handed to the optimizer through its `save_kwargs` argument.
save_kwargs = {
    "save_name": "tf_mnist_hp_opt",
    "title": "Random search: MNIST",
}

# Run the search on the train split; `optimize` gives back the parameter generator.
param_gen = mnist_hp_optimizer.optimize(
    param_gen,
    mnist_X_y_dict["train"]["x"],
    mnist_X_y_dict["train"]["y"],
    n_splits=2,
    stop_criterion=1.0,
    save_kwargs=save_kwargs,
)

  0%|▌                                                                                                                                                                         | 3/1000 [05:08<28:29:42, 102.89s/itr, mean_score: 0.98]


## Testing

Now you can test the optimized hyper-parameters by fitting again on the full train dataset. Yes, on the full dataset, because the optimization phase uses a cross-validation that splits your train dataset in half. It's also time to test the fitted model on the test dataset.


In [10]:
# Retrieve the best set of hyper-parameters from the parameter generator.
opt_hp = param_gen.get_best_param()

# Build a new model and fit it on the whole train split with those hyper-parameters.
model = mnist_hp_optimizer.build_model(**opt_hp)
mnist_hp_optimizer.fit_model_(
    model,
    mnist_X_y_dict["train"]["x"],
    mnist_X_y_dict["train"]["y"],
    **opt_hp
)

# Score the fitted model on the test split; the model is moved to the cpu first.
test_acc = mnist_hp_optimizer.score(
    model.cpu(),
    mnist_X_y_dict["test"]["x"],
    mnist_X_y_dict["test"]["y"],
    **opt_hp
)
print(f"test_acc: {test_acc*100:.3f}%")

test_acc: 98.980%


The optimized hyper-parameters:

In [11]:
# Pretty-print the optimized hyper-parameters with a 4-space indentation.
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(opt_hp)

{   'batch_size': 32,
    'epochs': 11,
    'learning_rate': 0.06330204081632654,
    'momentum': 0.27,
    'nesterov': True,
    'pre_normalized': True}


## Visualization

You can visualize the optimization with an interactive html file.

In [13]:
# Write the optimization to an html file, reusing the saving options of the search (show=True, dark_mode=True).
fig = param_gen.write_optimization_to_html(show=True, dark_mode=True, **save_kwargs)