In [None]:
! pip install catalyst[ml]==22.02rc0 scikit-learn==1.0.0 optuna==2.7.0 --upgrade

In [None]:
# import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Customization is all you need

This demo shows several examples for Catalyst runs customization for a variety of needs. If you want to get used to [Catalyst](https://github.com/catalyst-team/catalyst) first, please follow the [minimal examples section](https://github.com/catalyst-team/catalyst#minimal-examples) first.

To start from, let's prepare simple data to work with.

In [None]:
import os
import random
import numpy as np
from sklearn.datasets import make_moons, make_blobs

In [None]:
from typing import *

import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset

from catalyst import dl, metrics

In [None]:
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# make up a dataset
def make_dataset(seed=42, n_samples=int(1e3)):
    np.random.seed(seed)
    random.seed(seed)
    X, y = make_moons(n_samples=n_samples, noise=0.1)

    y = y*2 - 1 # make y be -1 or 1
    return X, y

def visualize_dataset(X, y):
    plt.figure(figsize=(5,5))
    plt.scatter(X[:,0], X[:,1], c=y, s=20, cmap='jet')

# let's create train data
X_train, y_train = make_dataset()
visualize_dataset(X_train, y_train)

In [None]:
# valid data
X_valid, y_valid = make_dataset(seed=137)
visualize_dataset(X_valid, y_valid)

In [None]:
# and another train one (why not?)
X_train2, y_train2 = make_dataset(seed=1337)
visualize_dataset(X_train2, y_train2)

In [None]:
# initialize a model 
# 2-layer neural network
model = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(), 
    nn.Linear(16, 16), nn.ReLU(), 
    nn.Linear(16, 1)
)
print(model)
# print("number of parameters", len(model.parameters()))

In [None]:
def visualize_decision_boundary(X, y, model):
    h = 0.25
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))
    Xmesh = np.c_[xx.ravel(), yy.ravel()]
    
    inputs = torch.tensor([list(xrow) for xrow in Xmesh]).float()
    scores = model(inputs)
    
    Z = np.array([s.data > 0 for s in scores])
    Z = Z.reshape(xx.shape)

    fig = plt.figure()
    plt.contourf(xx, yy, Z, cmap=plt.cm.Spectral, alpha=0.8)
    plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral)
    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())
    plt.show()
    return fig

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

In [None]:
t1 = TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train > 0).float())
t2 = TensorDataset(torch.tensor(X_train2).float(), torch.tensor(y_train2 > 0).float())
v1 = TensorDataset(torch.tensor(X_valid).float(), torch.tensor(y_valid > 0).float())

loaders = {
    "train_1": DataLoader(t1, batch_size=32, num_workers=1), 
    "train_2": DataLoader(t2, batch_size=32, num_workers=1), 
    "valid": DataLoader(v1, batch_size=32, num_workers=1), 
}

---

### Act 1 - ``CustomRunner – batch handling by you own``

First case – defining everything by hands and `.run`. PyTorch for-loop wrapper example.

In [None]:
class CustomRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return 5

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine()    
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        return loaders
    
    def get_model(self):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters(), lr=0.02)
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)

        loss = F.binary_cross_entropy_with_logits(y_hat.view(-1), y)
        self.batch_metrics = {"loss": loss}
        if self.loader_batch_step % 10 == 0:
            print(
                f"{self.loader_key} ({self.loader_batch_step}/{self.loader_batch_len}:" 
                f"loss {loss.item()}"
            )

        if self.is_train_loader:
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()

runner = CustomRunner().run()
model = runner.model

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

---

### Act 2 - ``SupervisedRunner – Runner with Callbacks``

Same example, but with extra loggers and callbacks to simplify the run.

In [None]:
class CustomSupervisedRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return 5

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine()
    
    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
    #         "csv": dl.LogdirLogger(logdir="./logdir02"),
            "tensorboard": dl.TensorboardLogger(logdir="./logdir02/tb"),
        }
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        return loaders
    
    def get_model(self):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_criterion(self):
        return nn.BCEWithLogitsLoss()

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters(), lr=0.02)

    def get_scheduler(self, optimizer):
        return torch.optim.lr_scheduler.MultiStepLR(optimizer, [2, 4])
    
    def get_callbacks(self):
        return {
            # Let's use AUC metric as an example – it's loader-based, so we shouldn't compute it on each batch
            "auc": dl.LoaderMetricCallback(
                metric=metrics.AUCMetric(),
                input_key="scores", target_key="targets", 
            ), 
            # To wrap the criterion step logic, you could use CriterionCallback:
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                input_key="logits", 
                target_key="targets"
            ), 
            # To wrap the optimizer step logic, you could use BackwardCallback & OptimizerCallback:
            "backward": dl.BackwardCallback(metric_key="loss"), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            # The same case with the scheduler:
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            # We could also use lrfinder for lr scheduling:
#             "lr-finder": dl.LRFinder(
#                 final_lr=1.0,
#                 scale="log",
#                 num_steps=None,
#                 optimizer_key=None,
#             ),
            # You can select any number of metrics to checkpoint on:
            "checkpoint1": dl.CheckpointCallback(
                logdir="./logdir02/auc",
                loader_key="valid", metric_key="auc", 
                minimize=False, topk=3
            ),
            "checkpoint2": dl.CheckpointCallback(
                logdir="./logdir02/loss",
                loader_key="valid", metric_key="loss", 
                minimize=True, topk=1
            ),
            # Or turn on/off tqdm verbose during loader run:
            "verbose": dl.TqdmCallback(),
        }
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

