# Classification of images using PyTorch

## Reduction of dataset

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchmetrics.classification import MulticlassF1Score
import torch.nn as nn
import torch.optim as optim
import copy
from PIL import Image
from pathlib import Path
from tqdm import tqdm
from datetime import datetime
import numpy as np
import optuna


# Custom import
from source.dataset_reduction import extract_reduced_dataset, check_images_size_equal
from source.cnn_model import train_model, FlexibleCNN
from source.plotting import plot_training_history, plot_random_predictions, plot_filter_weights, plot_feature_maps, plot_image




# TO DO


hyperparameter
- used optuna, needs to be explained


comment code
- done for plotting and this jupyter


Test vs pre-built model
- use pre-built model as paper to read?

architecture
- how to choose? add pooling and type to model generator


search "ML nodes UiO"
- apply for access
- 


data augmentation? already augmented?

In [None]:
# Notebook-wide configuration: output toggles, Optuna settings,
# reduced-dataset parameters and output directories.

# --- Output toggles ---
SAVE_FIGURES = True   # write figures to `figures_path`
SHOW_PLOT = True      # display figures
SAVE_MODELS = True    # write model summary and weights to `models_path`

# --- Hyperparameter search with Optuna ---
USE_OPTUNA = True
OPTUNA_N_TRIALS = 100

# --- Reduced dataset for quick experiments ---
# 546 is all images; a smaller value extracts a smaller subset.
images_for_train_validate_test = 20
# Only these classes are extracted.
classes_reduced = ['snail', 'wasp', 'moth']

# --- Output directories (siblings of the current working directory) ---
current_dir = Path.cwd()
parent_dir = current_dir.parent
figures_path = parent_dir / "figures"   # saved figures
models_path = parent_dir / "models"     # saved models and summaries
for output_dir in (figures_path, models_path):
    output_dir.mkdir(parents=True, exist_ok=True)

# Minute-resolution timestamp used in the names of saved files.
timestamp = f"{datetime.now():%Y-%m-%d-%H-%M}"



In [None]:
# Build the reduced dataset used for quick experiments.
# NOTE(review): the result is later joined with 'train/images' etc., so it is
# presumably a pathlib.Path to the dataset root — confirm in
# source.dataset_reduction.
dataset = extract_reduced_dataset(
    images_for_train_validate_test,
    classes_reduced,
)

In [None]:
# Check that images have equal size.
# NOTE(review): how a size mismatch is reported (exception, print, return value)
# is defined in source.dataset_reduction and not visible here.
check_images_size_equal(dataset)

## Prepare PyTorch model

In [None]:
# Create datasets and dataloaders

# Seed torch's RNG so DataLoader shuffling is reproducible between runs.
seed_value = 42
torch.manual_seed(seed_value)

# Resize every image to a common size and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # 256,256 / 128,128 / 64,64
    transforms.ToTensor(),
])

# One ImageFolder per split; all splits share the same transform.
train_dataset = datasets.ImageFolder(root=dataset / 'train/images', transform=transform)
test_dataset = datasets.ImageFolder(root=dataset / 'test/images', transform=transform)
validation_dataset = datasets.ImageFolder(root=dataset / 'valid/images', transform=transform)

# Scale the batch size with the size of the training set.
# The thresholds are contiguous: the previous `< 20` / `> 20 and < 100` chain
# sent a training set of exactly 20 images to the largest batch size (64).
train_size = len(train_dataset)
if train_size < 20:
    batch_size = 16
elif train_size < 100:
    batch_size = 32
else:
    batch_size = 64

test_dataloader = DataLoader(test_dataset, batch_size, shuffle=True)
train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size, shuffle=True)


In [None]:
# Quality control of DataLoader
dataloader_verbose = True

# Always fetch one batch: later cells read `images.shape` (model input size and
# model summary), so `images` must exist even when the printout is disabled.
images, labels = next(iter(train_dataloader))

if dataloader_verbose:
    print(f"Shape: {images.shape}")
    print(f'Scaled data in in range [0,1], values in loaded image are: [{images.min().item()}, {images.max().item()}]')
    print(f'Classes are: {train_dataset.class_to_idx}')
    #print(f'Targets are: {train_dataset.targets}')

In [None]:
# Derive the model's input shape and number of classes from the data

# Drop the leading batch dimension of the sample batch: (channels, height, width)
input_size_img = tuple(images.shape)[1:]
# Count the distinct labels present in the training set
num_classes = len(set(train_dataset.targets))

print(f'Images channel and size: {input_size_img}\nNumber of classes: {num_classes}')

