In [None]:
# The dataset class can have the exact same code as before, no changes needed.

# Combined Waste Dataset for Multiclass Classification
# Uses CSV splits generated by create_dataset_splits.py

from torch.utils.data import Dataset
import pandas as pd
from PIL import Image
import os


# Define the classes (same as in the combination script).
# The position of a name in this tuple is its integer label.
_CLASS_NAMES = (
    'organic',
    'battery',
    'glass',
    'metal',
    'paper',
    'cardboard',
    'plastic',
    'textiles',
    'trash',
)

# Mapping: class name -> integer label (0..8, in the order listed above).
final_classes = {name: idx for idx, name in enumerate(_CLASS_NAMES)}

class CombinedWasteDatasetMulti(Dataset):
    """Multiclass waste-image dataset backed by per-split CSV files.

    ``<root_dir>/<split>.csv`` is read with pandas. Its ``image_path`` column
    is joined onto ``root_dir`` and its ``class`` column is translated to an
    integer label through ``class_to_idx`` (the module-level ``final_classes``).
    """

    def __init__(self, root_dir="./datasets/combined_waste_dataset",
                 split='train', transform=None):
        """
        Multiclass waste dataset using CSV splits.

        Args:
            root_dir (str): Path to the combined waste dataset directory.
            split (str): 'train', 'val', or 'test'.
            transform: Transformations to apply to images.

        Raises:
            FileNotFoundError: If ``<root_dir>/<split>.csv`` does not exist.
            KeyError: If the CSV lacks a required column or contains a class
                name that is not a key of ``final_classes``.
        """
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.classes = list(final_classes.keys())
        self.class_to_idx = final_classes
        self.data = []
        self._load_data()

    def _load_data(self):
        """Populate ``self.data`` with ``(image_path, label)`` tuples from the split CSV.

        The list is rebuilt from scratch on every call, so calling this method
        again does not duplicate samples.
        """
        csv_path = os.path.join(self.root_dir, f'{self.split}.csv')
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"CSV file {csv_path} not found. "
                                    "Run create_dataset_splits.py first.")

        df = pd.read_csv(csv_path)

        # Fail with a readable message instead of a bare KeyError on the first bad row.
        unknown = sorted({str(name) for name in df['class']
                          if name not in self.class_to_idx})
        if unknown:
            raise KeyError(f"Unknown class label(s) in {csv_path}: {unknown}")

        # Iterating the two columns directly avoids the per-row Series
        # construction that DataFrame.iterrows() performs.
        self.data = [
            (os.path.join(self.root_dir, rel_path), self.class_to_idx[name])
            for rel_path, name in zip(df['image_path'], df['class'])
        ]

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is opened from disk, converted to RGB and, if a transform was
        given, passed through it.
        """
        img_path, label = self.data[idx]
        # The context manager releases the file handle deterministically;
        # convert() returns an independent image that outlives the `with` block.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, label


In [None]:
# Test the dataset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from collections import Counter

# Plain resize + tensor conversion for this smoke test.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset_root = './datasets/combined_waste_dataset'

for split in ('train', 'val', 'test'):
    dataset = CombinedWasteDatasetMulti(
        root_dir=dataset_root, split=split, transform=transform)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    print(f"Split: {split}, Total Samples: {len(dataset)}")

    # Count images per class (labels come straight from the loaded CSV rows,
    # so no image has to be decoded for this).
    counts = Counter(label for _, label in dataset.data)
    print("  Class distribution:")
    for cls_idx, n_images in sorted(counts.items()):
        print(f"    {dataset.classes[cls_idx]} ({cls_idx}): {n_images} images")

    # Nothing to batch for an empty split.
    if len(dataset) == 0:
        continue

    # Test one batch
    images, labels_batch = next(iter(dataloader))
    print(f"  Batch shape: {images.shape}, Labels: {labels_batch[0:5] } (...)")

Ok, now we can think about the architecture of our model.
Our first attempt was an architecture "inspired" by VGG network.

But it was neither a direct copy nor something based on gained knowledge. It was more of a guess.
How would we know we picked a good architecture?

We can see from the literature that architectures are usually designed through a combination of:
- Empirical testing
- Prior knowledge
- Automated search (Neural Architecture Search - NAS)

Let's try to do something similar here.
We can try to implement our network in a modular way, so that we can easily change its architecture.

We can define building blocks for our network, such as convolutional layers, activation functions, and pooling layers, and then compose them to create the final architecture.

This way, we can easily experiment with different configurations and find the best-performing architecture for our task.

For now let's stick to things that are "inspired" by literature. We know what worked previously AND was simple:
- AlexNet
- VGG

Let's try to implement modular versions of these architectures and see how they perform on our dataset.
Knowing how to build modular architectures is a useful skill, as basically anything more complicated will be just a combination of simpler blocks.
So all future architectures you will encounter will be created like this.



The source of VGG architecture:
https://arxiv.org/pdf/1409.1556

We will take inspiration from it, BUT the paper contains no drawing of the architecture itself.
Fortunately there are multiple articles that describe it in more detail, e.g.:
https://neurohive.io/en/popular-networks/vgg16/
https://lekhuyen.medium.com/an-overview-of-vgg16-and-nin-models-96e4bf398484

AND many other articles, like:
https://www.researchgate.net/figure/GG-16-network-architecture-for-feature-extraction_fig1_335184836

(These are just examples, to help visualize the architecture.)



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
from pytorch_lightning import LightningModule, Trainer, LightningDataModule
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping, Callback
from torchsummary import summary
import pytorch_lightning as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import os

# Set seed for reproducibility.
# Done once, right after the imports, so that everything created in the
# following cells runs under the same fixed seed (42).
pl.seed_everything(42)


In [None]:
# Shared preprocessing constants.
IMAGE_SIZE = 224
IMAGENET_MEAN = [0.485, 0.456, 0.406]  # ImageNet normalization (per-channel mean)
IMAGENET_STD = [0.229, 0.224, 0.225]   # ImageNet normalization (per-channel std)

# Data augmentations for training: random crop/flip/rotation/colour jitter,
# then tensor conversion and normalization.
_train_steps = [
    transforms.RandomResizedCrop(IMAGE_SIZE, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
train_transform = transforms.Compose(_train_steps)

# Validation and test transforms: no augmentation, only a fixed resize
# followed by the same tensor conversion and normalization.
_eval_steps = [
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
val_test_transform = transforms.Compose(_eval_steps)

In [None]:
class WasteDataModuleMulti(LightningDataModule):
    """Lightning data module for the train/val/test splits of the combined waste dataset.

    To train with class-weighted random sampling, assign a per-class weight
    tensor to ``class_weights`` and call ``setup()`` again; otherwise the
    training loader simply shuffles.
    """

    def __init__(self, root_dir='datasets/combined_waste_dataset',
                 batch_size=32):
        """
        Args:
            root_dir (str): Directory holding the split CSV files and images.
            batch_size (int): Batch size used by all three dataloaders.
        """
        super().__init__()
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.num_classes = 9  # organic, battery, glass, metal, paper, cardboard, plastic, textiles, trash
        self.class_weights = None
        # Always defined, so train_dataloader() can test it directly and a
        # sampler from an earlier setup() cannot linger once class_weights is cleared.
        self.sample_weights = None
        # Leave one core for the main process. os.cpu_count() may return None,
        # which would make a plain `os.cpu_count() - 1` raise TypeError.
        self.num_workers = max((os.cpu_count() or 1) - 1, 0)

    def setup(self, stage=None):
        """Create the three split datasets and, if ``class_weights`` is set, the per-sample weights.

        ``stage`` is accepted for Lightning compatibility but not used: all
        splits are always (re)built.
        """
        self.train_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='train', transform=train_transform
        )
        self.val_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='val', transform=val_test_transform
        )
        self.test_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='test', transform=val_test_transform
        )

        # Compute sample weights for weighted random sampling
        # Now why would we do that?
        # In imbalanced datasets, some classes may be underrepresented in the training data.
        # By assigning higher weights to these classes, we can ensure that the model pays more attention to them during training.
        # This can help improve the model's performance on minority classes and lead to a more balanced overall performance.

        # In "statistical" terms we are trying to reduce the variance of our estimator by ensuring that all classes are adequately represented in each batch.
        # In plain English - because our dataset is imbalanced, we want to make sure that during training, the model sees enough examples from all classes.
        # Otherwise the probability of batches without any samples from minority classes is high, and the model will not learn to recognize them well.

        # This approach creates class-balanced batches for the model to train on,
        # which changes the actual data distribution seen by the model during training.

        if self.class_weights is not None:
            # One indexed lookup for the whole split instead of one .item() call per sample.
            train_labels = torch.tensor(
                [label for _, label in self.train_dataset.data], dtype=torch.long)
            self.sample_weights = torch.as_tensor(self.class_weights)[train_labels].tolist()
        else:
            self.sample_weights = None

    def train_dataloader(self):
        """Return the training loader: weighted sampling if sample weights exist, plain shuffling otherwise."""
        if self.sample_weights is not None:
            # Create a weighted random sampler for stratified sampling
            # replacement = True to allow sampling with replacement
            # as usual - more info in the docs:
            # https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.WeightedRandomSampler
            # if replacement = False -> without replacement - when a sample index is drawn for a row, it cannot be drawn again for that row
            sampler = WeightedRandomSampler(self.sample_weights, len(self.train_dataset), replacement=True)

            return DataLoader(self.train_dataset, batch_size=self.batch_size,
                              sampler=sampler, num_workers=self.num_workers)
        else:
            return DataLoader(self.train_dataset, batch_size=self.batch_size,
                              shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        """Return the (unshuffled) validation loader."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size,
                          shuffle=False, num_workers=self.num_workers)

    def test_dataloader(self):
        """Return the (unshuffled) test loader."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          shuffle=False, num_workers=self.num_workers)

In [None]:
class ParametricCNN(LightningModule):
    """CNN classifier whose convolutional stack is generated from a configuration dict.

    ``architecture_config`` keys:
        * ``blocks``: list of ``{'filters': int, 'convs': int}``; each block is
          ``convs`` x (3x3 Conv -> BatchNorm -> ReLU) followed by 2x2 max-pooling
          and dropout.
        * ``dropout_conv``: dropout probability after every block.
        * ``fc_units``: width of the hidden fully connected layer.
        * ``dropout_fc``: dropout probability before the output layer.
    """

    def __init__(self, num_classes=9, class_weights=None, architecture_config=None, learning_rate=1e-3):
        """
        Args:
            num_classes (int): Number of output classes.
            class_weights (Tensor | None): Optional per-class weights for the
                cross-entropy loss.
            architecture_config (dict | None): Architecture description (see the
                class docstring); ``None`` selects the default "VGG-like" stack.
            learning_rate (float): Initial learning rate for Adam.
        """
        super().__init__()
        self.save_hyperparameters()
        self.num_classes = num_classes
        self.learning_rate = learning_rate

        # Default architecture config (our first, "VGG-like")
        if architecture_config is None:
            architecture_config = {
                'blocks': [
                    {'filters': 64, 'convs': 2},
                    {'filters': 128, 'convs': 2},
                    {'filters': 256, 'convs': 2},
                ],
                'dropout_conv': 0.25,
                'fc_units': 512,
                'dropout_fc': 0.5
            }

        self.architecture_config = architecture_config

        # we can start building the model now
        # this is not the most elegant way to do it, but it is straightforward and clear
        # this is also not super flexible, it does not allow us to build any possible architecture,
        # but it is not supposed to

        # almost every architecture you will see will work like that
        # just a lot of them will be more complex (ResNet, DenseNet, etc.)
        # however, the basic idea is still the same - we have a configuration that defines the architecture
        # and we build the model based on that configuration, using simple "factory" pattern,
        # creating our network from blocks, defined in the configuration
        # this is especially true when creating a Sequential / linear stack of layers


        # Build convolutional layers
        layers = []
        in_channels = 3
        for block in architecture_config['blocks']:
            for _ in range(block['convs']):
                layers.append(nn.Conv2d(in_channels, block['filters'], kernel_size=3, padding=1))
                layers.append(nn.BatchNorm2d(block['filters']))
                layers.append(nn.ReLU())
                in_channels = block['filters']
            layers.append(nn.MaxPool2d(2, 2))
            layers.append(nn.Dropout(architecture_config['dropout_conv']))

        layers.append(nn.Flatten())

        # Compute the flatten size dynamically
        # otherwise we would have to hardcode it, which would be almost impossible to do

        # let's use a trick - "run" the model we have so far on a dummy input
        # we just have to be careful to not track gradients here
        # that is why we use torch.inference_mode()
        # TODO: what is the difference between torch.no_grad() and torch.inference_mode()?
        # This was your previous homework, so you should know the answer already, though it is technical and subtle.
        # Hint: the main difference is about creating tensor - one mode makes them impossible to use in the "grad" mode
        # TODO: read this: https://docs.pytorch.org/docs/stable/notes/autograd.html#grad-modes

        # Exclude Flatten for now, though it doesn't matter really, Flatten does not change number of elements
        # (TODO: try it yourself); It is just so we can try accessing any part of the network we want
        feature_probe = nn.Sequential(*layers[:-1])

        # The probe must run in eval mode: in training mode every BatchNorm layer
        # would fold the statistics of the all-zero dummy image into its running
        # mean/variance, so the model would start training with polluted buffers.
        feature_probe.eval()
        with torch.inference_mode():
            dummy_input = torch.zeros(1, 3, 224, 224)
            self.flatten_size = feature_probe(dummy_input).numel()  # numel = number of elements in the tensor
        feature_probe.train()  # hand the layers back in their default (training) mode

        # Add fully connected layers
        layers.append(nn.Linear(self.flatten_size, architecture_config['fc_units']))
        layers.append(nn.BatchNorm1d(architecture_config['fc_units']))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(architecture_config['dropout_fc']))
        layers.append(nn.Linear(architecture_config['fc_units'], num_classes))

        self.layers = nn.Sequential(*layers)

        # Loss function
        # we are using class weights here to address class imbalance
        # this makes sure, that misclassifying samples from minority classes
        # is penalized more than misclassifying samples from majority classes

        # The gradient steps for minority class samples are magnified, making the model learn to prioritize correctly classifying these samples.
        # The batches themselves are still drawn from the imbalanced dataset distribution.
        self.loss_fn = nn.CrossEntropyLoss(weight=class_weights) if class_weights is not None else nn.CrossEntropyLoss()


        # If we look at our code hard enough we will notice, that we are using BOTH
        # the class weights and the weighted random sampler to address the class imbalance issue.
        # Now, is it a good idea to use both of them at the same time?
        # As always - it depends. It is, unfortunately, a nuanced topic that depends on the dataset and the problem.

        # By using both we are effectively training on class-balanced batches (due to the Sampler) where the minority class samples' misclassification
        # still receives a higher penalty (due to the weighted loss). This combination can be aggressive and might
        # be useful for highly severe imbalance, but it could also lead to overfitting to the minority class if the weights are too extreme.


        # In some cases, using both can lead to better performance, as the sampler ensures that
        # the model sees a balanced representation of classes during training, while the loss function
        # emphasizes the importance of minority classes.
        # However, in other cases, it might lead to overcompensation for minority classes,
        # causing the model to perform poorly on majority classes.
        # It is often a good idea to experiment with both approaches separately and together
        # to see what works best for your specific use case.

        # TODO: experiment with using only one of these techniques at a time, and compare the results.

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.layers(x)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        images, labels = batch
        outputs = self(images)
        loss = self.loss_fn(outputs, labels)
        self.log('train_loss', loss)
        return loss

    def _shared_eval_step(self, batch, stage):
        """Compute loss and accuracy for one batch and log them as ``<stage>_loss`` / ``<stage>_acc``."""
        images, labels = batch
        outputs = self(images)
        loss = self.loss_fn(outputs, labels)
        predicted = outputs.argmax(dim=1)
        acc = (predicted == labels).float().mean()
        self.log(f'{stage}_loss', loss)
        self.log(f'{stage}_acc', acc)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` and ``val_acc`` for one validation batch."""
        return self._shared_eval_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        """Log ``test_loss`` and ``test_acc`` for one test batch."""
        return self._shared_eval_step(batch, 'test')

    def configure_optimizers(self):
        """Adam plus a ReduceLROnPlateau scheduler that monitors ``val_loss``."""
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.1)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val_loss",
            },
        }


