# Train example notebook

This notebook implements the training of a neural network that classifies images as `Cloud`, `Edge`, or `Good`. <br> Use it to practise and to debug your code. Once you are ready, move to a scripted version to speed up execution.

## 1. - Imports

Set `CUDA_VISIBLE_DEVICES` to the index of the Graphics Processing Unit (GPU) that you want to use.

In [1]:
import os 
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"  
os.environ["CUDA_VISIBLE_DEVICES"]="0" # GPU index
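
As a small illustration of this setting, the sketch below wraps the two environment variables in a helper. The name `select_gpus` is hypothetical and not part of the notebook. The variables only take effect if they are set before a CUDA-based library initializes the GPU. Several indices expose several GPUs, and an empty string hides all of them, which forces CPU execution.

```python
import os

def select_gpus(indices):
    """Expose only the listed GPU indices to libraries that initialize CUDA afterwards.

    `select_gpus` is a hypothetical helper used for illustration only.
    """
    # Order devices by PCI bus ID so that indices match the order shown by `nvidia-smi`.
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(str(i) for i in indices)
    return os.environ["CUDA_VISIBLE_DEVICES"]

multi = select_gpus([0, 1])   # expose two GPUs
cpu_only = select_gpus([])    # empty string: no GPU is visible
single = select_gpus([0])     # same setting as the cell above

print(repr(multi))     # '0,1'
print(repr(cpu_only))  # ''
print(repr(single))    # '0'
```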

Enable autoreload so that changes to imported packages are picked up without restarting the kernel.

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import torch 
import sys
sys.path.insert(1, os.path.join("..", "data"))
sys.path.insert(1, os.path.join("..", "utils"))
from torchvision import datasets, transforms
from plot_utils import plot_image
from torch.utils.data import DataLoader
from torch.utils.data import random_split

## 2. - Datasets

### 2.1 - Creating datasets

Now we read the images from the target directory `path_data`. Set `path_data` to the directory containing the `Cloud`, `Edge`, `Good` subfolders. The code then automatically splits the whole dataset into train, cross-validation, and test splits using pseudo-random splitting. You can reproduce the split by fixing the variable `seed`. **NB**:
- The train split contains 70% of all images.
- The validation split contains 15% of all images.
- The test split contains 15% of all images.<br>**YOU MUST NOT CHANGE THE TEST SPLIT SIZE!!!**

The dataset loader defines the transforms to be applied to the images as they are loaded. These can be used to augment the dataset and to limit the number of input parameters to the network, for example by resizing the image or converting it to greyscale.
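
To give a feeling for how much resizing and greyscale conversion shrink the network input, the arithmetic can be sketched as follows. The image size and the 256×256 target are taken from the cell below; the channel counts (3 before, 1 after) are an assumption made for this illustration.

```python
# Values taken from the notebook cell: original image size and the resize target.
image_width, image_height = 1024, 1942
resized = 256

# Number of input values per image before and after the transforms,
# assuming 3 colour channels originally and 1 greyscale channel afterwards.
original_values = image_width * image_height * 3
reduced_values = resized * resized * 1

print(original_values)                   # 5965824
print(reduced_values)                    # 65536
print(original_values / reduced_values)  # 91.03125
```

Under these assumptions each image carries roughly 91 times fewer values after the transforms.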

In [4]:
# Path to the data folder (update the variable to your path).
path_data=os.path.join("..", "data")
# Seed value
seed=22
torch.manual_seed(seed)

from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torchvision.transforms import v2

import numpy as np

image_width = 1024
image_height = 1942
scale_factor = 0.1

valid_size = 0.15
test_size = 0.15

dataset_mean = [0.2391, 0.4028, 0.4096]
dataset_std = [0.2312, 0.3223, 0.3203]

transform = transforms.Compose([v2.ToImage(),
                                v2.Resize((int(256), int(256))),
                                v2.RandomHorizontalFlip(p=0.5),
                                v2.RandomVerticalFlip(p=0.5),
                                v2.ToDtype(torch.float32, scale=True),
                                v2.Grayscale(num_output_channels=1),
                                v2.Normalize((dataset_mean),(dataset_std))
                                ])

dataset = datasets.ImageFolder(root=path_data, 
                                 transform=transform)

n_val = int(np.floor(valid_size * len(dataset)))
n_test = int(np.floor(test_size * len(dataset)))
n_train = len(dataset) - n_val - n_test

train_ds, val_ds, test_ds = random_split(dataset, [n_train, n_val, n_test])
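
The split sizes computed above can be checked by hand. The sketch below repeats the same arithmetic with the standard library; `split_sizes` is a hypothetical helper, and the total of 304 images is the sum of the class counts printed in section 2.2.

```python
import math

def split_sizes(n_total, valid_size=0.15, test_size=0.15):
    """Return (n_train, n_val, n_test) using the same rounding as the notebook cell."""
    n_val = math.floor(valid_size * n_total)
    n_test = math.floor(test_size * n_total)
    # The train split receives whatever is left, so the three sizes always sum to n_total.
    n_train = n_total - n_val - n_test
    return n_train, n_val, n_test

print(split_sizes(304))  # (214, 45, 45)
```

Because the validation and test sizes are rounded down, the train split ends up slightly above 70% (214 of 304 images).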

### 2.2 - Creating data loaders

The next lines create the data loaders. A data loader breaks the dataset into batches of size `batch_size`. <br> This ensures that each batch fits into memory and gives a stochastic (mini-batch) implementation of gradient descent. <br> For more information, see: [data loader](https://www.educative.io/answers/what-is-pytorch-dataloader).<br>
Specify `batch_size` (**Hint**: use powers of 2. Typical values are between 8 and 64).
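
To see what a given `batch_size` means in practice, the number and size of the batches per epoch can be sketched with plain arithmetic. `batch_sizes` is a hypothetical helper; the 214 training images correspond to the train split counts printed later in this section, and the sketch assumes the last, smaller batch is kept (the `DataLoader` default, `drop_last=False`).

```python
import math

def batch_sizes(n_samples, batch_size):
    """Sizes of the consecutive batches of one epoch when the last, smaller batch is kept."""
    n_batches = math.ceil(n_samples / batch_size)
    return [min(batch_size, n_samples - i * batch_size) for i in range(n_batches)]

sizes = batch_sizes(214, 32)
print(len(sizes))  # 7
print(sizes[-1])   # 22
```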

In [5]:
batch_size=32
# Train loader
train_loader = DataLoader(train_ds, batch_size=batch_size, pin_memory=False, shuffle=True)
# Cross validation data loader (shuffling is not needed for evaluation)
valid_loader = DataLoader(val_ds, batch_size=batch_size, pin_memory=False, shuffle=False)
# Test data loader (shuffling is not needed for evaluation)
test_loader = DataLoader(test_ds, batch_size=batch_size, pin_memory=False, shuffle=False)

#0 - cloud
#1 - edge
#2 - good

unique, counts = np.unique(torch.tensor([train_ds.dataset.targets[i] for i in train_ds.indices]), return_counts=True)
print("Train split: ", dict(zip(unique, counts)))

unique, counts = np.unique(torch.tensor([test_ds.dataset.targets[i] for i in test_ds.indices]), return_counts=True)
print("Test split: ", dict(zip(unique, counts)))

unique, counts = np.unique(torch.tensor([val_ds.dataset.targets[i] for i in val_ds.indices]), return_counts=True)
print("Validation split: ", dict(zip(unique, counts)))

Train split:  {0: 61, 1: 73, 2: 80}
Test split:  {0: 13, 1: 12, 2: 20}
Validation split:  {0: 16, 1: 12, 2: 17}
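
The printed counts can be turned into per-class fractions to check how balanced each split is. The sketch below copies the counts from the output above; `class_fractions` is a hypothetical helper.

```python
# Class counts copied from the printed output (0 - cloud, 1 - edge, 2 - good).
splits = {
    "Train": {0: 61, 1: 73, 2: 80},
    "Test": {0: 13, 1: 12, 2: 20},
    "Validation": {0: 16, 1: 12, 2: 17},
}
class_names = {0: "cloud", 1: "edge", 2: "good"}

def class_fractions(counts):
    """Map each class name to its share of the split, rounded to three decimals."""
    total = sum(counts.values())
    return {class_names[label]: round(n / total, 3) for label, n in counts.items()}

for name, counts in splits.items():
    print(name, sum(counts.values()), class_fractions(counts))
```

Since the split is purely random and not stratified, the class shares differ somewhat between the three splits.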


Now, it is your turn! Add your code below to load a Neural Network model, select optimizers, learning rate and perform training. <br>
Good luck!

## 3. - Training

### 3.1 - Defining the model and optimizer

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from PIL import Image, ImageFile
from datetime import datetime
import matplotlib.pyplot as plt
import torchvision.models as models

    
# MobileNetV3-Small with ImageNet weights (`weights=` replaces the deprecated `pretrained=True`).
model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)

# Plain stochastic gradient descent over all model parameters.
optimizer = optim.SGD(model.parameters(), lr=0.003)

# Initialize or load history
training_history = {"train_loss": [], "val_loss": [], "accuracy": []}

# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)



MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): SqueezeExcitation(
          (avgpool): AdaptiveAvgPool2d(output_size=1)
          (fc1): Conv2d(16, 8, kernel_size=(1, 1), stride=(1, 1))
          (fc2): Conv2d(8, 16, kernel_size=(1, 1), stride=(1, 1))
          (activation): ReLU()
          (scale_activation): Hardsigmoid()
        )
        (2): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), 

### 3.2 - The training function

In [12]:
# Imports used by the functions below. The model, optimizer, history and device
# are defined in section 3.1.
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

def train(model, optimizer, loss_fn, train_loader, val_loader, epochs, device, history):
    for epoch in range(1, epochs + 1):
        training_loss = 0.0
        valid_loss = 0.0
        model.train()

        for batch in train_loader:
            optimizer.zero_grad()
            inputs, targets = batch
            inputs = inputs.to(device)
            targets = targets.to(device)
            output = model(inputs)
            loss = loss_fn(output, targets)
            loss.backward()
            optimizer.step()
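            # Weight the batch loss by the batch size (assuming `loss_fn` returns
            # the batch mean) so that the epoch average is taken per sample.
            training_loss += loss.item() * inputs.size(0)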

        training_loss /= len(train_loader.dataset)

        model.eval()
        num_correct = 0
        num_examples = 0

        # Gradients are not needed for validation.
        with torch.no_grad():
            for batch in val_loader:
                inputs, targets = batch
                inputs = inputs.to(device)
                targets = targets.to(device)
                output = model(inputs)
                loss = loss_fn(output, targets)
                valid_loss += loss.item() * inputs.size(0)
                # The predicted class is the index of the largest output
                # (softmax does not change which index is largest).
                correct = torch.eq(torch.argmax(output, dim=1), targets)
                num_correct += torch.sum(correct).item()
                num_examples += correct.shape[0]

        valid_loss /= len(val_loader.dataset)

        history["train_loss"].append(training_loss)
        history["val_loss"].append(valid_loss)
        history["accuracy"].append(num_correct / num_examples)

        print(f'Epoch: {epoch}, Training Loss: {training_loss:.2f}, Validation Loss: {valid_loss:.2f}, Accuracy: {num_correct / num_examples:.2f}')

    return history

def plot_history(history):
    plt.style.use('fivethirtyeight')
    plt.figure(figsize=(12, 6))

    plt.plot(history["train_loss"], 'r-', label='Training Loss')
    plt.plot(history["val_loss"], 'g-', label='Validation Loss')
    plt.plot(history["accuracy"], 'b-', label='Accuracy')

    plt.xlabel('Epochs')
    plt.ylabel('Metrics')
    plt.title('Training Progress')
    plt.legend()
    plt.show()

def test(model, test_loader, loss_fn, num_runs=5, device="cpu"):
    model.eval()
    total_accuracy = 0.0
    total_loss = 0.0

    for run in range(num_runs):
        num_correct = 0
        num_examples = 0

        for batch in test_loader:
            inputs, targets = batch
            inputs = inputs.to(device)
            targets = targets.to(device)

            with torch.no_grad():
                output = model(inputs)
                loss = loss_fn(output, targets)
                total_loss += loss.item() * inputs.size(0)

            correct = torch.eq(torch.max(F.softmax(output, dim=1), dim=1)[1], targets)
            num_correct += torch.sum(correct).item()
            num_examples += correct.shape[0]

        accuracy = num_correct / num_examples
        total_accuracy += accuracy

    average_loss = total_loss / (num_runs * len(test_loader.dataset))
    average_accuracy = total_accuracy / num_runs

    print('Average Test Loss: {:.2f}, Average Test Accuracy: {:.2f}'.format(average_loss, average_accuracy))
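
The functions above multiply each batch loss by the batch size before dividing by the dataset length. A minimal sketch of why this matters when the last batch is smaller: the losses below are hypothetical numbers, `epoch_average` is a hypothetical helper, and the loss function is assumed to return the mean over the batch.

```python
def epoch_average(batch_losses, batch_sizes):
    """Per-sample average of batch-mean losses, accumulated as in `train` and `test`."""
    total = sum(loss * size for loss, size in zip(batch_losses, batch_sizes))
    return total / sum(batch_sizes)

# Hypothetical batch-mean losses for one full batch (32) and one partial batch (22).
losses = [1.0, 2.0]
sizes = [32, 22]

weighted = epoch_average(losses, sizes)
naive = sum(losses) / len(losses)  # ignores that the second batch is smaller

print(round(weighted, 4))  # 1.4074
print(naive)               # 1.5
```

The naive mean over batches gives the smaller batch too much weight; weighting by batch size recovers the average over individual samples.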

In [16]:
import os
import torch
import matplotlib.pyplot as plt
from ipywidgets import widgets, HBox, VBox, FloatText, Layout, Checkbox
from IPython.display import display, clear_output
from torchvision import models

# Global variables
global model
model = None  # Initialize model as None
global saved_models
saved_models = []  # List to store names of saved models

# Functions
def get_available_models():
    """Return a list of available models from torchvision."""
    return [name for name in dir(models) if name.islower() and not name.startswith("__")]

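def get_saved_models():
    """Return a sorted list of saved model names (without the '.pth' extension)."""
    model_dir = 'models'  # same directory that save_model() writes to
    if not os.path.isdir(model_dir):
        return []
    saved_models = [filename[:-4] for filename in os.listdir(model_dir) if filename.endswith('.pth')]
    return sorted(set(saved_models))  # Sort and remove duplicates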

def load_new_model(change):
    """Load a new model based on the selection and pretrained status."""
    global model, optimizer
    model_name = model_selector.value
    pretrained = pretrained_checkbox.value
    
    if model_name in get_available_models():
        model = getattr(models, model_name)(pretrained=pretrained)
        model.to(device)
        # Rebuild the optimizer so that it updates the parameters of the new model.
        optimizer = torch.optim.SGD(model.parameters(), lr=0.003)
        with output_area:
            clear_output(wait=True)
            print(f"Loaded model: {model_name} (Pretrained: {pretrained})")
        update_saved_models_list()
    else:
        with output_area:
            print(f"Error: {model_name} not found in available models")

def train_model(b):
    """Train the model and update the plot."""
    global training_history
    epochs = epoch_slider.value
    with output_area:
        clear_output(wait=True)
        training_history = train(model, optimizer, torch.nn.CrossEntropyLoss(), train_loader, valid_loader, epochs, device, training_history)
        update_plot()

def plot_history_button_clicked(b):
    """Update the plot, using the value of the 'Y Limit' field as the upper y-axis limit."""
    with output_area:
        clear_output(wait=True)
        update_plot(y_limit=y_limit_text.value)

def test_model(b):
    """Test the model."""
    num_runs = runs_slider.value
    with output_area:
        clear_output(wait=True)
        test(model, test_loader, torch.nn.CrossEntropyLoss(), num_runs=num_runs, device=device)

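def load_model(change):
    """Load the saved weights selected in `saved_model_selector` into the current model."""
    model_name = saved_model_selector.value
    model_path = os.path.join('models', f'{model_name}.pth')
    
    # Load the state dict onto the current device
    model.load_state_dict(torch.load(model_path, map_location=device))
    
    with output_area:
        print(f"Model '{model_name}' loaded.")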

def save_model(b):
    model_name = model_name_text.value
    model_dir = 'models'
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, f'{model_name}.pth')
    
    torch.save(model.state_dict(), model_path)
    
    # Update the list of saved models
    update_saved_models_list()
    
    with output_area:
        print(f"Model saved as {model_path}")

def load_saved_model(change):
    """Load saved weights (a state dict written by save_model) into the current model."""
    selected_model_name = saved_model_selector.value
    
    try:
        # Check if the selected model name is a valid saved model
        if selected_model_name not in get_saved_models():
            raise ValueError(f"Invalid model name: {selected_model_name}")
        if model is None:
            raise ValueError("No model is loaded. Load a model architecture first.")
        
        model_dir = 'models'
        model_path = os.path.join(model_dir, f'{selected_model_name}.pth')
        
        # save_model() stores only the state dict, so the current model must
        # have the same architecture as the saved one.
        state_dict = torch.load(model_path, map_location=device)
        model.load_state_dict(state_dict)
        model.to(device)
        
        with output_area:
            clear_output(wait=True)
            print(f"Loaded saved model: {selected_model_name} from {model_path}")
    except Exception as e:
        with output_area:
            print(f"Error loading saved model: {selected_model_name}. {str(e)}")


def update_saved_models_list():
    """Update the list of saved models."""
    global saved_models
    saved_models = get_saved_models()
    saved_model_selector.options = saved_models

def update_plot(ax=None, y_limit=None):
    if ax is None:
        fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(training_history["train_loss"], 'r-', label='Training Loss')
    ax.plot(training_history["val_loss"], 'g-', label='Validation Loss')
    ax.plot(training_history["accuracy"], 'b-', label='Accuracy')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Metrics')
    ax.set_title('Training Progress')
    ax.legend()
    if y_limit is not None:
        ax.set_ylim(bottom=0, top=y_limit)
    
    # Show the figure (ax is always set at this point)
    plt.show()

# Widgets
model_selector = widgets.Dropdown(options=get_available_models(), value='mobilenet_v3_small', description='Select Model:')
pretrained_checkbox = widgets.Checkbox(value=True, description='Pretrained')
load_model_button = widgets.Button(description='Load New Model')
load_model_button.on_click(load_new_model)

saved_model_selector = widgets.Dropdown(options=get_saved_models(), value=None, description='Select Saved Model:')
load_saved_model_button = widgets.Button(description='Load Saved Model')
load_saved_model_button.on_click(load_saved_model)

model_name_text = widgets.Text(value='model', description='Model Name:')
save_model_button = widgets.Button(description='Save Model')
save_model_button.on_click(save_model)

epoch_slider = widgets.IntSlider(value=5, min=5, max=100, step=1, description='Epochs:')  # initial value must lie within [min, max]
train_button = widgets.Button(description='Train Model')
train_button.on_click(train_model)

y_limit_text = FloatText(value=1.0, description='Y Limit:', layout=Layout(width='80px'))
plot_button = widgets.Button(description='Plot History')
plot_button.on_click(plot_history_button_clicked)

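def save_plot(b):
    """Save the training history plot as '<Plot Name>.png' (minimal placeholder; the original notebook leaves this function unimplemented)."""
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(training_history["train_loss"], 'r-', label='Training Loss')
    ax.plot(training_history["val_loss"], 'g-', label='Validation Loss')
    ax.plot(training_history["accuracy"], 'b-', label='Accuracy')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Metrics')
    ax.legend()
    plot_path = f'{plot_name_text.value}.png'
    fig.savefig(plot_path)
    plt.close(fig)
    with output_area:
        print(f"Plot saved as {plot_path}")

plot_name_text = widgets.Text(value='plot', description='Plot Name:')
save_plot_button = widgets.Button(description='Save Plot')
save_plot_button.on_click(save_plot)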

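# Number of passes over the test set, read by test_model(). The range is an
# assumption; the default matches num_runs=5 in test().
runs_slider = widgets.IntSlider(value=5, min=1, max=20, step=1, description='Test Runs:')
test_button = widgets.Button(description='Test Model')
test_button.on_click(test_model)

output_area = widgets.Output()

# UI Layout
ui = VBox([
    HBox([model_selector, pretrained_checkbox, load_model_button]),
    widgets.HTML(value='<hr>'),
    HBox([saved_model_selector, load_saved_model_button]),
    HBox([model_name_text, save_model_button]),
    widgets.HTML(value='<hr>'),
    HBox([epoch_slider, train_button]),
    HBox([y_limit_text, plot_button]),
    HBox([plot_name_text, save_plot_button]),
    HBox([runs_slider, test_button]),
    output_area
])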

display(ui)


VBox(children=(HBox(children=(Dropdown(description='Select Model:', index=45, options=('_api', '_meta', '_util…
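
Since `save_model` writes `.pth` files into the `models` directory, a listing helper for the "Select Saved Model" dropdown can look in that same directory. The sketch below shows one way to do this using only the standard library; `list_saved_models` is a hypothetical name, and the files are empty placeholders created in a temporary directory for the demonstration.

```python
import os
import tempfile

def list_saved_models(model_dir):
    """Names (without extension) of the '.pth' files in `model_dir`, sorted."""
    if not os.path.isdir(model_dir):
        return []  # nothing has been saved yet
    return sorted(
        os.path.splitext(filename)[0]
        for filename in os.listdir(model_dir)
        if filename.endswith(".pth")
    )

with tempfile.TemporaryDirectory() as tmp:
    # Empty placeholder files standing in for saved models and a saved plot.
    for name in ("model_b.pth", "model_a.pth", "plot.png"):
        open(os.path.join(tmp, name), "w").close()
    found = list_saved_models(tmp)
    missing = list_saved_models(os.path.join(tmp, "does_not_exist"))

print(found)    # ['model_a', 'model_b']
print(missing)  # []
```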