In [None]:
# ! pip install git+https://github.com/catalyst-team/catalyst@dev --upgrade

In [None]:
# ! pip install scikit-learn>=0.20 optuna

###  Kittylyst demo

In [None]:
import random
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons, make_blobs
%matplotlib inline

In [None]:
from typing import *

import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset

from catalyst import dl, metrics

In [None]:
# make up a dataset
def make_dataset(seed=42, n_samples=100):
    np.random.seed(seed)
    random.seed(seed)
    X, y = make_moons(n_samples=100, noise=0.1)

    y = y*2 - 1 # make y be -1 or 1
    return X, y

def visualize_dataset(X, y):
    plt.figure(figsize=(5,5))
    plt.scatter(X[:,0], X[:,1], c=y, s=20, cmap='jet')

# let's create train data
X_train, y_train = make_dataset()
visualize_dataset(X_train, y_train)

In [None]:
# valid data
X_valid, y_valid = make_dataset(seed=137)
visualize_dataset(X_valid, y_valid)

In [None]:
# and another train one (why not?)
X_train2, y_train2 = make_dataset(seed=1337)
visualize_dataset(X_train2, y_train2)

In [None]:
# initialize a model 
# 2-layer neural network
model = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(), 
    nn.Linear(16, 16), nn.ReLU(), 
    nn.Linear(16, 1)
)
print(model)
# print("number of parameters", len(model.parameters()))

In [None]:
def visualize_decision_boundary(X, y, model):
    h = 0.25
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                         np.arange(y_min, y_max, h))
    Xmesh = np.c_[xx.ravel(), yy.ravel()]
    
    inputs = torch.tensor([list(xrow) for xrow in Xmesh]).float()
    scores = model(inputs)
    
    Z = np.array([s.data > 0 for s in scores])
    Z = Z.reshape(xx.shape)

    fig = plt.figure()
    plt.contourf(xx, yy, Z, cmap=plt.cm.Spectral, alpha=0.8)
    plt.scatter(X[:, 0], X[:, 1], c=y, s=40, cmap=plt.cm.Spectral)
    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())
    plt.show()
    return fig

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

In [None]:
t1 = TensorDataset(torch.tensor(X_train).float(), torch.tensor(y_train > 0).float())
t2 = TensorDataset(torch.tensor(X_train2).float(), torch.tensor(y_train2 > 0).float())
v1 = TensorDataset(torch.tensor(X_valid).float(), torch.tensor(y_valid > 0).float())

loaders = {
    "train_1": DataLoader(t1, batch_size=32, num_workers=1), 
    "train_2": DataLoader(t2, batch_size=32, num_workers=1), 
    "valid": DataLoader(v1, batch_size=32, num_workers=1), 
}

---

### Act 1 - ``CustomRunner`` solution

Suppose you have your favorite `for-loop` pipeline and want to get rid of `for-for-for` stuff to make code more clean. In this case you can define your own ``CustomRunner`` with ``_handle_batch`` method and just run it.

Whole ``Runner`` source code is located [here](github.com/Scitator/kittylyst/blob/master/kittylyst/runner.py) (~100 lines of code).

In [None]:
# 2-layer neural network
model = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(), 
    nn.Linear(16, 16), nn.ReLU(), 
    nn.Linear(16, 1)
)
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
experiment = dl.SingleStageExperiment(
    model=model, 
    optimizer=optimizer, 
    loaders=loaders, 
    num_epochs=10
)

class CustomRunner(dl.IStageBasedRunner):
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)

        loss = F.binary_cross_entropy_with_logits(y_hat.view(-1), y)
        self.batch_metrics = {"loss": loss}
        print(
            f"{self.loader_key} ({self.loader_batch_step}/{self.loader_batch_len}:" 
            f"loss {loss.item()}"
        )

        if self.is_train_loader:
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()

CustomRunner().run(experiment)

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

---

### Act 2 - ``SupervisedRunner`` solution

Let's make a bit more abstract. Let's introduce ``SupervisedRunner``, that knows how to execute Supervised models and ``CriterionCallback/OptimizerCallback/SchedulerCallback`` for typical criterions/optimizers/schedulers steps. 

Additionaly let's wrap accuracy with ``AccuracyCallback``, as far as metrics are general too. 

Finally, let's make our logs looks consistent with ``LoggerCallback``.

Whole ``Callback`` source code is located [here](github.com/Scitator/kittylyst/blob/master/kittylyst/callback.py) (~100 lines of code).

