In [42]:
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from sklearn.metrics import accuracy_score
import time
from lets_plot import *
import polars as pl
import numpy as np

# Configure lets-plot to render its plots as HTML output inside the notebook.
LetsPlot.setup_html()


# Data


In [4]:
def get_mnist_dataset(
    train_transformers=transforms.ToTensor(), test_transformers=transforms.ToTensor()
) -> tuple[Dataset, Dataset]:
    """
    Build the MNIST train and test datasets, downloading them into ./data if needed.

    train_transformers: transform applied to every training image
    test_transformers: transform applied to every test image

    Returns the pair (train dataset, test dataset).
    """
    # Options shared by both splits.
    shared_options = {"root": "./data", "download": True}
    train_split = datasets.MNIST(
        train=True, transform=train_transformers, **shared_options
    )
    test_split = datasets.MNIST(
        train=False, transform=test_transformers, **shared_options
    )
    return train_split, test_split


In [8]:
def get_mnist_dataloaders(
    batch_size: int, train_dataset: Dataset, test_dataset: Dataset
) -> tuple[DataLoader, DataLoader]:
    """
    Wrap the two datasets in DataLoaders that share the same batch size.

    batch_size: number of samples per batch for both loaders
    train_dataset: dataset served in shuffled order
    test_dataset: dataset served in its original order

    Returns the pair (train dataloader, test dataloader).
    """
    return (
        DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size),
        DataLoader(dataset=test_dataset, shuffle=False, batch_size=batch_size),
    )


In [9]:
# Baseline datasets: both splits use the default ToTensor() transform (no augmentation).
mnist_train_dataset, mnist_test_dataset = get_mnist_dataset()


In [11]:
# Baseline loaders with 64 samples per batch; the training loader shuffles, the test loader does not.
mnist_train_dataloder, mnist_test_dataloder = get_mnist_dataloaders(
    64, mnist_train_dataset, mnist_test_dataset
)

In [None]:
def plot_dataset_image(idx: int, dataset: Dataset):
    """
    Display one sample of ``dataset`` as a grayscale image titled with its label.

    idx: position of the sample inside the dataset
    dataset: dataset whose items are (image, label) pairs; the image must hold
        exactly 28 * 28 values because it is reshaped to 28x28 before plotting
    """
    pixels, target = dataset[idx]
    plt.imshow(pixels.reshape(28, 28), cmap="gray")
    plt.title(target)
    plt.show()


# Sanity check: show the training sample at index 4 together with its label.
plot_dataset_image(4, mnist_train_dataset)

# Image preprocessing with kernels


In [65]:
def _kernel_options(image: torch.Tensor) -> dict:
    """Return the dtype/device keyword arguments a kernel needs to be convolved with ``image``."""
    # Follow the image's dtype only when it is floating point: an integer kernel
    # would truncate the fractional blur weights.
    dtype = image.dtype if image.is_floating_point() else torch.float
    return {"dtype": dtype, "device": image.device}


def _convolve_single_channel(image: torch.Tensor, kernel: torch.Tensor) -> torch.Tensor:
    """Apply a 2-D ``kernel`` to a single-channel ``image`` with an unpadded convolution."""
    # conv2d expects a batch dimension on the input and (out_channels, in_channels, kH, kW)
    # on the weight, hence the extra leading dimensions.
    return F.conv2d(image.unsqueeze(0), kernel.unsqueeze(0).unsqueeze(0))


def blurring_image(image: torch.Tensor, kernel_size: int = 3) -> torch.Tensor:
    """
    Blur ``image`` with a ``kernel_size`` x ``kernel_size`` mean filter.

    image: single-channel image of shape (1, H, W)
    kernel_size: side length of the square averaging kernel

    Returns a tensor of shape (1, 1, H - kernel_size + 1, W - kernel_size + 1);
    the output shrinks because no padding is applied.
    """
    kernel = (
        torch.ones(kernel_size, kernel_size, **_kernel_options(image)) / kernel_size**2
    )
    return _convolve_single_channel(image, kernel)


