# SSNE Miniproject 3
### 318703 Tomasz Owienko
### 318718 Anna Schäfer
### Grupa piątek

In [1]:
from typing import Any, Callable

import torch
import torchmetrics
import torch.nn as nn
import torchvision.transforms as transforms
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
import pytorch_lightning as pl
import torch.nn.functional as F

In [2]:
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)

<torch._C.Generator at 0x7f0244315370>

In [3]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
torch.set_float32_matmul_precision('medium')

In [4]:
class ImagesDataModule(pl.LightningDataModule):
    class FastDataset(Dataset):
        def __init__(self, data, labels, num_classes):
            self.dataset = data
            self.labels = labels
            self.number_classes = num_classes

        def __len__(self):
            return len(self.dataset)

        def __getitem__(self, index):
            return self.dataset[index], self.labels[index]

    def __init__(self, path: str, transform: Callable[[Any], torch.Tensor], *, val_fraction: float,
                 test_fraction: float, in_memory=False):
        super().__init__()
        assert 0 <= val_fraction + test_fraction <= 1
        assert val_fraction * test_fraction >= 0

        self.image_folder = ImageFolder(path, transform=transform)
        self.dataset: ImagesDataModule.FastDataset | None = None
        self._val_fraction = val_fraction
        self._test_fraction = test_fraction
        self._in_memory = in_memory

        self._train = self._val = self._test = None

    def prepare_data(self) -> None:
        if self._in_memory:
            loader = DataLoader(self.image_folder, batch_size=len(self.image_folder))
            data = next(iter(loader))
            dataset = ImagesDataModule.FastDataset(data[0], data[1], num_classes=len(self.image_folder.classes))
        else:
            dataset = self.image_folder

        val_size = int(len(dataset) * self._val_fraction)
        test_size = int(len(dataset) * self._test_fraction)
        train_size = len(dataset) - val_size - test_size

        self._train, self._val, self._test = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        return DataLoader(self._train, batch_size=512, shuffle=True, num_workers=8 if not self._in_memory else 0,
                          pin_memory=True)

    def val_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self._val, batch_size=64, shuffle=False, num_workers=8 if not self._in_memory else 0,
                          pin_memory=True)

    def test_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self._test, batch_size=64, shuffle=False, num_workers=8 if not self._in_memory else 0,
                          pin_memory=True)

In [5]:
class ImageClassifier(pl.LightningModule):
    def __init__(self, num_classes, lr, weight_decay, loss):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool3 = nn.MaxPool2d(2, 2)
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout(0.25)
        self.fc1 = nn.Linear(256 * 4 * 4, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.dropout2 = nn.Dropout(0.25)
        self.fc3 = nn.Linear(512, num_classes)

        self._accuracy = torchmetrics.classification.MulticlassAccuracy(num_classes=num_classes).to(device)

        self.example_input_array = torch.rand((16, 3, 64, 64)).to(device)

        self._lr = lr
        self._weight_decay = weight_decay
        self._loss = loss

    def forward(self, x):
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))
        x = self.pool4(F.relu(self.conv4(x)))
        x = x.view(-1, 256 * 4 * 4)
        x = self.dropout1(x)
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001, weight_decay=0.001)
        return optimizer

    def training_step(self, batch, batch_idx):
        images, labels = batch
        out = self(images)
        loss = self._loss(out, labels)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        images, labels = batch
        out = self(images)
        loss = self._loss(out, labels)
        self.log('val_loss', loss)
        self.log('val_accuracy', self._accuracy(out, labels))

        return loss

    def on_train_start(self) -> None:
        self.logger.experiment.add_graph(self, self.example_input_array)

    def test_step(self, batch, batch_idx):
        images, labels = batch
        out = self(images)
        loss = self._loss(out, labels)
        self.log('test_loss', loss)
        
        self.logger.log_hyperparams({
            'lr': self._lr,
            'weight_decay': self._weight_decay,
            'loss': str(self._loss)
        }, {
            'test_accuracy': self._accuracy(out, labels)
        })
        return loss

In [6]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

dm = ImagesDataModule('data/train', transform, val_fraction=0.1, test_fraction=0.1)

In [7]:
model = ImageClassifier(num_classes=len(dm.image_folder.classes), lr=1e-3, weight_decay=1e-4,
                        loss=torch.nn.CrossEntropyLoss())
trainer = pl.Trainer(max_epochs=1, enable_checkpointing=False)
trainer.fit(model, datamodule=dm)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

   | Name      | Type               | Params | In sizes          | Out sizes        
------------------------------------------------------------------------------------------
0  | conv1     | Conv2d             | 896    | [16, 3, 64, 64]   | [16, 32, 64, 64] 
1  | pool1     | MaxPool2d          | 0      | [16, 32, 64, 64]  | [16, 32, 32, 32] 
2  | conv2     | Conv2d             | 18.5 K | [16, 32, 32, 32]  | [16, 64, 32, 32] 
3  | pool2     | MaxPool2d          | 0      | [16, 64, 32, 32]  | [16, 64, 16, 16] 
4  | conv3     | Conv2d             | 73.9 K | [16, 64, 16, 16]  | [16, 128, 16, 16]
5  | pool3     | MaxPool2d          | 0      | [16, 128, 16, 16] | [16, 128, 8, 8]  
6  | conv4     | Conv2d             | 295 K  | [16, 128, 8, 8]   | [16, 256, 8, 8]  
7  | pool4     | MaxPool2d        

Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


In [8]:
trainer.test(model, datamodule=dm)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

[{'test_loss': 3.157754421234131}]

In [9]:
%load_ext tensorboard
%tensorboard --logdir=lightning_logs

Launching TensorBoard...