In [None]:
# Compute class weights for imbalanced dataset
def compute_class_weights(dataset, num_classes=9):
    """Return inverse-frequency class weights as a float tensor.

    For every class that occurs in ``dataset.data`` the weight is
    ``total_samples / (n_classes * class_count)``. Classes with no samples get
    weight 0.0 instead of ``inf`` (a division by zero), so the result is always
    finite and safe to pass to the loss function or a weighted sampler.

    Args:
        dataset: Object with a ``data`` attribute holding ``(item, label)``
            pairs, where ``label`` is a non-negative integer.
        num_classes (int): Minimum length of the returned weight vector.

    Returns:
        torch.Tensor: 1-D float32 tensor with one weight per class.
    """
    # An explicit integer dtype keeps np.bincount working for an empty dataset.
    labels = np.asarray([label for _, label in dataset.data], dtype=np.int64)
    class_counts = np.bincount(labels, minlength=num_classes)

    class_weights = np.zeros(len(class_counts), dtype=np.float64)
    present = class_counts > 0
    class_weights[present] = labels.size / (len(class_counts) * class_counts[present])

    # weights as tensor (to pass to the loss function)
    return torch.tensor(class_weights, dtype=torch.float)

In [None]:
class ConfusionMatrixCallback(Callback):
    """Collect validation predictions and build a confusion matrix after every validation epoch.

    The most recent matrix is kept in ``last_confusion_matrix`` (rows = true
    class, columns = predicted class, ordered like ``class_names``) so that it
    can be inspected or plotted after ``trainer.fit``.
    """

    def __init__(self, class_names):
        """
        Args:
            class_names (list[str]): Class names, indexed by integer label.
        """
        self.class_names = class_names
        self.val_labels = []
        self.val_preds = []
        self.last_confusion_matrix = None

    def on_validation_epoch_start(self, trainer, pl_module):
        """Forget the predictions collected during the previous validation epoch."""
        self.val_labels = []
        self.val_preds = []

    def on_validation_batch_end(self, trainer, pl_module, outputs, batch, batch_idx, dataloader_idx=0):
        """Predict the batch again and store true and predicted labels.

        ``outputs`` (whatever ``validation_step`` returned) is not used; the
        batch is run through the model once more to obtain the predictions.
        """
        images, labels = batch
        was_training = pl_module.training
        pl_module.eval()
        with torch.no_grad():
            logits = pl_module(images.to(pl_module.device))
        if was_training:
            # Leave the module in the mode we found it in.
            pl_module.train()

        predicted = logits.argmax(dim=1)
        self.val_labels.extend(labels.cpu().numpy())
        self.val_preds.extend(predicted.cpu().numpy())

    def on_validation_epoch_end(self, trainer, pl_module):
        """Build the confusion matrix for the finished validation epoch and keep it."""
        if not self.val_labels:
            self.last_confusion_matrix = None
            return

        # A fixed label set keeps the matrix shape constant even when some
        # class does not occur in this epoch's labels or predictions.
        self.last_confusion_matrix = confusion_matrix(
            self.val_labels, self.val_preds,
            labels=list(range(len(self.class_names))),
        )