runner = CustomSupervisedRunner().run()
model = runner.model

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

---

### Act 3 - ``CustomMetric``

Suppose you would like to add some custom metric to your pipeline...

In [None]:
class CustomAccuracyMetric(metrics.ICallbackBatchMetric, metrics.AdditiveMetric):
    def update(self, scores: torch.Tensor, targets: torch.Tensor) -> float:
        value = ((scores > 0.5) == targets).float().mean().item()
        value = super().update(value, len(targets))
        return value
    
    def update_key_value(self, scores: torch.Tensor, targets: torch.Tensor) -> Dict[str, float]:
        value = self.update(scores, targets)
        return {"accuracy": value}

    def compute_key_value(self) -> Dict[str, float]:
        mean, std = super().compute()
        return {"accuracy": mean, "accuracy/std": std}

    
class CustomSupervisedRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return 5

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine()
    
    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "tensorboard": dl.TensorboardLogger(logdir="./logdir03/tb"),
        }
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        return loaders
    
    def get_model(self):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_criterion(self):
        return nn.BCEWithLogitsLoss()

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters(), lr=0.02)

    def get_scheduler(self, optimizer):
        return torch.optim.lr_scheduler.MultiStepLR(optimizer, [2, 4])
    
    def get_callbacks(self):
        return {
            "accuracy": dl.BatchMetricCallback(
                metric=CustomAccuracyMetric(), log_on_batch=True,
                input_key="scores", target_key="targets", 
            ),
            "auc": dl.LoaderMetricCallback(
                metric=metrics.AUCMetric(),
                input_key="scores", target_key="targets", 
            ), 
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                input_key="logits", 
                target_key="targets"
            ), 
            "backward": dl.BackwardCallback(metric_key="loss"), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            "checkpoint1": dl.CheckpointCallback(
                logdir="./logdir03/accuracy",
                loader_key="valid", metric_key="accuracy", 
                minimize=False, topk=3
            ),
            "checkpoint2": dl.CheckpointCallback(
                logdir="./logdir03/loss",
                loader_key="valid", metric_key="loss", 
                minimize=True, topk=1
            ),
    #         "verbose": dl.TqdmCallback(),
        }
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

runner = CustomSupervisedRunner().run()
model = runner.model

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

---

### Act 4 - ``CustomCallback``

Or some custom functionality – Callback.

In [None]:
# Let's plot the decision doundary after each epoch:
class VisualizationCallback(dl.Callback):
    def __init__(self):
        super().__init__(order=dl.CallbackOrder.External)

    def on_epoch_end(self, runner):
        img = visualize_decision_boundary(X_valid, y_valid, runner.model)


class CustomSupervisedRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return 5

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine("cpu")
    
    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "tensorboard": dl.TensorboardLogger(logdir="./logdir04/tb"),
        }
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        return loaders
    
    def get_model(self):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_criterion(self):
        return nn.BCEWithLogitsLoss()

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters(), lr=0.02)

    def get_scheduler(self, optimizer):
        return torch.optim.lr_scheduler.MultiStepLR(optimizer, [2, 4])
    
    def get_callbacks(self):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                input_key="logits", 
                target_key="targets"
            ), 
            "backward": dl.BackwardCallback(metric_key="loss"), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            "checkpoint": dl.CheckpointCallback(
                logdir="./logdir04/loss",
                loader_key="valid", metric_key="loss", 
                minimize=True, topk=1
            ),
            # And include it into callbacks:        
            "visualization": VisualizationCallback()
        }
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

runner = CustomSupervisedRunner().run()
model = runner.model

---

### Act 5 - ``CustomLogger``

But what if you also want custom logging? For your private pipeline storage, maybe?

In [None]:
import io
import cv2
import numpy as np
import matplotlib.pyplot as plt