In [None]:
num_epochs=6
model = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(), 
    nn.Linear(16, 16), nn.ReLU(), 
    nn.Linear(16, 1)
)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

experiment = dl.SingleStageExperiment(
    model=model, 
    criterion=criterion, 
    optimizer=optimizer, 
    scheduler=scheduler,
    loaders=loaders, 
    num_epochs=num_epochs,
    callbacks={
        "criterion": dl.CriterionCallback(
            metric_key="loss", 
            outputs_key="logits", 
            targets_key="targets"
        ), 
#         "accuracy": dl.MetricCallback(
#             metric=metrics.AccuracyMetric(), compute_on_batch=True,
#             outputs_key="logits", targets_key="targets", 
#         ),
        "auc": dl.MetricCallback(
            metric=metrics.AUCMetric(), compute_on_batch=False,
            outputs_key="scores", targets_key="targets", 
        ), 
        "optimizer": dl.OptimizerCallback(metric_key="loss"), 
        "scheduler": dl.SchedulerCallback(
            loader_key="valid", metric_key="loss"
        ),
        "checkpoint1": dl.CheckpointCallback(
            loader_key="valid", metric_key="auc", 
            minimize=False, save_n_best=3
        ),
        "checkpoint2": dl.CheckpointCallback(
            loader_key="valid", metric_key="loss", 
            minimize=True, save_n_best=1
        ),
        "verbose": dl.VerboseCallback(),
    },
    loggers={
        "console": dl.ConsoleLogger(),
        "csv": dl.LogdirLogger(logdir="./logdir"),
    }
)

class CustomRunner(dl.IStageBasedRunner):
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

CustomRunner().run(experiment)

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

---

### Act 3 - ``CustomExperiment`` solution - multi-stage experiment with ``VisualizationCallback``

Let's go even harder. Suppose we want to firstly train our model on `train_1` data, and only after that - on `train_2`. This case could be easily handled with ``CustomExperiment``, where you could redefine your experiment components for each stage of your experiment.

Whole ``Experiemnt`` source code is located [here](github.com/Scitator/kittylyst/blob/master/kittylyst/experiment.py) (~100 lines of code).

In [None]:
import io
import cv2
import numpy as np
import matplotlib.pyplot as plt

def get_img_from_fig(fig, dpi=180):
    buf = io.BytesIO()
    fig.savefig(buf, format="png", dpi=dpi)
    buf.seek(0)
    
    img_arr = np.frombuffer(buf.getvalue(), dtype=np.uint8)
    buf.close()
    img = cv2.imdecode(img_arr, 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

In [None]:
class CustomExperiment(dl.SingleStageExperiment):
    @property
    def stages(self):
        return self._stage

    def get_loaders(self, stage: str):
        return self._loaders[stage]

    
# class VisualizationCallback(ICallback):
#     def on_epoch_end(self, runner):
#         img = visualize_decision_boundary(X_valid, y_valid, runner.model)
#         img = get_img_from_fig(img)
#         runner.log_image(img, scope="epoch")

        
loaders = {
    "stage_1": {
        "train_1": DataLoader(t1, batch_size=32, num_workers=1), 
        "valid": DataLoader(v1, batch_size=32, num_workers=1), 
    },
    "stage_2": {
        "train_2": DataLoader(t2, batch_size=32, num_workers=1), 
        "valid": DataLoader(v1, batch_size=32, num_workers=1), 
    },
}
stages = loaders.keys()
num_epochs=10

model = nn.Sequential(
    nn.Linear(2, 16), nn.ReLU(), 
    nn.Linear(16, 16), nn.ReLU(), 
    nn.Linear(16, 1)
)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

experiment = CustomExperiment(
    model=model, 
    criterion=criterion, 
    optimizer=optimizer, 
    scheduler=scheduler,
    loaders=loaders, 
    num_epochs=num_epochs,
    callbacks={
        "criterion": dl.CriterionCallback(
            metric_key="loss", 
            outputs_key="logits", 
            targets_key="targets"
        ), 
#         "accuracy": dl.MetricCallback(
#             metric=metrics.AccuracyMetric(), compute_on_batch=True,
#             outputs_key="logits", targets_key="targets", 
#         ),
        "auc": dl.MetricCallback(
            metric=metrics.AUCMetric(), compute_on_batch=False,
            outputs_key="scores", targets_key="targets", 
        ), 
        "optimizer": dl.OptimizerCallback(metric_key="loss"), 
        "scheduler": dl.SchedulerCallback(
            loader_key="valid", metric_key="loss"
        ),
        "checkpoint1": dl.CheckpointCallback(
            loader_key="valid", metric_key="auc", 
            minimize=False, save_n_best=3
        ),
        "checkpoint2": dl.CheckpointCallback(
            loader_key="valid", metric_key="loss", 
            minimize=True, save_n_best=1
        ),
#         "visualization": VisualizationCallback(),
#         "verbose": VerboseCallback(),
        
    },
    loggers={
#         "console": ConsoleLogger(exclude=["batch", "global_batch", "loader", "global_epoch"]),
        "console": dl.ConsoleLogger(),
        "csv": dl.LogdirLogger(logdir="./logdir2"),
    },
    stage=stages  # <-- here is the trick for multi-stage support
)

class CustomRunner(dl.IStageBasedRunner):
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

CustomRunner().run(experiment)

In [None]:
_ = visualize_decision_boundary(X_valid, y_valid, model)

### Act 4 - integration with hyperparameter search

In [None]:
from datetime import datetime
import optuna    

class CustomRunner(dl.IStageBasedRunner):
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }

def objective(trial):
    num_epochs = 6
    num_hidden1 = int(trial.suggest_loguniform("num_hidden1", 2, 16))
    num_hidden2 = int(trial.suggest_loguniform("num_hidden2", 2, 16))
    
    loaders = {
        "train_1": DataLoader(t1, batch_size=32, num_workers=1), 
        "train_2": DataLoader(t2, batch_size=32, num_workers=1), 
        "valid": DataLoader(v1, batch_size=32, num_workers=1), 
    }

    model = nn.Sequential(
        nn.Linear(2, num_hidden1), nn.ReLU(), 
        nn.Linear(num_hidden1, num_hidden2), nn.ReLU(), 
        nn.Linear(num_hidden2, 1)
    )
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.02)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])

    experiment = dl.SingleStageExperiment(
        model=model, 
        criterion=criterion, 
        optimizer=optimizer, 
        scheduler=scheduler,
        loaders=loaders, 
        num_epochs=num_epochs,
        callbacks={
            "criterion": dl.CriterionCallback(
                metric_key="loss", 
                outputs_key="logits", 
                targets_key="targets"
            ), 
    #         "accuracy": dl.MetricCallback(
    #             metric=metrics.AccuracyMetric(), compute_on_batch=True,
    #             outputs_key="logits", targets_key="targets", 
    #         ),
            "auc": dl.MetricCallback(
                metric=metrics.AUCMetric(), compute_on_batch=False,
                outputs_key="scores", targets_key="targets", 
            ), 
            "optimizer": dl.OptimizerCallback(metric_key="loss"), 
            "scheduler": dl.SchedulerCallback(
                loader_key="valid", metric_key="loss"
            ),
            "checkpoint": dl.CheckpointCallback(
                loader_key="valid", metric_key="auc", 
                minimize=False, save_n_best=3
            ),
            "optuna": dl.OptunaPruningCallback(loader_key="valid", metric_key="auc")
        },
        loggers={
            "console": dl.ConsoleLogger(),
            "csv": dl.LogdirLogger(logdir=f"./logdir/{datetime.now().strftime('%Y%m%d-%H%M%S')}"),
        },
        trial=trial,
#         hparams={"l2_alpha": l2_alpha, "num_hidden1": num_hidden1, "num_hidden2": num_hidden2},
    )

    runner = CustomRunner()
    runner.run(experiment)
    score = runner.callbacks["checkpoint"].top_best_metrics[0][0]
    
    return score

study = optuna.create_study(
    direction="maximize",
#     direction="minimize",
    pruner=optuna.pruners.MedianPruner(
        n_startup_trials=0, n_warmup_steps=0, interval_steps=1
    ),
)
study.optimize(objective, n_trials=5, timeout=300)
print(study.best_value, study.best_params)

### Act 5 - fully custom experiment

