# DDPLKO Moduł 4 - praca domowa - Quickdraw 10 class - regularyzacja - pytorch

Twoim zadaniem w tym module będzie przygotowanie własnego modelu sieci neuronowej korzystając z regularyzacji.

Lista rzeczy które musi spełnić Twój model:
- [x] działać na wybranych przez Ciebie 10 klasach (bazuj na kodzie z modułu 3)
- [ ] liczba parametrów pomiędzy 100'000 a 200'000
- [ ] wykorzystane przynajmniej 2 sposoby walki z regularyzacją
- [ ] mieć wykonane co najmniej 4 zmiany w celu poprawy wyniku; zachowaj wszystkie iteracje (modyfikując model możesz dodać opcje w funkji, bądź skopiować klasę/funkcję, tak by było widać kolejne architektury)
- [ ] opisz co chcesz sprawdzić w kolejnych eksperymentach (np. sprawdzę czy Dropout pomaga i z jaką wartością drop ratio najbardziej)
- [ ] uzyskiwać lepsze `validation accuracy` niż w przypadku pierwszego modelu z poprzedniego modułu (im więcej punktów procentowych różnicy tym lepiej)

Zwizualizuj proszę:
- [ ] historie treningów (wystarczy Val acc, ale train acc czy lossy też mogą być)
- [ ] zależność: liczba parametrów - val acc

Możesz (czyli opcjonalne rzeczy):
- pracować na zmniejszonym zbiorze, by dobrać wartość parametrów
- np. zastosować dropout, pooling i early stopping
- zastosować TF2 - Keras / PyTorcha czy PL (Pytorch Lightning)
- dodać LR scheduler do swojego treningu (i sprawdzić czy to poprawiło wynik)
- zwizualizować dodatkowo:
  - confussion matrix
  - błędne przypadki

Warto:
- zmieniać 1 parametr między eksperymentami (szczególnie trudne gdy się już nabierze wyczucia)

## Importy

In [1]:
import gc
import os
import pathlib
import pprint
import urllib
from typing import Any, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset

from torchvision import datasets, transforms



### Klasa QuickDrawDataset

In [2]:
# wczytanie danych

class_names = [
    "airplane",
    "banana",
    "cookie",
    "diamond",
    "dog",
    "hot air balloon",
    "knife",
    "parachute",
    "scissors",
    "wine glass",
]
data_folder = "../data/quickdraw/"

# make sure data_folder exists - pathlib
pathlib.Path(data_folder).mkdir(parents=True, exist_ok=True)


class QuickDrawDataset(Dataset):
    """A Quick, Draw! dataset"""

    def __init__(
        self, classes, root_dir, download_data=False, load_data=True, transform=None
    ):
        """
        Arguments:
            classes (list[string]): List of classes to be used.
            root_dir (string): Directory with all the images.
            download (bool, optional) – If True, downloads the dataset from the internet and puts it in root directory. If dataset is already downloaded, it is not downloaded again.
        """
        self.classes = classes
        self.root_dir = root_dir

        if download_data:
            self.download_data()

        if load_data:
            self.data, self.targets = self._load_data()

        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx: int) -> Tuple[Any, Any]:
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img, target = self.data[idx], int(self.targets[idx])

        if self.transform:
            img = self.transform(img)

        return img, target

    def download_data(self):
        for name in self.classes:
            url = (
                "https://storage.googleapis.com/quickdraw_dataset/full/numpy_bitmap/%s.npy"
                % name
            )
            file_name = self.root_dir + url.split("/")[-1].split("?")[0]

            url = url.replace(" ", "%20")

            if not os.path.isfile(file_name):
                print(url, "==>", file_name)
                urllib.request.urlretrieve(url, file_name)

    def _load_data(self):
        raw_data = []
        for name in self.classes:
            file_name = self.root_dir + name + ".npy"
            raw_data.append(np.load(file_name, fix_imports=True, allow_pickle=True))
            print("%-15s" % name, type(raw_data[-1]))

        reshaped_data = np.concatenate(raw_data).reshape(-1, 28, 28, 1)
        reshaped_targets = np.concatenate(
            [np.full(d.shape[0], i) for i, d in enumerate(raw_data)]
        )

        return reshaped_data, reshaped_targets

    def _set_data(self, data, targets):
        self.data = data
        self.targets = targets

    def split_train_test(self, test_size=0.2):
        """Split data into train and test sets using sklearn.model_selectiontrain_test_split function."""

        X_train, X_test, y_train, y_test = train_test_split(
            self.data,
            self.targets,
            test_size=test_size,
            random_state=12,
            stratify=self.targets,
        )

        train_dataset = QuickDrawDataset(
            self.classes,
            self.root_dir,
            download_data=False,
            load_data=False,
            transform=self.transform,
        )
        test_dataset = QuickDrawDataset(
            self.classes,
            self.root_dir,
            download_data=False,
            load_data=False,
            transform=self.transform,
        )

        train_dataset._set_data(X_train, y_train)
        test_dataset._set_data(X_test, y_test)

        return train_dataset, test_dataset