Ok, now we can analyze what has changed in this code:
- The architecture is now defined by a configuration dictionary (architecture_config).
- The convolutional blocks are created in a loop based on the configuration.
- This allows us to easily modify the architecture by changing the configuration dictionary.

Let's try running our network first

In [None]:
# Setup data module.
# The first setup() call only loads the three splits; the training split is
# needed before the class weights can be computed.
batch_size = 64
data_module = WasteDataModuleMulti(batch_size=batch_size)
data_module.setup()

# Compute class weights from training data
class_weights = compute_class_weights(data_module.train_dataset)
print("Class weights:", class_weights)

# Set class weights for weighted sampling.
# setup() only derives the per-sample weights when class_weights is set,
# hence the second call (it also reloads the three splits).
data_module.class_weights = class_weights
data_module.setup()  # Re-setup to compute sample weights

In [None]:
# Initialize model with default architecture configuration
# model = ParametricCNN()


# Initialize model with custom architecture configuration
architecture_config = {
    'blocks': [
        {'filters': 64, 'convs': 2},
        {'filters': 128, 'convs': 2},
        {'filters': 256, 'convs': 2},
    ],
    'dropout_conv': 0.25,
    'fc_units': 512,
    'dropout_fc': 0.5
}

# Pass the class weights computed in the previous cell: without them the model
# falls back to an unweighted CrossEntropyLoss and only the weighted sampler
# addresses the class imbalance.
model = ParametricCNN(architecture_config=architecture_config, class_weights=class_weights)