def edging_image(image: torch.Tensor) -> torch.Tensor:
    """
    Highlight edges of ``image`` with a 3x3 kernel (8 at the centre, -1 around it).

    image: single-channel image of shape (1, H, W)

    Returns a tensor of shape (1, 1, H - 2, W - 2).
    """
    kernel = torch.tensor(
        [[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]], **_kernel_options(image)
    )
    return _convolve_single_channel(image, kernel)


def finding_horizontal_lines(image: torch.Tensor) -> torch.Tensor:
    """
    Respond to horizontal intensity transitions (top row minus bottom row of each 3x3 window).

    image: single-channel image of shape (1, H, W)

    Returns a tensor of shape (1, 1, H - 2, W - 2).
    """
    kernel = torch.tensor(
        [[1, 1, 1], [0, 0, 0], [-1, -1, -1]], **_kernel_options(image)
    )
    return _convolve_single_channel(image, kernel)


def finding_vertical_lines(image: torch.Tensor) -> torch.Tensor:
    """
    Respond to vertical intensity transitions (left column minus right column of each 3x3 window).

    image: single-channel image of shape (1, H, W)

    Returns a tensor of shape (1, 1, H - 2, W - 2).
    """
    kernel = torch.tensor(
        [[1, 0, -1], [1, 0, -1], [1, 0, -1]], **_kernel_options(image)
    )
    return _convolve_single_channel(image, kernel)


In [None]:
# Show training sample 90, followed by its blurred version.
image, _ = mnist_train_dataset[90]

new_image = blurring_image(image)

plt.imshow(image.reshape(28, 28), cmap="gray")
plt.show()
# The 3x3 kernel is applied without padding, so the 28x28 image shrinks to 26x26.
plt.imshow(new_image.reshape(26, 26), cmap="gray")
plt.show()


In [None]:
# Show training sample 90, followed by the response of the edge-detection kernel.
image, _ = mnist_train_dataset[90]

new_image = edging_image(image)

plt.imshow(image.reshape(28, 28), cmap="gray")
plt.show()
# vmin=0 / vmax=1 pin the colour range to [0, 1]: responses outside it are clipped,
# so negative values are drawn as black.
plt.imshow(new_image.reshape(26, 26), vmin=0, vmax=1, cmap="gray")
plt.show()

In [None]:
# Show training sample 90, followed by the response of the horizontal-line kernel.
image, _ = mnist_train_dataset[90]

new_image = finding_horizontal_lines(image)

plt.imshow(image.reshape(28, 28), cmap="gray")
plt.show()
# vmin=0 / vmax=1 pin the colour range to [0, 1]: responses outside it are clipped,
# so negative values are drawn as black.
plt.imshow(new_image.reshape(26, 26), vmin=0, vmax=1, cmap="gray")
plt.show()


In [None]:
# Show training sample 90, followed by the response of the vertical-line kernel.
image, _ = mnist_train_dataset[90]

new_image = finding_vertical_lines(image)

plt.imshow(image.reshape(28, 28), cmap="gray")
plt.show()
# vmin=0 / vmax=1 pin the colour range to [0, 1]: responses outside it are clipped,
# so negative values are drawn as black.
plt.imshow(new_image.reshape(26, 26), vmin=0, vmax=1, cmap="gray")
plt.show()

# Training a CNN


In [15]:
# Hyper-parameters shared by the model definitions in this notebook.
B = 64  # batch size
D = 28 * 28  # image dimensionality
C = 1  # number of channels
classes = 10  # number of output classes (one logit per class)
filters = 16  # number of output channels of the first convolution
kernel_size = 3  # side length of the square convolution kernel

# Baseline: fully connected network with a single hidden layer of 256 units.
fc_model = nn.Sequential(
    nn.Flatten(),  # (B, C, W, H) -> (B, C * W * H) = (B, D)
    nn.Linear(D, 256),
    nn.Tanh(),
    nn.Linear(256, classes),
)