In [None]:
# Flexible CNN
# Example of a manual FlexibleCNN configuration. It is disabled by wrapping it
# in a string literal, so nothing is constructed here; the model actually used
# is built in the "Define model with Optuna or manual" cell further down.

"""
model_flexible = FlexibleCNN(
    input_size=input_size_img,
    num_classes=num_classes,
    conv_layers=[(32, 3), (64, 3), (128, 3)],
    fc_layers=[256, 128],
    activation=nn.ReLU,    # Custom activation
    dropout_fc=0.3,             # Dropout in FC layers
    dropout_conv=0.1,           # Dropout in conv layers
    use_batchnorm=True,
    pool_type="avg",            # Use AvgPool instead of MaxPool
    global_pool="max"           # Use AdaptiveMaxPool for final pooling
)

"""

In [None]:
if USE_OPTUNA:

    def objective(trial):
        """Train a FlexibleCNN with trial-suggested hyperparameters.

        Args:
            trial: Optuna trial used to sample hyperparameters, report the
                validation loss after every epoch and query the pruner.

        Returns:
            The lowest per-sample mean validation loss observed during training.

        Raises:
            optuna.TrialPruned: If the pruner asks to stop this trial early.
        """
        # --- Hyperparameter suggestions ---
        # Alternative kept for reference: sample values within ranges instead
        # of from fixed lists.
        #   learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
        #   dropout_fc = trial.suggest_float('dropout_fc', 0.1, 0.5)
        #   dropout_conv = trial.suggest_float('dropout_conv', 0.0, 0.3)
        #   batch_size = trial.suggest_int('batch_size', 16, 128)
        #   conv_layers = [
        #       (trial.suggest_int('conv1', 16, 64), 3),
        #       (trial.suggest_int('conv2', 32, 128), 3),
        #       (trial.suggest_int('conv3', 64, 256), 3)
        #   ]
        #   fc_layers = [
        #       trial.suggest_int('fc1', 128, 512),
        #       trial.suggest_int('fc2', 64, 256)
        #   ]

        # Use suggested categorical values
        learning_rate = trial.suggest_categorical('learning_rate', [0.0001, 0.001, 0.01, 0.1])
        dropout_fc = trial.suggest_categorical('dropout_fc', [0.1, 0.5])
        dropout_conv = trial.suggest_categorical('dropout_conv', [0.0, 0.3])
        # The activation is sampled by name (a plain string) and mapped to its class here.
        activation_name = trial.suggest_categorical('activation', ['ReLU', 'LeakyReLU'])
        activation_choice = nn.ReLU if activation_name == 'ReLU' else nn.LeakyReLU
        batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
        # Each conv layer is (number of filters, kernel size); kernel size is fixed to 3.
        conv_layers = [
            (trial.suggest_categorical('conv1', [16, 32, 64]), 3),
            (trial.suggest_categorical('conv2', [32, 64, 128]), 3),
            (trial.suggest_categorical('conv3', [64, 128, 256]), 3)
        ]
        fc_layers = [
            trial.suggest_categorical('fc1', [128, 256, 512]),
            trial.suggest_categorical('fc2', [64, 128, 256])
        ]

        # --- Model ---
        model = FlexibleCNN(
            input_size=input_size_img,
            num_classes=num_classes,
            conv_layers=conv_layers,
            fc_layers=fc_layers,
            activation=activation_choice,
            dropout_fc=dropout_fc,
            dropout_conv=dropout_conv,
            use_batchnorm=True,
            pool_type="avg",
            global_pool="max",
            show_summary=False
        )

        # --- Optimizer & Loss ---
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        loss_fn = nn.CrossEntropyLoss()

        # --- DataLoader ---
        train_dl = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        valid_dl = DataLoader(validation_dataset, batch_size=batch_size)

        # --- Training loop with pruning ---
        num_epochs = 30
        patience = 5
        best_val_loss = float('inf')
        epochs_no_improve = 0

        for epoch in range(num_epochs):
            # Train one epoch
            model.train()
            for X_batch, y_batch in train_dl:
                optimizer.zero_grad()
                loss = loss_fn(model(X_batch), y_batch)
                loss.backward()
                optimizer.step()

            # Validate. Batch losses are weighted by batch size so the result is
            # the mean loss per sample; a plain mean of batch means would depend
            # on the tuned batch_size through the smaller last batch.
            model.eval()
            val_loss_sum = 0.0
            with torch.no_grad():
                for X_val, y_val in valid_dl:
                    batch_loss = loss_fn(model(X_val), y_val)
                    val_loss_sum += batch_loss.item() * y_val.size(0)
            val_loss = val_loss_sum / len(valid_dl.dataset)

            # Report intermediate result to Optuna
            trial.report(val_loss, step=epoch)

            # Check pruning
            if trial.should_prune():
                raise optuna.TrialPruned()

            # Early stopping: stop after `patience` epochs without improvement
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    break

        return best_val_loss

    # --- Run Optuna study ---

    db_path = models_path / "optuna_study.db"
    optuna_path = f"sqlite:///{db_path}"  # Convert to string for Optuna

    # `load_if_exists` only takes effect when the study has a fixed name;
    # without one Optuna generates a new name on every run and never resumes.
    # The name encodes the dataset configuration so trials from different
    # datasets are not mixed. Re-running adds OPTUNA_N_TRIALS more trials to
    # the stored study.
    study_name = f"flexible_cnn_{'-'.join(classes_reduced)}_{images_for_train_validate_test}_images"

    study = optuna.create_study(
        study_name=study_name,
        direction='minimize',
        pruner=optuna.pruners.MedianPruner(),
        storage=optuna_path,  # Save trials to SQLite
        load_if_exists=True,
    )
    study.optimize(objective, n_trials=OPTUNA_N_TRIALS)

    print("Best hyperparameters:", study.best_params)