In [None]:
# Print model summary: layer-by-layer output shapes and parameter counts for a
# single 3x224x224 input (the same input size the model uses internally to
# size its first fully connected layer).
summary(model, (3, 224, 224), device="cpu")


Or we can make it different

In [None]:
# A deeper and wider variant: four blocks instead of three, more convolutions
# per block, and a larger fully connected layer.
architecture_config_alt_alternative = {
    'blocks': [
        {'filters': 64, 'convs': 3},
        {'filters': 128, 'convs': 3},
        {'filters': 256, 'convs': 3},
        {'filters': 512, 'convs': 2},
    ],
    'dropout_conv': 0.3,
    'fc_units': 1024,
    'dropout_fc': 0.5
}

# This is the model that gets trained below, so it must receive the class
# weights: without them the loss is an unweighted CrossEntropyLoss and only the
# weighted sampler addresses the class imbalance.
model = ParametricCNN(architecture_config=architecture_config_alt_alternative,
                      class_weights=class_weights)

In [None]:
# Print model summary for the alternative architecture (same 3x224x224 input),
# to compare its size against the previous configuration.
summary(model, (3, 224, 224), device="cpu")


OK, this one "looks fine", let's try it

In [None]:
# Setup callbacks

# Checkpointing: tracks 'val_loss' (lower is better), keeps the single best
# checkpoint as 'best' and additionally saves the last one.
checkpoint_callback = ModelCheckpoint(
    dirpath='checkpoints/lightning_multiclass',
    filename='best',
    save_top_k=1,
    monitor='val_loss',
    mode='min',
    save_last=True
)

# Early stopping on the same metric, with a patience of 7.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=7,
    mode='min'
)

# Confusion matrix callback
# class_names follows the insertion order of final_classes, i.e. index == label.
class_names = list(final_classes.keys())
confusion_matrix_callback = ConfusionMatrixCallback(class_names)

In [None]:
# Setup trainer
# Use every visible CUDA device when a GPU is available, otherwise one CPU process.
# NOTE(review): with more than one GPU this requests all of them; multi-device
# training started from a notebook may need an explicit strategy - confirm.
accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'
devices = list(range(torch.cuda.device_count())) if accelerator == 'gpu' else 1

# low number, just to check everything works
# (with a single epoch the early-stopping patience of 7 can never be reached)
NUM_EPOCHS = 1

trainer = Trainer(
    max_epochs=NUM_EPOCHS,
    callbacks=[checkpoint_callback, early_stopping_callback, confusion_matrix_callback],
    accelerator=accelerator,
    devices=devices
)


In [None]:
# Train the model.
# The data module supplies the train and validation dataloaders.
trainer.fit(model, datamodule=data_module)

In [None]:
# Test the model on the held-out test split.
# Note: this evaluates the in-memory `model` object as it is after fit().
trainer.test(model, datamodule=data_module)

We can now try to experiment with different versions of this architecture.
Right now we do not have any intuition about how the network should work, what it should have (elements, layers, blocks etc). We can only experiment to gain knowledge and understanding.

The task is always the same - to create the best possible architecture. That means:


*   Fastest possible
*   Smallest size (memory footprint)
*   Highest/lowest metric (usually accuracy)

By changing our architecture we can try to reach balance between these points




# TASK: Experiment manually with different configurations of the network.

# TASK: Based on your knowledge, implement a second architecture. This one will not be VGG-inspired, but based on AlexNet

Implement modular versions of AlexNet based on this article:
https://proceedings.neurips.cc/paper_files/paper/2012/file/c399862d3b9d6b76c8436e924a68c45b-Paper.pdf

WARNING: The original AlexNet worked on dual GPUs, so you might need to adjust the architecture.

If you want (suggestion, not necessity) you can read some more details here:
https://arxiv.org/abs/1404.5997

**Experiment with different configurations**

**Evaluate performance on the dataset**


# Manual vs automatic tuning

After experimenting with architectures we know one thing for sure - this process can be tedious. Making small changes to architecture (like number of layers, filters etc) will impact how the network behaves. However, this is a slow and boring process, that needs supervision. Fortunately there are always better ways to do this.

Therefore let us try to automate it.
We can even use what we already have installed and tested in previous classes - CometML framework.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
from pytorch_lightning import LightningModule, Trainer, LightningDataModule
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping, Callback
from pytorch_lightning.loggers import CometLogger
from torchsummary import summary
import pytorch_lightning as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import os
import comet_ml