# One convolutional layer followed by a linear classifier.
model_conv = nn.Sequential(
    # padding=kernel_size // 2 keeps the spatial size for an odd kernel_size:
    # (B, C, W, H) -> (B, filters, W, H)
    nn.Conv2d(C, filters, kernel_size, padding=kernel_size // 2),
    nn.Tanh(),
    nn.Flatten(),  # (B, filters, W, H) -> (B, filters * D)
    nn.Linear(filters * D, classes),
)


## Boilerplate

Este é apenas um código boilerplate para o treinamento de uma rede neural. Ele foi retirado do livro *Inside Deep Learning* e é uma mão na roda, pois evita que precisemos reescrever o loop de treino a cada experimento.

Além do loop de treino, ele traz algumas funcionalidades, como salvar o modelo, registrar as métricas e a loss e mover os tensores para o device correto, entre outras.


In [16]:
def moveTo(obj, device):
    """
    Recursively move an object, or everything stored inside it, to a compute device.

    obj: anything exposing a ``.to`` method, or a list / tuple / set / dict
        (possibly nested) of such objects; any other value is returned untouched
    device: the compute device passed to each ``.to`` call

    Returns the moved object; containers are rebuilt as a plain list, tuple,
    set or dict holding the moved items.
    """
    # Anything with a ``.to`` method knows how to move itself.
    if hasattr(obj, "to"):
        return obj.to(device)
    if isinstance(obj, list):
        return [moveTo(item, device) for item in obj]
    if isinstance(obj, tuple):
        return tuple(moveTo(item, device) for item in obj)
    if isinstance(obj, set):
        return {moveTo(item, device) for item in obj}
    if isinstance(obj, dict):
        # Keys are moved as well as values.
        return {moveTo(k, device): moveTo(v, device) for k, v in obj.items()}
    return obj


def run_epoch(
    model,
    optimizer,
    data_loader,
    loss_func,
    device,
    results,
    score_funcs,
    prefix="",
    desc=None,
):
    """
    Run one pass over ``data_loader`` and append the loss and scores to ``results``.

    model: the network to run; weights are updated only when ``model.training`` is True
    optimizer: optimizer stepped after every batch while in training mode
    data_loader: iterable yielding ``(inputs, labels)`` batches
    loss_func: callable ``loss_func(y_hat, labels)`` returning a scalar loss tensor
    device: the compute device every batch is moved to
    results: dict of lists; the keys ``prefix + " loss"`` and ``prefix + " " + name``
        (one per score function) must already exist
    score_funcs: dict mapping a score name to ``score_func(y_true, y_pred)``;
        ``None`` is treated as an empty dict
    prefix: string prepended to the keys written into ``results``
    desc: label shown on the progress bar

    Returns the time, in seconds, spent iterating over the data.
    """
    import warnings  # local import: only used to report failing score functions

    if score_funcs is None:
        score_funcs = {}

    running_loss = []
    y_true = []
    y_pred = []
    start = time.time()
    for inputs, labels in tqdm(data_loader, desc=desc, leave=False):
        # Move the batch to the device we are using.
        inputs = moveTo(inputs, device)
        labels = moveTo(labels, device)

        y_hat = model(inputs)  # this just computed f_Θ(x(i))
        # Compute loss.
        loss = loss_func(y_hat, labels)

        if model.training:
            loss.backward()
            optimizer.step()
            # Clear the gradients so they do not accumulate into the next batch.
            optimizer.zero_grad()

        # Now we are just grabbing some information we would like to have
        running_loss.append(loss.item())

        if len(score_funcs) > 0 and isinstance(labels, torch.Tensor):
            # moving labels & predictions back to CPU for computing / storing predictions
            labels = labels.detach().cpu().numpy()
            y_hat = y_hat.detach().cpu().numpy()
            # add to predictions so far
            y_true.extend(labels.tolist())
            y_pred.extend(y_hat.tolist())
    # end training epoch
    end = time.time()

    y_pred = np.asarray(y_pred)
    if (
        len(y_pred.shape) == 2 and y_pred.shape[1] > 1
    ):  # We have a classification problem, convert to labels
        y_pred = np.argmax(y_pred, axis=1)
    # Else, we assume we are working on a regression problem

    results[prefix + " loss"].append(np.mean(running_loss))
    for name, score_func in score_funcs.items():
        try:
            results[prefix + " " + name].append(score_func(y_true, y_pred))
        except Exception as error:
            # Scoring is best effort: keep the run alive and record NaN, but say why.
            # Catching Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit still stop the run.
            warnings.warn(f"score function {name!r} failed: {error!r}")
            results[prefix + " " + name].append(float("NaN"))
    return end - start  # time spent on epoch


def train_simple_network(
    model,
    loss_func,
    train_loader,
    test_loader=None,
    score_funcs=None,
    epochs=50,
    device="cpu",
    checkpoint_file=None,
    lr=0.001,
):
    """
    Train ``model`` with plain SGD and collect per-epoch losses and scores.

    model: the network to train; it is moved to ``device``
    loss_func: callable ``loss_func(y_hat, labels)`` returning a scalar loss tensor
    train_loader: iterable of ``(inputs, labels)`` batches used for training
    test_loader: optional iterable of ``(inputs, labels)`` batches evaluated after every epoch
    score_funcs: optional dict mapping a score name to ``score_func(y_true, y_pred)``
    epochs: number of passes over ``train_loader``
    device: the compute device used for the model and the batches
    checkpoint_file: optional destination where the final model state, optimizer
        state and results are saved once training has finished
    lr: learning rate of the SGD optimizer

    Returns a polars DataFrame with one row per epoch and the columns "epoch",
    "total time", "train loss" plus, when applicable, "test loss" and
    "train <score>" / "test <score>" for every score function.
    """
    import os  # local import: only needed to prepare the checkpoint directory

    # Allow the documented default (no score functions) instead of crashing on None.
    if score_funcs is None:
        score_funcs = {}

    to_track = ["epoch", "total time", "train loss"]

    if test_loader is not None:
        to_track.append("test loss")
    for eval_score in score_funcs:
        to_track.append("train " + eval_score)
        if test_loader is not None:
            to_track.append("test " + eval_score)

    total_train_time = 0  # How long have we spent in the training loop?
    # Initialize every item with an empty list
    results = {item: [] for item in to_track}

    # SGD is Stochastic Gradient Descent; honour the learning rate given by the caller.
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    # Place the model on the correct compute resource (CPU or GPU)
    model.to(device)

    epoch = None  # stays None when epochs == 0, so the checkpoint can still be written
    for epoch in tqdm(range(epochs), desc="Epoch"):
        model = model.train()  # Put our model in training mode

        total_train_time += run_epoch(
            model,
            optimizer,
            train_loader,
            loss_func,
            device,
            results,
            score_funcs,
            prefix="train",
            desc="Training",
        )

        results["total time"].append(total_train_time)
        results["epoch"].append(epoch)

        if test_loader is not None:
            model = model.eval()
            # No gradients are needed for evaluation.
            with torch.no_grad():
                run_epoch(
                    model,
                    optimizer,
                    test_loader,
                    loss_func,
                    device,
                    results,
                    score_funcs,
                    prefix="test",
                    desc="Testing",
                )

    if checkpoint_file is not None:
        # torch.save does not create missing directories, so create them for path-like targets.
        if isinstance(checkpoint_file, (str, os.PathLike)):
            checkpoint_dir = os.path.dirname(os.fspath(checkpoint_file))
            if checkpoint_dir:
                os.makedirs(checkpoint_dir, exist_ok=True)
        torch.save(
            {
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "results": results,
            },
            checkpoint_file,
        )

    return pl.from_dict(results)

In [None]:
# Fall back to the CPU so the cell also runs on machines without a CUDA device.
device = "cuda" if torch.cuda.is_available() else "cpu"

loss_func = nn.CrossEntropyLoss()

# Train the single-convolution model for two epochs.
cnn_results = train_simple_network(
    model_conv,
    loss_func,
    mnist_train_dataloder,
    mnist_test_dataloder,
    score_funcs={"accuracy": accuracy_score},
    device=device,
    epochs=2,
    checkpoint_file="./model/cnn_checkpoint.pth",
)

# Train the fully connected baseline with the same settings. It gets its own
# checkpoint file so that it does not overwrite the CNN checkpoint saved above.
fc_results = train_simple_network(
    fc_model,
    loss_func,
    mnist_train_dataloder,
    mnist_test_dataloder,
    score_funcs={"accuracy": accuracy_score},
    device=device,
    epochs=2,
    checkpoint_file="./model/fc_checkpoint.pth",
)


## Visualizing results


In [None]:
# Stack both result tables, tagging every row with the network that produced it,
# so that a single plot can draw one line per network type.
results = pl.concat(
    [
        fc_results.with_columns(pl.lit("Fully Connected").alias("nn_type")),
        cnn_results.with_columns(pl.lit("CNN").alias("nn_type")),
    ]
)

# Test accuracy per epoch, one coloured line per network type.
(
    ggplot(results, aes(x="epoch", y="test accuracy", group="nn_type", color="nn_type"))
    + geom_line()
)


## Verificando movimentação do objeto


In [25]:
# Build two shifted copies of one training image to probe translation sensitivity.
img_idx = 10
img, correct_class = mnist_train_dataset[img_idx]
img = img.reshape(28, 28)
# np.roll shifts with wrap-around: pixels pushed out on one side re-enter on the other.
# "lr" (lower right): 4 pixels to the right (axis=1) and 1 pixel down (axis=0).
img_lr = np.roll(np.roll(img, 4, axis=1), 1, axis=0)
# "ul" (upper left): 4 pixels to the left and 1 pixel up.
img_ul = np.roll(np.roll(img, -4, axis=1), -1, axis=0)


In [None]:
# Side by side: the original image, the lower-right shift and the upper-left shift.
f, axarr = plt.subplots(1, 3)
axarr[0].imshow(img, cmap="gray")
axarr[1].imshow(img_lr, cmap="gray")
axarr[2].imshow(img_ul, cmap="gray")
plt.show()

## Avaliando o modelo treinado


In [28]:
# nn.Module.cpu() and .eval() modify the module in place and return it, so `model`
# is the same object as `model_conv`, now on the CPU and in evaluation mode.
model = model_conv.cpu().eval()  # passing to cpu to be simpler


def pred(model, img):
    """
    Return the class probabilities that ``model`` assigns to a single 2-D image.

    model: network mapping a (1, channels, w, h) batch to a (1, n_classes) tensor of logits
    img: 2-D image given as a torch.Tensor or as anything ``torch.tensor`` accepts
        (for example a NumPy array)

    Returns a flat NumPy array holding one probability per class.
    """
    rows, cols = img.shape  # unpacking into two names requires a 2-D input
    tensor = img if isinstance(img, torch.Tensor) else torch.tensor(img)

    with torch.no_grad():
        # Add the batch and channel dimensions the model expects.
        batch = tensor.reshape(1, -1, rows, cols)
        # Softmax over the class dimension turns the logits into probabilities.
        probabilities = F.softmax(model(batch), dim=1)

    return probabilities.numpy().flatten()


In [None]:
# Class probabilities for the original image and its two shifted copies.
img_pred = pred(model, img)
img_lr_pred = pred(model, img_lr)
img_ul_pred = pred(model, img_ul)

# img_lr is shifted right and down, so it is reported as the "lower-right" image
# (the previous "left-right" label did not describe the shift).
for description, probabilities in (
    ("original", img_pred),
    ("lower-right", img_lr_pred),
    ("up-left", img_ul_pred),
):
    top_class = np.argmax(probabilities)  # index of the most probable class
    print(
        f"Predicted class for {description} image: {top_class} / Probability: {probabilities[top_class]}"
    )


## Training with pooling


In [None]:
# Two stages of three 3x3 convolutions, each stage followed by 2x2 max pooling.
model_cnn_pool = nn.Sequential(
    nn.Conv2d(C, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(filters, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(filters, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.MaxPool2d(2),  # halves both spatial dimensions
    nn.Conv2d(filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(2 * filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(2 * filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.MaxPool2d(2),  # halves both spatial dimensions again
    nn.Flatten(),
    # Two poolings shrink each side by 4, leaving D // 4**2 values in each of the
    # 2 * filters channels.
    nn.Linear(2 * filters * D // (4**2), classes),
)

# Fall back to the CPU so the cell also runs on machines without a CUDA device.
device = "cuda" if torch.cuda.is_available() else "cpu"

loss_func = nn.CrossEntropyLoss()
cnn_results_with_pool = train_simple_network(
    model_cnn_pool,
    loss_func,
    mnist_train_dataloder,
    mnist_test_dataloder,
    score_funcs={"accuracy": accuracy_score},
    device=device,
    epochs=1,
    checkpoint_file="./model/cnn_pooling_checkpoint.pth",
)

### Avaliando o modelo


In [None]:
model_pool = model_cnn_pool.cpu().eval()  # passing to cpu to be simpler

# Class probabilities for the original image and its two shifted copies.
img_pred = pred(model_pool, img)
img_lr_pred = pred(model_pool, img_lr)
img_ul_pred = pred(model_pool, img_ul)

# img_lr is shifted right and down, so it is reported as the "lower-right" image
# (the previous "left-right" label did not describe the shift).
for description, probabilities in (
    ("original", img_pred),
    ("lower-right", img_lr_pred),
    ("up-left", img_ul_pred),
):
    top_class = np.argmax(probabilities)  # index of the most probable class
    print(
        f"Predicted class for {description} image: {top_class} / Probability: {probabilities[top_class]}"
    )

In [None]:
# Compare the single-convolution CNN (no pooling) with the deeper CNN that uses pooling.
# Every row is tagged with the model that produced it so one plot can draw both lines.
results = pl.concat(
    [
        cnn_results.with_columns(pl.lit("Without Pooling").alias("pooling")),
        cnn_results_with_pool.with_columns(pl.lit("With Pooling").alias("pooling")),
    ]
)

# Test accuracy per epoch, one coloured line per model.
(
    ggplot(results, aes(x="epoch", y="test accuracy", group="pooling", color="pooling"))
    + geom_line()
)


## Training with Data Augmentation


In [None]:
# One example of each augmentation, keyed by the title used in the gallery below.
# The flips and the perspective transform use p=1.0 so they are always applied.
sample_transforms = {
    "Rotation": transforms.RandomAffine(degrees=45),
    "Translation": transforms.RandomAffine(0, translate=(0.1, 0.1)),
    "Shear": transforms.RandomAffine(0, shear=45),
    "RandomCrop": transforms.RandomCrop((20, 20)),
    "Horizontal Flip": transforms.RandomHorizontalFlip(p=1.0),
    "Vertical Flip": transforms.RandomVerticalFlip(p=1.0),
    "Color Jitter": transforms.ColorJitter(
        brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5
    ),
    "Perspective": transforms.RandomPerspective(p=1.0),
}

# The transforms are applied to a PIL image built from `img`, which must already
# exist in the kernel (it is defined in the shifted-image cell earlier in the notebook).
pil_img = transforms.ToPILImage()(img)

# 2 x 4 grid: one panel per transform.
f, axarr = plt.subplots(2, 4, figsize=(15, 10))

for i, (name, transform) in enumerate(sample_transforms.items()):
    pil_img_transformed = transform(pil_img)
    # i // 4 selects the row and i % 4 the column of the grid.
    axarr[i // 4, i % 4].imshow(pil_img_transformed, cmap="gray")
    axarr[i // 4, i % 4].set_title(name)

plt.show()

In [38]:
# Training images get a small random rotation, translation and rescaling before
# being converted to tensors.
train_transform = transforms.Compose(
    [
        transforms.RandomAffine(degrees=5, translate=(0.05, 0.05), scale=(0.95, 1.05)),
        transforms.ToTensor(),
    ]
)
# Test images are only converted to tensors: no augmentation at evaluation time.
test_transform = transforms.ToTensor()

mnist_dataset_train_v2, mnist_dataset_test_v2 = get_mnist_dataset(
    train_transformers=train_transform, test_transformers=test_transform
)
# Loaders over the "v2" datasets, using the batch size B.
mnist_train_dataloder_v2, mnist_test_dataloader_v2 = get_mnist_dataloaders(
    B, mnist_dataset_train_v2, mnist_dataset_test_v2
)


In [None]:
loss_func = nn.CrossEntropyLoss()

# Same architecture as the pooling CNN: two stages of three 3x3 convolutions,
# each stage followed by 2x2 max pooling.
model_cnn_pool_data_aug = nn.Sequential(
    nn.Conv2d(C, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(filters, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(filters, filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.MaxPool2d(2),  # halves both spatial dimensions
    nn.Conv2d(filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(2 * filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.Conv2d(2 * filters, 2 * filters, 3, padding=3 // 2),
    nn.Tanh(),
    nn.MaxPool2d(2),  # halves both spatial dimensions again
    nn.Flatten(),
    # Two poolings shrink each side by 4, leaving D // 4**2 values in each of the
    # 2 * filters channels.
    nn.Linear(2 * filters * D // (4**2), classes),
)

# Fall back to the CPU so the cell also runs on machines without a CUDA device.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Train on the augmented TRAINING loader and evaluate on the test loader.
# (Passing the test loader for both would train on the test set and never use
# the augmentation.)
cnn_results_with_pool_w_data_aug = train_simple_network(
    model_cnn_pool_data_aug,
    loss_func,
    mnist_train_dataloder_v2,
    mnist_test_dataloader_v2,
    score_funcs={"accuracy": accuracy_score},
    device=device,
    epochs=1,
    checkpoint_file="./model/cnn_pooling_checkpoint_w_data_aug.pth",
)


In [None]:
# Compare the pooling CNN trained on the plain data with the same architecture
# trained on augmented data. Both models use pooling, so the rows are labelled by
# data augmentation rather than by pooling.
results = pl.concat(
    [
        cnn_results_with_pool.with_columns(
            pl.lit("Without Data Augmentation").alias("data_augmentation")
        ),
        cnn_results_with_pool_w_data_aug.with_columns(
            pl.lit("With Data Augmentation").alias("data_augmentation")
        ),
    ]
)

# Test accuracy per epoch, one coloured line per training setup.
(
    ggplot(
        results,
        aes(
            x="epoch",
            y="test accuracy",
            group="data_augmentation",
            color="data_augmentation",
        ),
    )
    + geom_line()
)

In [None]:
model_v3 = model_cnn_pool_data_aug.cpu().eval()  # passing to cpu to be simpler

# Class probabilities for the original image and its two shifted copies.
img_pred = pred(model_v3, img)
img_lr_pred = pred(model_v3, img_lr)
img_ul_pred = pred(model_v3, img_ul)

# img_lr is shifted right and down, so it is reported as the "lower-right" image
# (the previous "left-right" label did not describe the shift).
for description, probabilities in (
    ("original", img_pred),
    ("lower-right", img_lr_pred),
    ("up-left", img_ul_pred),
):
    top_class = np.argmax(probabilities)  # index of the most probable class
    print(
        f"Predicted class for {description} image: {top_class} / Probability: {probabilities[top_class]}"
    )

# Torchvision models


In [None]:
# List every name exposed by torchvision.models.
dir(models)