In [None]:
class CustomExperiment(dl.IExperiment):
    @property
    def seed(self) -> int:
        return 73

    @property
    def name(self) -> str:
        # @TODO: auto-generate name?
        return "experiment73"

    @property
    def hparams(self) -> Dict:
        return {}

    @property
    def stages(self) -> List[str]:
        return ["stage_1", "stage_2"]

    def get_stage_params(self, stage: str) -> Dict[str, Any]:
        if stage == "stage_1":
            return {
                "num_epochs": 10,
                "migrate_model_from_previous_stage": False,
                "migrate_callbacks_from_previous_stage": False,
            }
        elif stage == "stage_2":
            return {
                "num_epochs": 6,
                "migrate_model_from_previous_stage": True,
                "migrate_callbacks_from_previous_stage": False,
            }
        else:
            raise NotImplemented()

    def get_loaders(self, stage: str) -> Dict[str, Any]:
        if stage == "stage_1":
            return {
                "train_1": DataLoader(t1, batch_size=32, num_workers=1), 
                "valid": DataLoader(v1, batch_size=32, num_workers=1), 
            }
        elif stage == "stage_2":
            return {
                "train_2": DataLoader(t2, batch_size=32, num_workers=1), 
                "valid": DataLoader(v1, batch_size=32, num_workers=1), 
            }
        else:
            raise NotImplemented()

    def get_model(self, stage: str):
        return nn.Sequential(
            nn.Linear(2, 16), nn.ReLU(), 
            nn.Linear(16, 16), nn.ReLU(), 
            nn.Linear(16, 1)
        )

    def get_criterion(self, stage: str):
        return nn.BCEWithLogitsLoss()

    def get_optimizer(self, stage: str, model):
        if stage == "stage_1":
            return torch.optim.Adam(model.parameters(), lr=0.02)
        elif stage == "stage_2":
            return torch.optim.SGD(model.parameters(), lr=0.01)
        else:
            raise NotImplemented()

    def get_scheduler(self, stage: str, optimizer):
        if stage == "stage_1":
            return torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 8])
        elif stage == "stage_2":
            return torch.optim.lr_scheduler.MultiStepLR(optimizer, [3, 6])
        else:
            raise NotImplemented()
        

    def get_callbacks(self, stage: str) -> Dict[str, dl.Callback]:
        if stage == "stage_1":
            
            return {
                "criterion": dl.CriterionCallback(
                    metric_key="loss", 
                    outputs_key="logits", 
                    targets_key="targets"
                ), 
#                 "accuracy": dl.MetricCallback(
#                     metric=metrics.AccuracyMetric(), compute_on_batch=True,
#                     outputs_key="logits", targets_key="targets", 
#                 ),
#                 "auc": dl.MetricCallback(
#                     metric=metrics.AUCMetric(), compute_on_batch=False,
#                     outputs_key="scores", targets_key="targets", 
#                 ), 
                "optimizer": dl.OptimizerCallback(metric_key="loss"), 
                "scheduler": dl.SchedulerCallback(
                    loader_key="valid", metric_key="loss"
                ),
                "checkpoint": dl.CheckpointCallback(
                    loader_key="valid", metric_key="loss", 
                    minimize=True, save_n_best=3
                ),
            }
        elif stage == "stage_2":
            return {
                "criterion": dl.CriterionCallback(
                    metric_key="loss", 
                    outputs_key="logits", 
                    targets_key="targets"
                ), 
                "auc": dl.MetricCallback(
                    metric=metrics.AUCMetric(), compute_on_batch=False,
                    outputs_key="scores", targets_key="targets", 
                ), 
                "optimizer": dl.OptimizerCallback(metric_key="loss"), 
                "scheduler": dl.SchedulerCallback(
                    loader_key="valid", metric_key="loss"
                ),
                "checkpoint_auc": dl.CheckpointCallback(
                    loader_key="valid", metric_key="auc", 
                    minimize=False, save_n_best=3
                ),
            }
        else:
            raise NotImplemented()
        

    def get_engine(self):
        return dl.Engine()

    def get_trial(self):
        return None

    def get_loggers(self):
        return {
            "console": dl.ConsoleLogger(),
            "csv": dl.LogdirLogger(logdir="./logdir5"),
        }

    
class CustomRunner(dl.IStageBasedRunner):
    def handle_batch(self, batch):
        x, y = batch
        y_hat = self.model(x)
        
        self.batch = {
            "features": x,
            "targets": y,
            "logits": y_hat.view(-1),
            "scores": torch.sigmoid(y_hat.view(-1)),
        }
    
experiment = CustomExperiment()
CustomRunner().run(experiment)

---

🎉 You have passed ``Kittylyst`` tutorial! This is just a minimal educational demo, but I hope you found it interesting for your deep learning research code organisation.

For more advanced and production-ready solution please follow our [Catalyst](https://github.com/catalyst-team/catalyst) repository.

PS. If you are interested in deep learning you could also try out our [dl-course](https://github.com/catalyst-team/dl-course).