def get_torch_optimizer(optimizer_name, model_params, lr):
    if optimizer_name == "Adam":
        return torch.optim.Adam(
            model_params,
            lr=lr,
        )
    elif optimizer_name == "SGD":
        return torch.optim.SGD(
            model_params,
            lr=lr,
        )
    else:
        raise ValueError(f"Unknown optimizer {optimizer_name}")

def get_torch_loss(loss_name):
    if loss_name == "cross_entropy":
        return torch.nn.CrossEntropyLoss()
    elif loss_name == "mse":
        return torch.nn.MSELoss()
    else:
        raise ValueError(f"Unknown loss {loss_name}")

# from https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html

def train(dataloader, model, loss_fn, optimizer, device):
    size = len(dataloader.dataset)
    model.train()

    loss_samples = []
    acc_samples = []
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss_samples.append(loss.item())
            acc_samples.append(accuracy(pred, y))
     
    return np.mean(loss_samples), np.mean(acc_samples)

def test(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
all_dataset = QuickDrawDataset(
    class_names,
    data_folder,
    download_data=True,
    load_data=True,
    transform=transforms.ToTensor(),
)

train_dataset, val_dataset = all_dataset.split_train_test(test_size=0.2)

# to save RAM
del all_dataset
gc.collect()

print(f"train_dataset: {len(train_dataset)} samples")
print(f"test_dataset: {len(train_dataset)} samples")

airplane        <class 'numpy.ndarray'>
banana          <class 'numpy.ndarray'>
cookie          <class 'numpy.ndarray'>
diamond         <class 'numpy.ndarray'>
dog             <class 'numpy.ndarray'>
hot air balloon <class 'numpy.ndarray'>
knife           <class 'numpy.ndarray'>
parachute       <class 'numpy.ndarray'>
scissors        <class 'numpy.ndarray'>
wine glass      <class 'numpy.ndarray'>
train_dataset: 1245340 samples
test_dataset: 1245340 samples


In [3]:
config = {
    "batch_size": 64,
    "epochs": 10,
    "learning_rate": 1e-3,
    "loss": "cross_entropy",
    "optimizer": "Adam",
    "device": (
        "cuda"
        if torch.cuda.is_available()
        else "cpu"
    ),
}

    
pprint.pprint(config)

{'batch_size': 64,
 'device': 'cuda',
 'epochs': 10,
 'learning_rate': 0.001,
 'loss': 'cross_entropy',
 'optimizer': 'Adam'}


In [4]:
train_dataloader = DataLoader(train_dataset, batch_size=config["batch_size"])
test_dataloader = DataLoader(test_dataset, batch_size=config["batch_size"])

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64


In [5]:
class QuickDrawNetwork_V1(nn.Module):
    def __init__(self, dimensions, num_classes):
        super().__init__()

        self.channels, self.width, self.height = dimensions
        self.num_classes = num_classes

        self.conv_layers = nn.Sequential(
            nn.Conv2d(self.channels, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            #nn.AdaptiveAvgPool2d((1, 1)),
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            #nn.AdaptiveAvgPool2d((1, 1)),
            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        self.fully_connected_layers = nn.Sequential(
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = nn.Flatten()(x)
        logits = self.fully_connected_layers(x)
        return logits

img_dimensions = (1, 28, 28)
model = QuickDrawNetwork_V1(img_dimensions, len(class_names))
num_of_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

assert num_of_params > 100_000, "Za mało parametrów"
assert num_of_params < 200_000, "Za dużo parametrów"

print(model)
print('Number of parameters:', num_of_params)

QuickDrawNetwork_V1(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 256, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fully_connected_layers): Sequential(
    (0): Linear(in_features=256, out_features=32, bias=True)
    (1): Linear(in_features=32, out_features=10, bias=True)
  )
)
Number of parameters: 175082


In [6]:
epochs = config["epochs"]
loss_fn = get_torch_loss(config["loss"])
optimizer = get_torch_optimizer(config["optimizer"], model.parameters(), config["learning_rate"])
device = config["device"]

model.to(device)

train_loss_history = []
train_acc_history = []

for t in range(epochs):
    print(f"Training on : {device}")
    print(f"Epoch {t+1}\n-------------------------------")
    
    loss, acc = train(train_dataloader, model, loss_fn, optimizer, device)
    
    train_loss_history.append(loss)
    train_acc_history.append(acc)

    test(test_dataloader, model, loss_fn, device)
print("Done!")

# plot training history with matplotlib

fig, ax = plt.subplots(1, 2, figsize=(15, 5))

ax[0].plot(train_loss_history)
ax[0].set_title("Training loss")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("Loss")

ax[1].plot(train_acc_history)
ax[1].set_title("Training accuracy")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("Accuracy")

plt.show()

Training on : cuda
Epoch 1
-------------------------------


TypeError: accuracy() missing 1 required positional argument: 'task'

In [None]:
raise Exception("work in progress")

# Trening

In [None]:
# You are using a CUDA device ('NVIDIA GeForce RTX 4070 Ti') that has Tensor Cores. 
# To properly utilize them, you should set 
# `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance
torch.set_float32_matmul_precision('medium')

BATCH_SIZE = 512



model = QuickDrawCNN_PL(X_train,y_train,X_val,y_val, BATCH_SIZE)
model_name = "baseline"
logger = TensorBoardLogger("lightning_logs", name=model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    limit_train_batches=0.1, # trenujemy tylko na 10% batchy z podzbioru treningowego
    callbacks=[
        #EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

### Pierwsza zmiana

Dodaję LR scheduler (OneCycleLR), żeby skrócić czas treningu i przyspieszyć kolejne iteracje zmian

In [None]:
class QuickDrawCNN_PL_LR_Scheduler(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-3, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)

model = QuickDrawCNN_PL_LR_Scheduler(X_train,y_train,X_val,y_val, batch_size=BATCH_SIZE)

model_name = "OneCycleLR"
logger = TensorBoardLogger("lightning_logs", model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    limit_train_batches=0.1,
    callbacks=[
        EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

### Druga zmiana

Dodaję regularyzację poprzez BatchNormalization w celu poprawienia wyniku po ok 18k kroku, gdy zaczyna się overfitting.

In [None]:
class QuickDrawCNN_PL_BN(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(2, 2),
            
            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-2, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)

model = QuickDrawCNN_PL_BN(X_train,y_train,X_val,y_val, batch_size=BATCH_SIZE)

model_name = "OneCycleLR_BatchNorm"
logger = TensorBoardLogger("lightning_logs", model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    limit_train_batches=0.1,
    callbacks=[
        EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

### Trzecia zmiana

Porównujemy z regularyzacją poprzez Dropout

In [None]:
class QuickDrawCNN_PL_Dropout(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size, dropout_ratio=[0.2, 0.2]):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(p=dropout_ratio[0]),

            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(p=dropout_ratio[1]),

            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(p=dropout_ratio[1]),
            
            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-2, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)


ratios = [[0.2, 0.2], [0.3, 0.3], [0.5, 0.5]]
ratios_different = [ [0.2, 0.5], [0.3, 0.7], [0.5, 0.2] ]


for ratio in ratios + ratios_different: 
    model = QuickDrawCNN_PL_Dropout(X_train,y_train,X_val,y_val, batch_size=BATCH_SIZE, dropout_ratio=ratio)

    ratio_string = "_".join([str(x) for x in ratio])
    logger = TensorBoardLogger("lightning_logs", name=f"modul_3_OneCycleLR_BatchNorm_Dropout_{ratio_string}", log_graph=True)

    trainer = pl.Trainer(
        max_epochs=10, 
        precision=32, 
        accelerator="gpu",
        logger=logger,
        limit_train_batches=0.1,
        callbacks=[
            EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
        ]
    )


    trainer.fit(model)

    del model
    del trainer
    del logger
    gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

Wniosek: Dropout najlepiej zadziałał z usuwając 50% map cech z pierwszej warstwy i 20% z warstw późniejszych.

### Czwarta zmiana pt1 - GlobalAvgPool

Zmiana warstwy MaxPool na GlobalAvgPool - za pomocą nn.AdaptiveAvgPool2d((1, 1)) 

In [None]:
class QuickDrawCNN_PL_GlobalAvgPool(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.AdaptiveAvgPool2d(1),
            
            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-2, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)

model = QuickDrawCNN_PL_GlobalAvgPool(X_train,y_train,X_val,y_val, batch_size=BATCH_SIZE)

model_name = "OneCycleLR_BatchNorm_GlobalAvgPool"
logger = TensorBoardLogger("lightning_logs", model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    limit_train_batches=0.1,
    callbacks=[
        #EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

### Czwarta zmiana pt2 - GlobalMaxPool

Zamieniamy GlobalAvgPooling na GlobalMaxPooling żeby sprawdzić różnicę

In [None]:
class GlobalAvgPool2d(nn.Module):
    def forward(self, x):
        return torch.mean(x, dim=(-2, -1))
    
class QuickDrawCNN_PL_GlobalMaxPool(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            
            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.BatchNorm2d(64),

            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.AdaptiveMaxPool2d(1),
            
            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-2, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)

model = QuickDrawCNN_PL_GlobalMaxPool(X_train,y_train,X_val,y_val, batch_size=1024)


model_name = "OneCycleLR_BatchNorm_GlobalMaxPool"
logger = TensorBoardLogger("lightning_logs", model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    limit_train_batches=0.1,
    callbacks=[
        EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

Wniosek: GlobalAvgPool osiąga nieznacznie wyższy wynik

In [None]:
class GlobalAvgPool2d(nn.Module):
    def forward(self, x):
        return torch.mean(x, dim=(-2, -1))
    
class QuickDrawCNN_PL_GlobalMaxPool(pl.LightningModule):
    def __init__(self, X_train,y_train,X_val,y_val, batch_size):
        super().__init__()

        self.X_train = torch.FloatTensor(X_train).permute(0, 3, 1, 2)
        self.X_val = torch.FloatTensor(X_val).permute(0, 3, 1, 2)
        self.y_train = torch.LongTensor(y_train)
        self.y_val = torch.LongTensor(y_val)
        self.train_dataset = TensorDataset(self.X_train, self.y_train)
        self.val_dataset = TensorDataset(self.X_val, self.y_val)

        self.batch_size = batch_size

        self.num_classes = 10
        self.dims = (1, 28, 28)
        channels, width, height = self.dims

        self.model = nn.Sequential(
            nn.Conv2d(channels, 32, 3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2),
            nn.Dropout2d(0.5),

            nn.Conv2d(32, 64, 3),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.Dropout2d(0.2),

            nn.Conv2d(64, 256, 3),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.AdaptiveAvgPool2d(1),
            


            nn.Flatten(),
            nn.Linear(256, 32),
            nn.Linear(32, self.num_classes), 
        )

    def forward(self, x):
        x = self.model(x)
        return F.log_softmax(x, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)

        # logs metrics for each training_step,
        # and the average across the epoch, to the progress bar and logger
        self.log("train_loss", loss, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def configure_optimizers(self):
        # zmiana tutaj
        #return torch.optim.Adam(self.parameters(), lr=1e-2, weight_decay=1e-5)
        optimizer = torch.optim.Adam(self.parameters(), weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=1e-2, total_steps=self.trainer.estimated_stepping_batches
        )
        return [optimizer], [scheduler]

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y, task="multiclass", num_classes=self.num_classes)

        # Calling self.log will surface up scalars for you in TensorBoard
        self.log('val_loss', loss, on_epoch=True, prog_bar=True, logger=True)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Here we just reuse the validation_step for testing
        return self.validation_step(batch, batch_idx)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, num_workers=16)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=16)

model = QuickDrawCNN_PL_GlobalMaxPool(X_train,y_train,X_val,y_val, batch_size=1024)

model_name = "final_network"
logger = TensorBoardLogger("lightning_logs", model_name)

trainer = pl.Trainer(
    max_epochs=10, 
    precision=32, 
    accelerator="gpu",
    logger=logger,
    callbacks=[
        EarlyStopping(monitor="val_acc", min_delta=0.01, patience=4, verbose=False, mode="max")
    ]
)


trainer.fit(model)

del model
del trainer
del logger
gc.collect()

Finalny model osiąga validation accuracy 

In [None]:
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

# Wyślij rozwiązanie
Możesz skorzystać z jednego z poniższych sposobów:
**mailem na specjalny adres** ze strony pracy domowej w panelu programu prześlij jedno z poniższych:
- notebooka (jeżeli plik ma mniej niż np. 10MB)
- notebooka w zipie
- link do Colaba (udostępniony)
- link do pliku przez GDrive/Dropboxa/WeTransfer/...
- pdfa (poprzez download as pdf)
- jako plik w repozytorium na np. GitHubie, by budować swoje portfolio (wtedy uważaj na wielkość pliku, najlepiej kilka MB, Max 25MB)

Najlepiej, by w notebooku było widać wyniki uruchomienia komórek, chyba, że przez nie plik będzie mieć 100+MB wtedy najlepiej Colab lub jakieś przemyślenie co poszło nie tak (zbyt dużo dużych zdjęć wyświetlonych w komórkach).

## Co otrzymasz?
Informację zwrotną z ewentualnymi sugestiami, komentarzami.