# Set seed for reproducibility.
# The seed is set again here (this cell also repeats the imports), presumably
# so that this part of the notebook can be run on its own - confirm.
pl.seed_everything(42)

In [None]:

# Shared preprocessing constants.
IMAGE_SIZE = 224
IMAGENET_MEAN = [0.485, 0.456, 0.406]  # ImageNet normalization (per-channel mean)
IMAGENET_STD = [0.229, 0.224, 0.225]   # ImageNet normalization (per-channel std)

# Data augmentations for training: random crop/flip/rotation/colour jitter,
# then tensor conversion and normalization.
_train_steps = [
    transforms.RandomResizedCrop(IMAGE_SIZE, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
train_transform = transforms.Compose(_train_steps)

# Validation and test transforms: no augmentation, only a fixed resize
# followed by the same tensor conversion and normalization.
_eval_steps = [
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
]
val_test_transform = transforms.Compose(_eval_steps)

class WasteDataModuleMulti(LightningDataModule):
    """Lightning data module for the train/val/test splits of the combined waste dataset.

    To train with class-weighted random sampling, assign a per-class weight
    tensor to ``class_weights`` and call ``setup()`` again; otherwise the
    training loader simply shuffles.
    """

    def __init__(self, root_dir='datasets/combined_waste_dataset',
                 batch_size=32):
        """
        Args:
            root_dir (str): Directory holding the split CSV files and images.
            batch_size (int): Batch size used by all three dataloaders.
        """
        super().__init__()
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.num_classes = 9  # organic, battery, glass, metal, paper, cardboard, plastic, textiles, trash
        self.class_weights = None
        # Always defined, so train_dataloader() can test it directly and a
        # sampler from an earlier setup() cannot linger once class_weights is cleared.
        self.sample_weights = None
        # Leave one core for the main process. os.cpu_count() may return None,
        # which would make a plain `os.cpu_count() - 1` raise TypeError.
        self.num_workers = max((os.cpu_count() or 1) - 1, 0)

    def setup(self, stage=None):
        """Create the three split datasets and, if ``class_weights`` is set, the per-sample weights.

        ``stage`` is accepted for Lightning compatibility but not used: all
        splits are always (re)built.
        """
        self.train_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='train', transform=train_transform
        )
        self.val_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='val', transform=val_test_transform
        )
        self.test_dataset = CombinedWasteDatasetMulti(
            root_dir=self.root_dir, split='test', transform=val_test_transform
        )

        if self.class_weights is not None:
            # Every training sample gets the weight of its class.
            # One indexed lookup for the whole split instead of one .item() call per sample.
            train_labels = torch.tensor(
                [label for _, label in self.train_dataset.data], dtype=torch.long)
            self.sample_weights = torch.as_tensor(self.class_weights)[train_labels].tolist()
        else:
            self.sample_weights = None

    def train_dataloader(self):
        """Return the training loader: weighted sampling if sample weights exist, plain shuffling otherwise."""
        if self.sample_weights is not None:
            # Draw len(train_dataset) indices per epoch, with replacement,
            # proportionally to the per-sample weights.
            sampler = WeightedRandomSampler(self.sample_weights, len(self.train_dataset), replacement=True)

            return DataLoader(self.train_dataset, batch_size=self.batch_size,
                              sampler=sampler, num_workers=self.num_workers)
        else:
            return DataLoader(self.train_dataset, batch_size=self.batch_size,
                              shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        """Return the (unshuffled) validation loader."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size,
                          shuffle=False, num_workers=self.num_workers)

    def test_dataloader(self):
        """Return the (unshuffled) test loader."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          shuffle=False, num_workers=self.num_workers)

In [None]:
class ParametricCNN(LightningModule):
    """CNN classifier whose convolutional stack is generated from a configuration dict.

    ``architecture_config`` keys:
        * ``blocks``: list of ``{'filters': int, 'convs': int}``; each block is
          ``convs`` x (3x3 Conv -> BatchNorm -> ReLU) followed by 2x2 max-pooling
          and dropout.
        * ``dropout_conv``: dropout probability after every block.
        * ``fc_units``: width of the hidden fully connected layer.
        * ``dropout_fc``: dropout probability before the output layer.
    """

    def __init__(self, num_classes=9, class_weights=None, architecture_config=None, learning_rate=1e-3):
        """
        Args:
            num_classes (int): Number of output classes.
            class_weights (Tensor | None): Optional per-class weights for the
                cross-entropy loss.
            architecture_config (dict | None): Architecture description (see the
                class docstring); ``None`` selects the default "VGG-like" stack.
            learning_rate (float): Initial learning rate for Adam.
        """
        super().__init__()
        self.save_hyperparameters()
        self.num_classes = num_classes
        self.learning_rate = learning_rate

        # Default architecture config (our first, "VGG-like")
        if architecture_config is None:
            architecture_config = {
                'blocks': [
                    {'filters': 64, 'convs': 2},
                    {'filters': 128, 'convs': 2},
                    {'filters': 256, 'convs': 2},
                ],
                'dropout_conv': 0.25,
                'fc_units': 512,
                'dropout_fc': 0.5
            }

        self.architecture_config = architecture_config

        # Build convolutional layers
        layers = []
        in_channels = 3
        for block in architecture_config['blocks']:
            for _ in range(block['convs']):
                layers.append(nn.Conv2d(in_channels, block['filters'], kernel_size=3, padding=1))
                layers.append(nn.BatchNorm2d(block['filters']))
                layers.append(nn.ReLU())
                in_channels = block['filters']
            layers.append(nn.MaxPool2d(2, 2))
            layers.append(nn.Dropout(architecture_config['dropout_conv']))

        layers.append(nn.Flatten())

        # Size the first Linear layer by pushing a dummy image through the
        # convolutional part (everything except the trailing Flatten).
        feature_probe = nn.Sequential(*layers[:-1])

        # The probe must run in eval mode: in training mode every BatchNorm layer
        # would fold the statistics of the all-zero dummy image into its running
        # mean/variance, so the model would start training with polluted buffers.
        feature_probe.eval()
        with torch.inference_mode():
            dummy_input = torch.zeros(1, 3, 224, 224)
            self.flatten_size = feature_probe(dummy_input).numel()  # numel = number of elements in the tensor
        feature_probe.train()  # hand the layers back in their default (training) mode

        # Add fully connected layers
        layers.append(nn.Linear(self.flatten_size, architecture_config['fc_units']))
        layers.append(nn.BatchNorm1d(architecture_config['fc_units']))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(architecture_config['dropout_fc']))
        layers.append(nn.Linear(architecture_config['fc_units'], num_classes))

        self.layers = nn.Sequential(*layers)

        # Loss function: class-weighted cross-entropy when weights are given,
        # plain cross-entropy otherwise.
        self.loss_fn = nn.CrossEntropyLoss(weight=class_weights) if class_weights is not None else nn.CrossEntropyLoss()

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.layers(x)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        images, labels = batch
        outputs = self(images)
        loss = self.loss_fn(outputs, labels)
        self.log('train_loss', loss)
        return loss

    def _shared_eval_step(self, batch, stage):
        """Compute loss and accuracy for one batch and log them as ``<stage>_loss`` / ``<stage>_acc``."""
        images, labels = batch
        outputs = self(images)
        loss = self.loss_fn(outputs, labels)
        predicted = outputs.argmax(dim=1)
        acc = (predicted == labels).float().mean()
        self.log(f'{stage}_loss', loss)
        self.log(f'{stage}_acc', acc)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` and ``val_acc`` for one validation batch."""
        return self._shared_eval_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        """Log ``test_loss`` and ``test_acc`` for one test batch."""
        return self._shared_eval_step(batch, 'test')

    def configure_optimizers(self):
        """Adam plus a ReduceLROnPlateau scheduler that monitors ``val_loss``."""
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.1)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val_loss",
            },
        }


In [None]:
# Compute class weights for imbalanced dataset
def compute_class_weights(dataset, num_classes=9):
    """Return inverse-frequency class weights as a float tensor.

    For every class that occurs in ``dataset.data`` the weight is
    ``total_samples / (n_classes * class_count)``. Classes with no samples get
    weight 0.0 instead of ``inf`` (a division by zero), so the result is always
    finite and safe to pass to the loss function or a weighted sampler.

    Args:
        dataset: Object with a ``data`` attribute holding ``(item, label)``
            pairs, where ``label`` is a non-negative integer.
        num_classes (int): Minimum length of the returned weight vector.

    Returns:
        torch.Tensor: 1-D float32 tensor with one weight per class.
    """
    # An explicit integer dtype keeps np.bincount working for an empty dataset.
    labels = np.asarray([label for _, label in dataset.data], dtype=np.int64)
    class_counts = np.bincount(labels, minlength=num_classes)

    class_weights = np.zeros(len(class_counts), dtype=np.float64)
    present = class_counts > 0
    class_weights[present] = labels.size / (len(class_counts) * class_counts[present])

    # weights as tensor (to pass to the loss function)
    return torch.tensor(class_weights, dtype=torch.float)


class ConfusionMatrixCallback(Callback):
    """Collect validation predictions and build a confusion matrix after every validation epoch.

    The most recent matrix is kept in ``last_confusion_matrix`` (rows = true
    class, columns = predicted class, ordered like ``class_names``) so that it
    can be inspected or plotted after ``trainer.fit``.
    """

    def __init__(self, class_names):
        """
        Args:
            class_names (list[str]): Class names, indexed by integer label.
        """
        self.class_names = class_names
        self.val_labels = []
        self.val_preds = []
        self.last_confusion_matrix = None

    def on_validation_epoch_start(self, trainer, pl_module):
        """Forget the predictions collected during the previous validation epoch."""
        self.val_labels = []
        self.val_preds = []

    def on_validation_batch_end(self, trainer, pl_module, outputs, batch, batch_idx, dataloader_idx=0):
        """Predict the batch again and store true and predicted labels.

        ``outputs`` (whatever ``validation_step`` returned) is not used; the
        batch is run through the model once more to obtain the predictions.
        """
        images, labels = batch
        was_training = pl_module.training
        pl_module.eval()
        with torch.no_grad():
            logits = pl_module(images.to(pl_module.device))
        if was_training:
            # Leave the module in the mode we found it in.
            pl_module.train()

        predicted = logits.argmax(dim=1)
        self.val_labels.extend(labels.cpu().numpy())
        self.val_preds.extend(predicted.cpu().numpy())

    def on_validation_epoch_end(self, trainer, pl_module):
        """Build the confusion matrix for the finished validation epoch and keep it."""
        if not self.val_labels:
            self.last_confusion_matrix = None
            return

        # A fixed label set keeps the matrix shape constant even when some
        # class does not occur in this epoch's labels or predictions.
        self.last_confusion_matrix = confusion_matrix(
            self.val_labels, self.val_preds,
            labels=list(range(len(self.class_names))),
        )



In [None]:
# Setup data module.
# The first setup() call only loads the three splits; the training split is
# needed before the class weights can be computed.
batch_size = 64
data_module = WasteDataModuleMulti(batch_size=batch_size)
data_module.setup()

# Compute class weights from training data
class_weights = compute_class_weights(data_module.train_dataset)
print("Class weights:", class_weights)

# Set class weights for weighted sampling.
# setup() only derives the per-sample weights when class_weights is set,
# hence the second call (it also reloads the three splits).
data_module.class_weights = class_weights
data_module.setup()  # Re-setup to compute sample weights


# Hyperparameter Tuning Setup with CometML

We use CometML's Optimizer for automated hyperparameter search.

This allows us to search over architecture configurations efficiently.

More details are available here (for example):

https://www.comet.com/site/blog/hyperparameter-optimization-with-comet/


In [None]:
# For testing purposes let's set this to a very low value.
# It is used as the default `num_epochs` of train_model_with_params in the next cell.
NUM_EPOCHS = 1


In [None]:
# Function to create and train a model with given hyperparameters
def train_model_with_params(params, data_module, class_weights, num_epochs=NUM_EPOCHS):
    """Train one ParametricCNN configuration and return its validation loss.

    Args:
        params: Mapping of hyperparameters. The keys read here are
            ``filters_block1..3``, ``convs_block1..3``, ``dropout_conv``,
            ``fc_units``, ``dropout_fc`` and ``learning_rate``.
        data_module: Data module handed to ``trainer.fit``.
        class_weights: Per-class weights forwarded to ``ParametricCNN``.
        num_epochs: Upper bound on training epochs (early stopping may end sooner).

    Returns:
        float: The ``'val_loss'`` entry of ``trainer.callback_metrics`` after fitting.

    Raises:
        KeyError: If a required key is missing from ``params`` or no
            ``'val_loss'`` metric was logged during training.
    """
    # Create architecture config from params (three conv blocks)
    architecture_config = {
        'blocks': [
            {
                'filters': params[f'filters_block{block_no}'],
                'convs': params[f'convs_block{block_no}'],
            }
            for block_no in (1, 2, 3)
        ],
        'dropout_conv': params['dropout_conv'],
        'fc_units': params['fc_units'],
        'dropout_fc': params['dropout_fc']
    }

    # Initialize model
    model = ParametricCNN(
        architecture_config=architecture_config,
        class_weights=class_weights,
        learning_rate=params['learning_rate'],
    )

    # Comet Logger for this experiment
    comet_logger = CometLogger(
            project="waste-classification-multiclass",
            name="lab4-hyperparam-tuning")

    # Callbacks
    checkpoint_callback = ModelCheckpoint(
        dirpath='checkpoints/lightning_multiclass',
        filename='best-{epoch:02d}-{val_loss:.2f}',
        save_top_k=1,
        monitor='val_loss',
        mode='min',
        save_last=True
    )

    early_stopping_callback = EarlyStopping(
        monitor='val_loss',
        patience=7,
        mode='min'
    )

    confusion_matrix_callback = ConfusionMatrixCallback(list(final_classes.keys()))

    # Trainer
    # A list of device indices is only meaningful for the GPU accelerator; the CPU
    # accelerator requires a positive integer, so `devices=[0]` would fail without CUDA.
    if torch.cuda.is_available():
        accelerator = 'gpu'
        devices = [0]  # using a single GPU for hyperparameter tuning to avoid complications
                       # TODO: IF you have access to multiple GPUs, you can try and see what happens, with the CURRENT code configuration
                       # and distribution strategy set in Pytorch Lightning Trainer.
    else:
        accelerator = 'cpu'
        devices = 1

    trainer = Trainer(
        max_epochs=num_epochs,
        callbacks=[checkpoint_callback, early_stopping_callback, confusion_matrix_callback],
        accelerator=accelerator,
        devices=devices,
        logger=comet_logger
    )

    # Train
    trainer.fit(model, datamodule=data_module)
    # this will log "train_loss", "val_loss" and "val_acc"  metrics
    return trainer.callback_metrics['val_loss'].item()


    # This part is commented out for a reason - more on that below
    # # Test the model
    # trainer.test(model, datamodule=data_module)
    # # this will log "test_loss" and "test_acc" logged metrics

    # # Return the best validation loss for optimization
    # return trainer.callback_metrics['test_loss'].item()


We must now ask ourselves a very important question: which is correct here? Can we use validation loss or test loss for hyperparameter tuning?
Or, to be more precise, are we ALLOWED to use test loss for hyperparameter tuning?

**TODO: Think about this question carefully and answer yourselves**

I will give you my point of view on this question:

In general, you should NOT use test loss for hyperparameter tuning.
The test set is supposed to be a completely unseen dataset that you only use at the very end of your model development process to get an unbiased estimate of your model's performance.

If you use the test set for hyperparameter tuning, you are effectively "peeking" at the test data, using the test set to guide your model selection.

This can lead to overfitting to the test set and an overly optimistic estimate of your model's performance, not reflective of its true generalization ability.

Therefore, it is best practice to use a separate validation set for hyperparameter tuning, and only use the test set once you have finalized your model.

If you, however, have a different opinion on this matter, I would be very interested to hear it. Never to argue/mock, but to discuss what it really means to have a "good" model evaluation strategy, as this is one of the most important parts of creating and later deploying a proper model.


We will be using CometML Optimizer for Hyperparameter Tuning
Main references:

**TODO: READ THESE LINKS THOROUGHLY**

Using the Comet Optimizer is an advanced topic, and to use it properly it is ALWAYS advised to read the documentation.

Truth be told - even with the documentation it can be tricky to set up properly.

But once set up, it can save you a lot of time and effort in hyperparameter tuning.

Digression: Any hyperparameter tuning framework is "complicated" to set up properly.
Because hyperparameter tuning is inherently complicated.

You need to make sure that your training code is properly modularized,
that hyperparameters are passed correctly, that logging is set up properly, etc.

So don't be discouraged if it takes some time to get it right.
Once you have a working setup, you can reuse it for future projects.


https://www.comet.com/docs/v2/api-and-sdk/python-sdk/reference/Optimizer/

https://www.comet.com/docs/v2/guides/optimizer/quickstart/


*Source: https://www.comet.com/docs/v2/guides/optimizer/quickstart/*

Before you begin optimization, you need to choose:

    * The hyperparameters to tune.
    * The search space for each hyperparameter to tune.
    * The search algorithm (one of grid search, random search, and Bayesian optimization).

Additionally, make sure to refactor your existing model training code so that hyperparameters are defined as parametrized variables.

In [None]:

# Define hyperparameter search space
# CometML Optimizer uses this to suggest parameters; the keys are the ones
# train_model_with_params reads from `params`.

# SO MANY parameters to tune here...
# Is it too much, maybe? Will we even be able to train anything in reasonable time?

param_space = {
    "filters_block1": {"type": "discrete", "values": [32, 64, 128]}, # DO NOT USE "integer" when there is so much difference between the candidate values
    "convs_block1": {"type": "integer", "min": 1, "max": 3}, # a small contiguous range, so "integer" is OK here
    "filters_block2": {"type": "discrete", "values": [64, 128, 256]},
    "convs_block2": {"type": "integer", "min": 1, "max": 3},
    "filters_block3": {"type": "discrete", "values": [128, 256, 512]},
    "convs_block3": {"type": "integer", "min": 1, "max": 3},
    "dropout_conv": {"type": "float", "min": 0.1, "max": 0.5},
    "fc_units": {"type": "discrete", "values": [256, 512, 1024]},
    "dropout_fc": {"type": "float", "min": 0.3, "max": 0.7}, # TODO: WHY NOT DISCRETE? Or maybe it should be discrete?
    "learning_rate": {"type": "float", "scalingType": "loguniform", "min": 1e-4, "max": 1e-2} # TODO: log-uniform? What is that? Try to read about this parameter.
                                                                                              # Other options are: ['linear', 'uniform', 'loguniform', 'normal', 'lognormal']
}


## TODO: look at the parameters above.

Do we really need to test all of them, especially at once? If not, maybe you can trim this parameter space?


## Strategy 1: Random Search (Active)

Random search samples parameters randomly from the space.

It's simple, effective, and doesn't assume parameter relationships.

Random search is, as the name implies, randomly selecting combinations of hyperparameter values to try out.

It can be good, if we do not know much about the hyperparameter space,
or if we suspect that only a few hyperparameters have a significant impact on performance.

As with a lot of things in machine learning, doing something randomly can sometimes outperform more "intelligent" approaches.

Though Grid is still usually better than pure random guessing.

*Source: https://www.comet.com/site/blog/hyperparameter-optimization-with-comet/*

Random search offers slightly more flexibility than grid search.
Instead of exhaustively iterating through all possible combinations like in the grid search algorithm,
random search selects combinations at random from the possible parameter
values until the run is explicitly stopped or the max combinations are met.

Similar to grid search, the random algorithm does not use past experiments to inform future experiments,
but when only a small number of hyperparameters have an effect on the final model performance,
the random search can outperform grid search.

In [None]:
# https://www.comet.com/docs/v2/guides/optimizer/configure-optimizer/
config_dict = {
    "algorithm": "random",
    "spec": {
        # Must be the name of a metric that is actually logged on the optimizer's
        # experiment. The tuning loop logs "val_loss"; nothing called "accuracy"
        # is ever reported there.
        "metric": "val_loss",
        "maxCombo": 5, # VERY important parameter - make sure you know what it does
        "gridSize": 5,
        "minSampleSize": 150,
    },
    "parameters": param_space,
    "name": "My Random Search",
    "trials": 1, # read the docs to understand what this does - might be misleading at first
}

In [None]:
# Build the random-search optimizer from `config_dict`.
# NOTE: `config_dict` is reassigned by the Bayesian-search cell further down, so
# re-run the random-search config cell before re-running this one.
opt_random = comet_ml.Optimizer(config=config_dict)

Below is our actual hyperparameter tuning loop
As you can see we are calling our training function with different parameters suggested by the optimizer
This makes it very easy to integrate hyperparameter tuning into existing training code

This is mostly the same thing as calling it manually, but we do not have to implement the tuning "strategy" ourselves
or, properly called, the "search algorithm".
This is the main point of all the frameworks for hyperparameter tuning - to avoid reinventing the wheel
But you could easily do this yourself if you wanted to.


What we are doing is called "black-box optimization", because we do not have to know anything about the function we are optimizing:
we just provide inputs (hyperparameters) and get outputs (validation loss).

In [None]:
# Run the optimization
# Each iteration yields a fresh Comet experiment carrying one suggested parameter set.
for experiment in opt_random.get_experiments():
    params = experiment.params
    try:
        val_loss = train_model_with_params(params, data_module, class_weights, num_epochs=NUM_EPOCHS)  # Short epochs for demo
        # Report the objective value on the optimizer's own experiment.
        experiment.log_metric("val_loss", val_loss)
    finally:
        # End the experiment explicitly (also when training fails) so it is
        # finalized before the optimizer hands out the next one.
        experiment.end()

## Strategy 2: Bayesian Optimization (Active Alternative)
Bayesian optimization uses probabilistic models to suggest promising parameters.

It's more efficient than random search for continuous spaces.
It is an "intelligent" search strategy that builds a model of the hyperparameter space and uses it to select the most promising hyperparameters to evaluate next.

This approach can be more efficient than random search, especially when the hyperparameter space is large and continuous, as it focuses on areas likely to yield better performance.


*Source: https://www.comet.com/site/blog/hyperparameter-optimization-with-comet/*

Comet documentation states “the Bayes algorithm may be the best choice for most of your Optimizer uses.”

    “Bayesian optimization has been shown to obtain better results in fewer evaluations compared to grid search and random search,
    due to the ability to reason about the quality of experiments before they are run.” — Wikipedia

Bayes optimization works by iteratively evaluating a promising hyperparameter configuration
based on the current model, then updating it. The main aim of the technique is to gather observations
that reveal as much information as possible about the location of the optimum.

In [None]:
config_dict = {
    "algorithm": "bayes",
    "spec": {
        # The tuning loop reports "val_loss" to the optimizer, and a loss must be
        # minimized. Pointing the optimizer at "accuracy" (never logged on its
        # experiments) with "maximize" would leave the Bayesian search without
        # any usable feedback.
        "objective": "minimize",
        "metric": "val_loss",
        "maxCombo": 5, # VERY important parameter - make sure you know what it does
        "retryLimit": 10,
        "retryAssignLimit": 10,
    },
    "parameters": param_space,
    "name": "My Bayesian Search",
    "trials": 1,
}

opt_bayes = comet_ml.Optimizer(config=config_dict)

In [None]:
# Run the optimization
# Each iteration yields a fresh Comet experiment carrying one suggested parameter set.
for experiment in opt_bayes.get_experiments():
    params = experiment.params
    try:
        val_loss = train_model_with_params(params, data_module, class_weights, num_epochs=NUM_EPOCHS)
        # Report the objective value on the optimizer's own experiment.
        experiment.log_metric("val_loss", val_loss)
    finally:
        # End the experiment explicitly (also when training fails) so it is
        # finalized before the optimizer hands out the next one.
        experiment.end()

Other Strategies Available in CometML:
- Grid Search: Exhaustive search over specified combinations.
  Set algorithm="grid" and provide a list of values for each parameter.

**TODO: Implement grid search yourself**

It is more widely used than random search, but not as much as Bayesian optimization.
Pretty please read the documentation to understand how to set up the parameters dictionary for grid search.
Not all the parameters exist for every search strategy!






**TODO: Is this the only way to do hyperparameter tuning with PyTorch Lightning?**

Of course not. There are many other libraries and frameworks available for hyperparameter tuning, such as Optuna, Ray Tune, etc. Each has its own strengths and weaknesses.

Even PyTorch Lightning itself has built-in support for very simple tuning:

https://lightning.ai/docs/pytorch/stable/api/lightning.pytorch.tuner.tuning.Tuner.html#lightning.pytorch.tuner.tuning.Tuner

CometML is just one option that integrates well with PyTorch Lightning and provides a user-friendly interface for tracking experiments and visualizing results.
Depending on your specific needs and preferences, you may find other tools more suitable.




## Homework Task:
based on what you have learned in this lab, implement hyperparameter tuning using Optuna

You will find more information about Optuna here:

https://optuna.org/


and here:

https://pytorch-lightning.readthedocs.io/en/stable/extensions/optuna.html


and also here:

https://optuna-integration.readthedocs.io/en/stable/index.html