def get_img_from_fig(fig, dpi=180):
    buf = io.BytesIO()
    fig.savefig(buf, format="png", dpi=dpi)
    buf.seek(0)
    
    img_arr = np.frombuffer(buf.getvalue(), dtype=np.uint8)
    buf.close()
    img = cv2.imdecode(img_arr, 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

In [None]:
# We need to add only a few lines to log the image to all runner's loggers
class VisualizationCallback(dl.Callback):
    def __init__(self):
        super().__init__(order=dl.CallbackOrder.External)

    def on_epoch_end(self, runner):
        image = visualize_decision_boundary(X_valid, y_valid, runner.model)
        image = get_img_from_fig(image)
        # runner will propagate it to all loggers
        runner.log_image(tag="decision_boundary", image=image, scope="epoch")


# Let's also add our own Logger to store image on the disk
class VisualizationLogger(dl.ILogger):
    def __init__(self, logdir: str):
        self.logdir = logdir
        os.makedirs(self.logdir, exist_ok=True)
        
    def log_image(
        self,
        tag: str,
        image: np.ndarray,
        runner,
        scope: str = None,
    ) -> None:
        if scope == "epoch":
            plt.imsave(
                os.path.join(self.logdir, f"{tag}_{runner.epoch_step}.png"),
                image,
            )


class CustomSupervisedRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return 5

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine()
    
    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "visualization": VisualizationLogger(logdir="./logdir05/visualization"),
            "tensorboard": dl.TensorboardLogger(logdir="./logdir05/tb"),
        }
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        return loaders
    
    def get_model(self):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_criterion(self):
        return nn.BCEWithLogitsLoss()

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters(), lr=0.02)

    def get_scheduler(self, optimizer):
        return torch.optim.lr_scheduler.MultiStepLR(optimizer, [2, 4])
    
    def get_callbacks(self):
        return {
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                input_key="logits", 
                target_key="targets"
            ), 
            "backward": dl.BackwardCallback(metric_key="loss"), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            "checkpoint": dl.CheckpointCallback(
                logdir="./logdir05/loss",
                loader_key="valid", metric_key="loss", 
                minimize=True, topk=1
            ),
            "visualization": VisualizationCallback()
        }
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

runner = CustomSupervisedRunner().run()
model = runner.model

In [None]:
! ls ./logdir05
! ls ./logdir05/loss
! ls ./logdir05/tb
! ls ./logdir05/visualization

---

### Act 6 - Confusion Matrix logging - IMetric+ICallback+ILogger

In the end, let's review a simple example with a bunch of metrics, loggers, and confusion matrix computation.

In [None]:
# useful in colab:
# %load_ext tensorboard
# tensorboard --logdir=./logdir06/

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from catalyst import dl, metrics, utils

# sample data
num_samples, num_features, num_classes = int(1e4), int(1e1), 6
num_epochs = 6

class CustomSupervisedRunner(dl.IRunner):
    @property
    def num_epochs(self) -> int:
        return num_epochs

    def get_engine(self) -> dl.IEngine:
        return dl.DeviceEngine("cpu")
    
    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.CSVLogger(logdir="./logdir06"),
            "tensorboard": dl.TensorboardLogger(logdir="./logdir06/tb"),
        }
    
    def get_loaders(self) -> "OrderedDict[str, DataLoader]":
        # sample data
        num_samples, num_features, num_classes = int(1e4), int(1e1), 6
        X = torch.rand(num_samples, num_features)
        y = (torch.rand(num_samples, ) * num_classes).to(torch.int64)

        # pytorch loaders
        dataset = TensorDataset(X, y)
        loader = DataLoader(dataset, batch_size=32, num_workers=1)
        loaders = {"train": loader, "valid": loader}
        return loaders
    
    def get_model(self):
        return torch.nn.Linear(num_features, num_classes)

    def get_criterion(self):
        return torch.nn.CrossEntropyLoss()

    def get_optimizer(self, model):
        return torch.optim.Adam(model.parameters())

    def get_scheduler(self, optimizer):
        return torch.optim.lr_scheduler.MultiStepLR(optimizer, [2])
    
    def get_callbacks(self):
        return {
            "accuracy": dl.BatchMetricCallback(
                metric=metrics.AccuracyMetric(num_classes=num_classes),
                input_key="probs", target_key="targets", 
            ),
            "auc": dl.LoaderMetricCallback(
                metric=metrics.AUCMetric(),
                input_key="scores", target_key="targets", 
            ), 
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                input_key="logits", 
                target_key="targets",
            ), 
            "backward": dl.BackwardCallback(metric_key="loss"), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            "checkpoint1": dl.CheckpointCallback(
                logdir="./logdir06/loss",
                loader_key="valid", metric_key="loss", 
                minimize=False, topk=3
            ),
            "checkpoint2": dl.CheckpointCallback(
                logdir="./logdir06/auc",
                loader_key="valid", metric_key="auc", 
                minimize=True, topk=1
            ),
            "checkpoint3": dl.CheckpointCallback(
                logdir="./logdir06/accuracy",
                loader_key="valid", metric_key="accuracy01", 
                minimize=True, topk=1,
            ),
            "verbose": dl.TqdmCallback(),
            "confusion_matrix": dl.ConfusionMatrixCallback(
                input_key="probs", 
                target_key="targets",
                prefix="confusion_matrix",
                num_classes=num_classes,
            )
        }
    
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat,
            "scores": torch.sigmoid(y_hat),
            "probs": torch.softmax(y_hat, dim=1),
        }

runner = CustomSupervisedRunner().run()
model = runner.model

---

Congrats! You have finished the Catalyst customization tutorial and are now ready to create your own different pipelines and R&D breakthroughs. See you at the next stage!