In [None]:
# Plot the Optuna optimization history and parameter importances,
# optionally showing them and saving them as PNG files
if USE_OPTUNA:
    from optuna.visualization import plot_optimization_history, plot_param_importances

    # Generate plots
    fig_history = plot_optimization_history(study)
    fig_importance = plot_param_importances(study)

    # Show in notebook
    if SHOW_PLOT:
        for optuna_figure in (fig_history, fig_importance):
            optuna_figure.show()

    # Save as PNG
    if SAVE_FIGURES:
        path_history = figures_path / f'{timestamp}_optuna_history_number_images{images_for_train_validate_test}.png'
        path_importance = figures_path / f'{timestamp}_optuna_importance_number_images{images_for_train_validate_test}.png'

        for optuna_figure, target_path in ((fig_history, path_history), (fig_importance, path_importance)):
            optuna_figure.write_image(target_path)

In [None]:
# Define model with Optuna or manual settings

if USE_OPTUNA:
    # Use the best hyperparameters found by the Optuna study
    best_params = study.best_params

    # The study stores the activation by name ('ReLU' / 'LeakyReLU'), while
    # FlexibleCNN is given an nn.Module class (as in the objective), so the
    # name must be mapped back to its class.
    activation_classes = {'ReLU': nn.ReLU, 'LeakyReLU': nn.LeakyReLU}

    model_settings = dict(
        conv_layers=[
            (best_params['conv1'], 3),
            (best_params['conv2'], 3),
            (best_params['conv3'], 3)
        ],
        fc_layers=[
            best_params['fc1'],
            best_params['fc2']
        ],
        activation=activation_classes[best_params['activation']],
        dropout_fc=best_params['dropout_fc'],
        dropout_conv=best_params['dropout_conv'],
    )
else:  # choosing settings manually
    model_settings = dict(
        conv_layers=[(32, 3), (64, 3), (128, 3)],
        fc_layers=[256, 128],
        activation=nn.ReLU,         # Custom activation
        dropout_fc=0.3,             # Dropout in FC layers
        dropout_conv=0.1,           # Dropout in conv layers
    )

# Settings shared by both branches are passed once here.
model_flexible = FlexibleCNN(
    input_size=input_size_img,
    num_classes=num_classes,
    use_batchnorm=True,
    pool_type="avg",            # Use AvgPool instead of MaxPool
    global_pool="max",          # Use AdaptiveMaxPool for final pooling
    **model_settings,
)





In [None]:
# Model summary (torchinfo), computed for the shape of the sample batch

from torchinfo import summary

summary_columns = ["input_size", "output_size", "num_params"]
model_summary = summary(
    model_flexible,
    input_size=images.shape,
    col_names=summary_columns,
)

# Alternative model summary
#from torchsummary import summary
#summary(model_flexible, input_size=input_size_img)

In [None]:
# Parameters for - train model
# When Optuna was used, train the final model with the learning rate and batch
# size the study selected; they were tuned together with the architecture, so
# training with other values would not match the configuration that was scored.
if USE_OPTUNA:
    tuned_params = study.best_params
    learning_rate = tuned_params['learning_rate']
    final_train_dl = DataLoader(train_dataset, batch_size=tuned_params['batch_size'], shuffle=True)
