## Notebook for CNN training for AE4317 Autonomous Flight of Micro Air Vehicles
This notebook contains everything needed to train a Convolutional Neural Network to be deployed on a drone. It allows maximum flexibility, giving you the opportunity to find the best architecture. To be able to keep track of all your models and see how they are performing, it is recommended to use [Weights and Biases](https://wandb.ai/), as this is already set up in the logging.

By now you should have a labeled dataset either generated using the previous notebook `dataset_generation.ipynb` or of your own.

Make sure to go through the entire notebook at least once, and try to understand what every part does. In the end you will mostly be using the parameter dashboard, as this is where you change the model architecture and hyperparameters.

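The model and training loop are built with [PyTorch Lightning](https://lightning.ai/docs/pytorch/stable/), which takes care of the training, validation and test loops, logging and early stopping.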

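At the end of the notebook the trained model is exported to ONNX, so that it can afterwards be converted to C code (ONNX to C, e.g. with a tool such as onnx2c) and run on the drone. Keep in mind that this conversion brings limitations on what the model may contain. TODO: document these limitations.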

### Imports
Below are all the imports we need, make sure to have all of them installed!

In [None]:
from utils import calc_mean_std_dataset

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.io import read_image
from torchvision.transforms.functional import convert_image_dtype
import torchsummary
import lightning as L
from lightning.pytorch.loggers import WandbLogger
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
import matplotlib.pyplot as plt
from pathlib import Path

# Report which compute backend PyTorch can see on this machine
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device}")

### Parameters Dashboard
This is your main place to tune your CNN. You can fully specify the architecture and a few other hyperparameters, such as the number of epochs and learning rate. 

In [None]:
# --- Training hyperparameters ---
num_epochs = 100          # upper bound on the number of training epochs
learning_rate = 0.01      # initial learning rate handed to the optimizer
label_smoothing = 0.0     # 0.0 switches label smoothing off in the training loss
normalize_images = True   # standardise images with a fixed mean / std
batch_norm = True         # put a BatchNorm2d layer into every conv block

analyze_model_before_training = False

# --- CNN architecture ---
# Each per-layer tuple below is indexed once per conv block, so it needs
# (at least) `n_layers` entries.
architecture = dict(
    n_layers=4,
    conv_layers=dict(
        input_size=(1, 65, 15),  # (channels, height, width)
        output_channel=(8, 16, 32, 64),
        kernel_size=(3, 3, 3, 3),
        stride=(1, 1, 1, 1),
        padding=(1, 1, 1, 1),
    ),
    batch_norm=batch_norm,
    max_pool_layers=dict(
        kernel_size=((2, 1), 2, 2, 2),
        stride=((2, 1), 2, 2, 2),
    ),
    dropout_layers=dict(
        p=(0.1, 0.075, 0.05, 0.025),
    ),
    fc_layer=dict(
        input_size=None,  # filled in when the model is constructed
        output_size=3,
    ),
)

### Data
This dataset class assumes that you have a .csv file (with a header row) in which the first column is the file name of the image and the remaining columns hold the label. If this does not apply to your dataset, make sure to update the class accordingly.

In [None]:
# Create dataset class
class DroneImagesDataset(Dataset):
    """Dataset of labelled images listed in a CSV file.

    Column 0 of the CSV holds the image path, every remaining column holds a
    label value. The first line of the file is skipped when it is read.
    """

    def __init__(self, csv_file, transform=None):
        """Read the annotation table.

        Args:
            csv_file: Path of the CSV file listing image paths and labels.
            transform: Optional callable applied to each loaded image tensor.
        """
        self.transform = transform
        self.annotations = pd.read_csv(csv_file, skiprows=1, header=None)

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load one sample.

        Args:
            index: Row position in the annotation table.

        Returns:
            Tuple ``(image, y_label)``: the float image tensor (transformed
            when a transform is set) and the label as a float32 tensor.
        """
        # TODO: consider pathlib for the path handling
        # Normalise Windows-style backslashes to forward slashes
        image_file = self.annotations.iloc[index, 0].replace("\\", "/")
        raw_image = read_image(image_file)
        image = convert_image_dtype(raw_image, torch.float)

        label_values = self.annotations.iloc[index, 1:]
        y_label = torch.tensor(list(label_values), dtype=torch.float32)

        if self.transform:
            image = self.transform(image)

        return (image, y_label)

We define two transforms: one for training, which is used for data augmentation, and one which is used during validation and testing. Using the training data transform to augment images can be rather slow, so you might want to use it only once you have a proper model and not while quickly prototyping.

In [None]:
def _crop_and_resize_steps():
    """Build the grayscale -> centre-crop -> resize steps shared by both pipelines."""
    input_size = architecture["conv_layers"]["input_size"]
    target_hw = (input_size[1], input_size[2])
    return [
        transforms.Grayscale(),
        transforms.CenterCrop((520, 120)),
        # interpolation modes to try: bilinear, bicubic or nearest exact
        transforms.Resize(target_hw, interpolation=transforms.InterpolationMode.NEAREST_EXACT),
    ]


# Deterministic pipeline (validation / testing)
IMAGE_TRANSFORM = transforms.Compose(_crop_and_resize_steps())

# Training pipeline: colour jitter and a small random rotation come first,
# followed by the same grayscale / crop / resize steps
TRAIN_IMAGE_TRANSFORM = transforms.Compose([
    transforms.ColorJitter(brightness=0.3, contrast=0.2, saturation=0.3, hue=0.05),
    transforms.RandomRotation(degrees=5),
    *_crop_and_resize_steps(),
])

if normalize_images:
    # Pre-computed statistics per input size; to recompute them use
    # mean, std = calc_mean_std_dataset('all_images', IMAGE_TRANSFORM)
    #   (1, 65, 15): 0.30496492981910706, 0.1657978892326355   <- in use
    #   (1, 26, 12): 0.3050636351108551, 0.16549718379974365
    #   (1, 40, 12): 0.3046516478061676, 0.1654612421989441
    mean = 0.30496492981910706
    std = 0.1657978892326355
    print(f"Mean: {mean}, Std: {std}")

    # Wrap each existing pipeline and append the normalisation as the last step
    IMAGE_TRANSFORM = transforms.Compose([
        IMAGE_TRANSFORM,
        transforms.Normalize(mean=[mean], std=[std]),
    ])
    TRAIN_IMAGE_TRANSFORM = transforms.Compose([
        TRAIN_IMAGE_TRANSFORM,
        transforms.Normalize(mean=[mean], std=[std]),
    ])

Now we create dataloaders for our training, validation and test set.

In [None]:
val_ratio = 0.2
test_ratio = 0.1
batch_size = 128
# Seed for the train/val/test split. A fixed seed makes every run validate and
# test on the same images, so metrics stay comparable between runs. Change it
# to draw a different split.
split_seed = 42

# One dataset object (and therefore one transform) backs all three subsets, so
# the training subset is NOT augmented here.
dataset = DroneImagesDataset(csv_file='labeled_images.csv', transform=IMAGE_TRANSFORM)

# # count how many per class --------------> [5950. 6670. 5950.]
# labels = []
# for i in range(len(dataset)):
#     labels.append(dataset[i][1].tolist())
# labels = np.array(labels)
# print(np.sum(labels, axis=0))

# Split the dataset into training, validation, and test sets.
# The training set receives whatever is left after rounding the other two down.
num_samples = len(dataset)
num_val_samples = int(val_ratio * num_samples)
num_test_samples = int(test_ratio * num_samples)
num_train_samples = num_samples - num_val_samples - num_test_samples
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [num_train_samples, num_val_samples, num_test_samples],
    generator=torch.Generator().manual_seed(split_seed),
)

# Create DataLoaders for the training, validation, and test sets.
# Only the training loader shuffles; all other settings are shared.
loader_kwargs = dict(batch_size=batch_size, num_workers=4, prefetch_factor=4, persistent_workers=True)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

In [None]:
# val_ratio = 0.2
# test_ratio = 0.1
# batch_size = 128

# train_dataset = DroneImagesDataset(csv_file='labeled_images.csv', transform=TRAIN_IMAGE_TRANSFORM)
# val_dataset = DroneImagesDataset(csv_file='labeled_images.csv', transform=IMAGE_TRANSFORM)
# test_dataset = DroneImagesDataset(csv_file='labeled_images.csv', transform=IMAGE_TRANSFORM)

# indices = torch.randperm(len(train_dataset)).tolist()
# train_dataset = torch.utils.data.Subset(train_dataset, indices[:-int(len(indices)*(val_ratio+test_ratio))])
# val_dataset = torch.utils.data.Subset(val_dataset, indices[-int(len(indices)*(val_ratio+test_ratio)):-int(len(indices)*test_ratio)])
# test_dataset = torch.utils.data.Subset(test_dataset, indices[-int(len(indices)*test_ratio):])

# # Create DataLoaders for the training, validation, and test sets
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, prefetch_factor=2, persistent_workers=True)
# val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, prefetch_factor=2, persistent_workers=True)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4, prefetch_factor=2, persistent_workers=True)

Time to check out the transforms and have a look at some of the data.

In [None]:
# # Show 3 original images and their corresponding transformed images
# fig, axs = plt.subplots(2, 3, figsize=(10, 10))
# for i in range(3):
#     dataset.transform = None
#     # first the original image in colors
#     img, label = dataset[i*2]
#     axs[0, i].imshow(img.permute(1, 2, 0))
#     axs[0, i].set_title(f"Original Image {i}")
#     # then the transformed image in grayscale
#     dataset.transform = IMAGE_TRANSFORM
#     img, label = dataset[i*2]
#     axs[1, i].imshow(img[0], cmap='gray')
#     axs[1, i].set_title(f"Transformed Image {i}")
# plt.show()

### Lightning module
The config dictionary below stores all your hyperparameters, such that they show up in your Weights and Biases dashboard.

In [None]:
# Run configuration; the model stores this dictionary as its hyperparameters
cfg = dict(
    epochs=num_epochs,
    learning_rate=learning_rate,
    label_smoothing=label_smoothing,
    normalize_images=normalize_images,
    architecture=architecture,
    comments="no train augment",
)

The code below actually creates your model. You should not have to change anything here, but don't hesitate to experiment!

In [None]:
def add_conv_block(architecture, i):
    """Assemble the i-th convolutional block of the network.

    The block chains Conv2d -> BatchNorm2d (Identity when batch norm is
    disabled) -> MaxPool2d -> ReLU -> Dropout2d.

    Args:
        architecture: Architecture dictionary holding the per-layer settings.
        i: Zero-based index of the block to build.

    Returns:
        An ``nn.Sequential`` with the five layers in the order given above.
    """
    conv_cfg = architecture["conv_layers"]
    pool_cfg = architecture["max_pool_layers"]
    use_batch_norm = architecture["batch_norm"]
    out_channels = conv_cfg["output_channel"][i]
    # The first block reads the input channels, later blocks read the
    # output channels of the block before them
    in_channels = conv_cfg["input_size"][0] if i == 0 else conv_cfg["output_channel"][i - 1]

    convolution = nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=conv_cfg["kernel_size"][i],
        stride=conv_cfg["stride"][i],
        padding=conv_cfg["padding"][i],
        bias=not use_batch_norm,  # no conv bias whenever batch norm follows
    )
    if use_batch_norm:
        normalisation = nn.BatchNorm2d(out_channels)
    else:
        normalisation = nn.Identity()
    pooling = nn.MaxPool2d(
        kernel_size=pool_cfg["kernel_size"][i],
        stride=pool_cfg["stride"][i],
    )
    activation = nn.ReLU(inplace=True)
    dropout = nn.Dropout2d(architecture["dropout_layers"]["p"][i])

    return nn.Sequential(convolution, normalisation, pooling, activation, dropout)

class LightningCNN(L.LightningModule):
    """CNN classifier: a stack of conv blocks, a flatten and one linear layer.

    The layout comes from ``cfg["architecture"]``. Loss is cross-entropy and
    accuracy compares the arg-max of the logits with the arg-max of the
    target vectors.
    """

    def __init__(self, cfg):
        """Build the network and register ``cfg`` as hyperparameters.

        Note: ``cfg["architecture"]["fc_layer"]["input_size"]`` is overwritten
        in place with the flattened feature count of the conv stack.

        Args:
            cfg: Configuration dictionary; needs the keys ``architecture``,
                ``learning_rate`` and ``label_smoothing``.
        """
        super().__init__()
        arch = cfg["architecture"]

        conv_blocks = [add_conv_block(arch, idx) for idx in range(arch["n_layers"])]
        self.embedder = nn.Sequential(*conv_blocks, nn.Flatten())

        # Written back into cfg so that the value is saved with the hyperparameters
        arch["fc_layer"]["input_size"] = self._count_embedding_features(arch["conv_layers"]["input_size"])
        self.fc = nn.Linear(arch["fc_layer"]["input_size"], arch["fc_layer"]["output_size"])

        self.save_hyperparameters(cfg)

    def _count_embedding_features(self, input_size):
        """Return the width of the embedder output for one input of ``input_size``."""
        was_training = self.embedder.training
        self.embedder.eval()
        with torch.no_grad():
            # Only the resulting shape matters, so an uninitialised tensor is enough
            n_features = self.embedder(torch.empty(1, *input_size)).size(-1)
        # Put the embedder back into whichever mode it was in before
        self.embedder.train(was_training)
        return n_features

    def _loss_and_accuracy(self, batch, label_smoothing=0.0):
        """Run the model on ``batch`` and return ``(loss, accuracy)``."""
        inputs, targets = batch
        logits = self(inputs)
        loss = F.cross_entropy(logits, targets, label_smoothing=label_smoothing)
        hits = torch.argmax(logits, dim=1) == torch.argmax(targets, dim=1)
        return loss, torch.sum(hits) / len(targets)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        features = self.embedder(x)
        return self.fc(features)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss (with label smoothing) and accuracy."""
        loss, acc = self._loss_and_accuracy(batch, label_smoothing=self.hparams.label_smoothing)
        self.log("train/loss", loss)
        self.log("train/acc", acc)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss (no label smoothing) and accuracy."""
        loss, acc = self._loss_and_accuracy(batch)
        self.log("val/loss", loss)
        self.log("val/acc", acc)
        return loss

    def test_step(self, batch, batch_idx):
        """Compute and log the test loss (no label smoothing) and accuracy."""
        loss, acc = self._loss_and_accuracy(batch)
        self.log("test/loss", loss)
        self.log("test/acc", acc)
        return loss

    def configure_optimizers(self):
        """Return Adam plus a StepLR schedule (factor 0.5 every 7 scheduler steps)."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate, weight_decay=1e-5)
        lr_schedule = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.5)
        return dict(optimizer=optimizer, lr_scheduler=lr_schedule)

In [None]:
wand_blogger = WandbLogger(project="MAV-CNN-Project")
# Stop once validation accuracy has failed to improve by at least 0.005 for 10 checks
early_stop_callback = EarlyStopping(monitor="val/acc", min_delta=0.005, patience=10, verbose=False, mode="max")

# Create the trainer
trainer = L.Trainer(max_epochs=cfg["epochs"], logger=wand_blogger, callbacks=[early_stop_callback])

# Create the model
model = LightningCNN(cfg)

# Full (channels, height, width) from the dashboard, so the summary and the
# benchmark below always agree with the network that was actually built
model_input_size = tuple(architecture["conv_layers"]["input_size"])

# The freshly built model lives on the CPU, whereas torchsummary's default
# device is CUDA; pin the device so the summary also works on GPU machines
torchsummary.summary(model, model_input_size, device="cpu")

# Optional single-image latency benchmark, switched on from the dashboard
if analyze_model_before_training:
    import time

    n_runs = 10000
    sample = torch.randn(1, *model_input_size)

    times = []
    model.eval()
    with torch.no_grad():  # inference only, no autograd bookkeeping
        for _ in range(n_runs):
            start = time.perf_counter()
            model(sample)
            times.append(time.perf_counter() - start)
    model.train()

    print(f"Average inference time: {np.mean(times)}")

### Training

In [None]:
# Train on train_loader; val_loader supplies the validation batches
trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)

### Testing

In [None]:
# Run the test loop on test_loader and keep what the trainer returns
test = trainer.test(model, dataloaders=test_loader)

### Save to onnx

In [None]:
# Save model to onnx
model.eval()

# Make sure the output folder exists; the export cannot create it by itself
onnx_dir = Path("models")
onnx_dir.mkdir(parents=True, exist_ok=True)
onnx_path = onnx_dir / f"{wand_blogger.experiment.name}.onnx"

# One dummy image with the configured (channels, height, width), created on
# the same device as the model
dummy_input = torch.randn(1, *architecture["conv_layers"]["input_size"], device=model.device)
torch.onnx.export(model, dummy_input, str(onnx_path))