else:
    learning_rate = 0.001
    final_train_dl = train_dataloader

optimizer = torch.optim.Adam(model_flexible.parameters(), lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()
epochs = 30

# Train with early stopping + scheduler
# NOTE(review): early stopping / scheduler behaviour lives in
# source.cnn_model.train_model and is not visible here.
history = train_model(
    model=model_flexible,
    num_epochs=epochs,
    train_dl=final_train_dl,
    valid_dl=validation_dataloader,
    loss_fn=loss_fn,
    optimizer=optimizer,
    device='cpu',
    verbose=True,
    patience=5
)

In [None]:
# Evaluate test dataset: accuracy and macro F1 over the whole test set
model_flexible.eval()
device = 'cpu'

f1_metric = MulticlassF1Score(num_classes=num_classes, average='macro').to(device)

correct = 0
total = 0

with torch.no_grad():
    for x_batch, y_batch in test_dataloader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

        preds = model_flexible(x_batch)  # shape: [batch_size, num_classes]
        correct += (preds.argmax(dim=1) == y_batch).sum().item()
        total += y_batch.size(0)

        # Accumulate statistics only. Averaging per-batch macro F1 scores is
        # biased when a batch lacks a class or the last batch is smaller.
        f1_metric.update(preds, y_batch)

test_accuracy = correct / total
# Macro F1 computed once from the statistics accumulated over all batches
avg_f1_score = f1_metric.compute().item()

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Average F1-score: {avg_f1_score:.4f}")


In [None]:
# Save the model summary (text) and the model weights (state_dict) to `models_path`
if SAVE_MODELS:
    # Common file name stem: timestamp, dataset size and test metrics
    run_stem = (
        f'{timestamp}_model_summary_number_images{images_for_train_validate_test}'
        f'_test_accuracy{test_accuracy:.4f}_avgf1_{avg_f1_score:.4f}'
    )

    path_summary = models_path / f'{run_stem}.txt'
    path_summary.write_text(str(model_summary), encoding="utf-8")

    path_model = models_path / f'{run_stem}_weights.pth'
    torch.save(model_flexible.state_dict(), path_model)

In [None]:
# Plot the training history returned by train_model
# (loss and accuracy for training and validation)

plot_training_history(
    history,
    images_for_train_validate_test,
    path=figures_path,
    save_figures=SAVE_FIGURES,
    show_plot=SHOW_PLOT,
)

In [None]:
# Plot predictions and true labels for random test samples

# (number of samples, grid rows, grid columns): a 2x6 grid for 12 samples,
# then a 2x2 grid for 4 samples. A layout is skipped unless the test dataset
# holds more images than the layout needs.
for n_samples, grid_rows, grid_cols in ((12, 2, 6), (4, 2, 2)):
    if len(test_dataset) <= n_samples:
        continue
    plot_random_predictions(
        model_flexible,
        test_dataset,
        num_samples=n_samples,
        path=figures_path,
        time=timestamp,
        number_images=images_for_train_validate_test,
        rows=grid_rows,
        cols=grid_cols,
        device="cpu",
        save_figures=SAVE_FIGURES,
        show_plot=SHOW_PLOT,
    )

In [None]:
# Plot filter weights (first convolutional layer)

plot_filter_weights(
    model_flexible,
    figures_path,
    timestamp,
    images_for_train_validate_test,
    rows=2,
    cols=2,
    channel=0,  # channel as in RGB
    save_figures=SAVE_FIGURES,
    show_plot=SHOW_PLOT,
)


In [None]:
# Plot feature maps for a sample image

# Pick one random image from the test dataset (its label is not needed)
random_index = np.random.randint(len(test_dataset))
sample_img, _ = test_dataset[random_index]

# Plot sample image
plot_image(
    sample_img,
    figures_path,
    timestamp,
    images_for_train_validate_test,
    title='Image sample from test dataset',
    save_figures=SAVE_FIGURES,
    show_plot=SHOW_PLOT,
)

# Positions of all Conv2d layers inside the model's feature extractor
conv_layer_indices = [
    position
    for position, module in enumerate(model_flexible.features)
    if isinstance(module, nn.Conv2d)
]

# Plot feature maps once per Conv2d layer
for layer_index in conv_layer_indices:
    plot_feature_maps(
        model_flexible,
        layer_index,
        sample_img,
        figures_path,
        timestamp,
        images_for_train_validate_test,
        layers_to_show=[0, 3],
        num_maps=4,
        rows=2,
        cols=2,
        cmap='gray',
        save_figures=SAVE_FIGURES,
        show_plot=SHOW_PLOT,
    )

