**Taxonomic Classification with ResNet-101 and Random Forest**

**Author**: Elisa Paperini, Nevio Dubbini

**License**: CC-BY-SA 4.0

**Year**: 2024 (last version)

**Description**

This script implements a taxonomic classification model using a pre-trained ResNet-101 network with IMAGENET1K_V2 weights.
The final fully connected layer is replaced with a custom module:

- Fully connected layer → Batch normalization → ReLU activation → Dropout
- Final fully connected layer

To enhance performance, a Random Forest classifier was integrated to classify the CNN-extracted features and cross-validation was applied. A grid search was used to optimize hyperparameters.

# Set the environment

In [None]:
# Import standard libraries
import copy  # Used for creating deep copies of objects
import numpy as np  # Library for numerical computations
import time  # Module for measuring execution time
import os  # Provides operating system dependent functionalities
from tempfile import TemporaryDirectory  # Creates temporary directories for file handling

# Import libraries for Random Forest classification (from scikit-learn)
from sklearn.metrics import classification_report  # Generates detailed classification reports
from sklearn.ensemble import RandomForestClassifier  # Implements the Random Forest algorithm
from sklearn.metrics import accuracy_score  # Computes classification accuracy
from sklearn import metrics  # Collection of metrics for model evaluation
from sklearn.model_selection import train_test_split, KFold, GridSearchCV  # Splits data into training and test sets, K-fold, grisearch

# Import PyTorch and TorchVision for deep learning
import torch  # Main PyTorch library
import torch.nn as nn  # Neural network layers
import torch.nn.functional as F  # Functional operations for neural networks
import torch.backends.cudnn as cudnn  # Backend optimizations for CUDA
import torch.utils.data  # Data loading utilities
import torch.optim as optim  # Optimization algorithms
from torch.optim import lr_scheduler  # Learning rate scheduling
from torchsummary import summary  # Model summary utility

import torchvision  # Image processing and pre-trained models
import torchvision.transforms as transforms  # Data transformations for image preprocessing
from torchvision import datasets, models  # Pre-built datasets and models
from torchvision.transforms.transforms import ColorJitter  # Color augmentation for images
from torchvision.transforms.functional import perspective  # Perspective transformation for images

# Import libraries for image processing and visualization
import matplotlib.pyplot as plt  # Plotting utilities
import seaborn as sns  # Statistical data visualization
from PIL import Image  # Image processing with Python Imaging Library (PIL)
from PIL import ImageFile  # Handles truncated images
ImageFile.LOAD_TRUNCATED_IMAGES = True  # Prevents errors caused by incomplete image files

# Import pandas and CSV utilities for handling structured data
import pandas as pd  # Data manipulation and analysis
import csv  # CSV file handling

In [None]:
# Checking for device availability (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # Use CUDA if available, otherwise default to CPU

# Clear CUDA cache to free up memory
torch.cuda.empty_cache()

# Print memory usage summary (uncomment if needed)
# print(torch.cuda.memory_summary(device=None, abbreviated=False))

# Display the selected device
device

# Data preprocessing

In [None]:
# Data transformations for training and validation

# Transformations applied to training data (includes augmentations for better generalization)
data_transforms_t = transforms.Compose([
                    transforms.Resize(size=(586,850)), # Resize images to 586x850 pixels
                    transforms.RandomRotation(10), # Randomly rotate images by ±10 degrees
                    transforms.ColorJitter(brightness=(0.8, 1.2), contrast=(0.8, 1.2), saturation=0, hue=0.1), # Adjust brightness, contrast, and hue
                    transforms.GaussianBlur(kernel_size=(3, 3), sigma=(0.1, 0.5)), # Apply slight blurring for noise reduction
                    transforms.ToTensor(), # Convert image to PyTorch tensor and scale pixel values to [0,1]
                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # Normalize with ImageNet mean and std
                    ])

# Transformations applied to validation data (no augmentations, only resizing and normalization)
data_transforms_v = transforms.Compose([
                    transforms.Resize(size=(586,850)), # Resize images to 586x850 pixels
                    transforms.ToTensor(), # Convert image to PyTorch tensor
                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # Normalize using ImageNet statistics
                    ])

In [None]:
# Define the data directory (change with your path)
data_dir = 'C:\\Users\\Elisa\\Desktop\\dataset_ossa\\bones_detection_species'

# Define the batch size (max limit set to 8 to prevent CUDA Out of Memory errors)
batch_size_real = 8 # If set too high, it may cause CUDA OOM (Out of Memory)

# Define paths for training and validation datasets
training_dir = os.path.join(data_dir, 'train') # Path to training dataset
validation_dir =  os.path.join(data_dir, 'val') # Path to validation dataset

# Load datasets using ImageFolder (applies transformations defined earlier)
training_dset = torchvision.datasets.ImageFolder(training_dir,data_transforms_t) # Training dataset with augmentations
validation_dset = torchvision.datasets.ImageFolder(validation_dir,data_transforms_v) # Validation dataset (only normalization)

# Create DataLoaders for training and validation
training_loader = torch.utils.data.DataLoader(training_dset, batch_size=batch_size_real,
                                             shuffle=True, num_workers=2) # Enable shuffling for training
validation_loader = torch.utils.data.DataLoader(validation_dset, batch_size=batch_size_real,
                                             shuffle=True, num_workers=2) # Enable shuffling for validation

# Extract class names from datasets
class_names_tr = training_dset.classes # Training set class names
class_names_va = validation_dset.classes # Validation set class names

# Create an iterator for the training DataLoader
dataiter = iter(training_loader) # Allows retrieving batches from the DataLoader

# CNN Model definition

In [None]:
#  Model definition, useful for potential debugging

# Load a pre-trained ResNet-101 model with default weights
model_ft = models.resnet101(weights='DEFAULT')

# Get the number of input features for the fully connected (fc) layer
num_ftrs = model_ft.fc.in_features

# Replace the default fully connected layer with a new one matching the number of classes
model_ft.fc = nn.Linear(num_ftrs, len(class_names_tr))

# Move the model to the selected device (GPU or CPU)
model = model_ft.to(device)

# Define a custom fully connected module for classification
class CustomModule(nn.Module):
    """
    Custom classification head for ResNet-101.

    Args:
      num_ftrs (int): Number of input features.
      num_classes (int): Number of output classes.
    """
    def __init__(self, num_ftrs, num_classes):
        super(CustomModule, self).__init__()
        self.layer1 = nn.Linear(num_ftrs, 512) # First fully connected layer (reduces features to 512)
        self.norm = nn.BatchNorm1d(512) # Batch normalization for stabilization
        self.relu = nn.ReLU() # ReLU activation function
        self.dropout = nn.Dropout(0.5) # Dropout layer (50%) to prevent overfitting
        self.layer2 = nn.Linear(512, num_classes) # Final fully connected layer (maps to class outputs)

    def forward(self, x):
        """
        Forward pass of the custom module.

        Args:
          x (Tensor): Input tensor.

        Returns:
          Tensor: Output logits for classification.
        """
        x = self.layer1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.layer2(x)
        return x

    def get_layer(self, layer_name):
        """
        Retrieve a specific layer by name (useful for debugging).

        Args:
          layer_name (str): Name of the layer to retrieve.

        Returns:
          nn.Module or None: The requested layer if it exists, otherwise None.
        """
        return getattr(self, layer_name, None)


# Replace ResNet's final classification head with the custom module
model.fc = CustomModule(num_ftrs, len(class_names_tr)).to(device)

In [None]:
# Print the model architecture
print(model)

In [None]:
# Define a function to reset model weights before each training cycle
def init_weights(m):
    """
    This function iterates through each layer of the model and resets its weights before training.
    It should be called before training the model, specifically before `model.train()`.

    Usage:
      model.apply(init_weights)  # Resets all model weights

    Args:
      m (nn.Module): A model layer to be checked and reinitialized.
    """
    if isinstance(m, nn.Embedding):
        # Reinitialize weights for embedding layers with normal distribution
        nn.init.normal_(m.weight, mean=0.0, std=0.1)

    if isinstance(m, nn.Linear):
        # Reinitialize weights for fully connected (linear) layers with Xavier initialization
        nn.init.normal_(m.weight, mean=0.0, std=np.sqrt(1 / m.in_features))
        if m.bias is not None:
            nn.init.zeros_(m.bias) # Set biases to zero

    if isinstance(m, nn.Conv1d):
        # Reinitialize weights for 1D convolutional layers with a variance-scaled normal distribution
        nn.init.normal_(m.weight, mean=0.0, std=np.sqrt(4 / m.in_channels))
        if m.bias is not None:
            nn.init.zeros_(m.bias) # Set biases to zero

In [None]:
def reinitialize_layers(module):
    """
    Reinitialize the weights of the given module using Xavier initialization.

    This function resets the weights of convolutional (`Conv2d`) and fully connected (`Linear`) layers
    to ensure proper initialization before training, which can help improve model convergence.

    Args:
      module (nn.Module): The layer to be reinitialized.

    Usage:
      model.apply(reinitialize_layers)  # Resets weights for all applicable layers
    """
    if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear):
        nn.init.xavier_uniform_(module.weight) # Apply Xavier uniform initialization
        if module.bias is not None:
            module.bias.data.fill_(0.01) # Set bias values to a small constant

In [None]:
# Hyperparameter Grid Search and Model Training
import itertools # Used to iterate over all hyperparameter combinations

criterion = nn.CrossEntropyLoss() # Define loss function
epoch = 50 # Number of training epochs

# Define hyperparameter grid
batch_size = [4, 32, 64] # Small batch size to avoid memory issues
learning_rate = [0.01, 0.001, 0.0001] # Learning rate for optimization
momentum = [0.40, 0.70, 0.90] # Momentum for the optimizer

# Initialize lists to store training and validation statistics
summary_loss_train = [] # Store training loss per epoch
summary_acc_train = [] # Store training accuracy per epoch
summary_loss_val = [] # Store validation loss per epoch
summary_acc_val = [] # Store validation accuracy per epoch

best_accuracy = 0.0 # Track the best validation accuracy achieved
best_hyperparameters = {} # Store the best hyperparameter combination

best_model_wts = copy.deepcopy(model.state_dict()) # Save the best model weights
best_acc = 0.0 # Best validation accuracy

since = time.time() # Track training start time

# Create a dataframe to store hyperparameter results
df_hyperparameters = pd.DataFrame(columns=['epoca', 'learning_rate', 'batch_size', 'momentum', 'training_acc', 'validation_acc'])

# Create a CSV file to store hyperparameter search results
i_train = 0
fields = ['epoca', 'learning_rate', 'batch_size', 'momentum', 'training_acc', 'validation_acc']

with open('hyperparameters_grid.csv', 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames = fields)
    writer.writeheader()
    csvfile.close()

# Model Training Loop with Hyperparameter Grid Search
for lr, batch_size, momentum in itertools.product(learning_rate, batch_size, momentum):

    # Initialize a new optimizer with the current hyperparameters
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=1e-4)
    scheduler = lr_scheduler.StepLR(optimizer, step_size = 6, gamma = 0.1) # Reduce learning rate every 6 epochs

    # Reinitialize model weights before training
    model.apply(init_weights)

    for e in range(epoch):
        i_train += 1
        ### TRAINING PHASE ###
        model.train() # Set model to training mode
        loss_current_train = 0.0 # Track training loss
        acc_current_train = 0.0 # Track training accuracy

        for inputs, labels in training_loader:
            model = model.to(device) # Ensure model is on the correct device (GPU or CPU)
            inputs, labels = inputs.to(device), labels.to(device)  # Move data to the selected device

            outputs = model(inputs) # Forward pass
            optimizer.zero_grad() # Reset gradients
            loss = criterion(outputs, labels) # Compute loss

            loss.backward() # Backpropagation
            optimizer.step() # Update model parameters

            _, preds = torch.max(outputs, 1) # Get predicted class
            loss_current_train += loss.item() * inputs.size(0) # Accumulate training loss
            acc_current_train += torch.sum(preds == labels.data) # Count correct predictions

        epoch_acc = acc_current_train.float() / len(training_dset) # Compute training accuracy

        # Adjust learning rate dynamically
        scheduler.step()

        ### VALIDATION PHASE ###
        model.eval() # Set model to evaluation mode
        loss_current_val = 0.0 # Track validation loss
        acc_current_val = 0.0 # Track validation accuracy

        with torch.no_grad(): # Disable gradient computation during validation
            for val_inputs, val_labels in validation_loader:
                val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)  # Move data to device

                val_outputs = model(val_inputs) # Forward pass
                _, val_preds = torch.max(val_outputs, 1) # Get predicted class
                val_loss = criterion(val_outputs, val_labels) # Compute validation loss

                loss_current_val += val_loss.item() * val_inputs.size(0) # Accumulate validation loss
                acc_current_val += torch.sum(val_preds == val_labels.data) # Count correct predictions

        # Update best accuracy and hyperparameters
        if acc_current_val > best_accuracy:
            best_accuracy = acc_current_val / len(validation_dset)  # Compute validation accuracy
            best_hyperparameters = {'learning_rate': lr, 'batch_size': batch_size, 'momentum': momentum}

        ### STORE STATISTICS ###
        epoch_loss = loss_current_train / len(training_dset)  # Compute training loss
        summary_loss_train.append(epoch_loss)

        val_epoch_loss = loss_current_val / len(validation_dset)  # Compute validation loss
        summary_loss_val.append(val_epoch_loss)

        epoch_acc = acc_current_train.float() / len(training_dset)  # Compute training accuracy
        summary_acc_train.append(epoch_acc)

        val_epoch_acc = acc_current_val.float() / len(validation_dset)  # Compute validation accuracy
        summary_acc_val.append(val_epoch_acc)

        # Save the best model if the accuracy improves
        if val_epoch_acc > best_acc:
            best_acc = val_epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        # Print training and validation performance per epoch
        print('Epoca:', (e+1))
        print('Training:   Loss {:.4f}, Acc {:.4f}'.format(epoch_loss, epoch_acc.item()))
        print('Validation: Loss {:.4f}, Acc {:.4f}'.format(val_epoch_loss, val_epoch_acc.item()))
        print("Best Hyperparameters:", best_hyperparameters) # Stampa i migliori iperparametri e l'accuratezza corrispondente
        print("Best Accuracy:", best_accuracy)

        # Create vector with 5 elements: 3 of the grid + training accuracy and validation accuracy.
        epoca_to_vect = e+1
        tr_acc_to_vect = '{:.2%}'.format(epoch_acc.item())
        vl_acc_to_vect = '{:.2%}'.format(val_epoch_acc.item())
        lr_to_vect = best_hyperparameters['learning_rate']
        bs_to_vect = best_hyperparameters['batch_size']
        mom_to_vect = best_hyperparameters['momentum']

        vector_hyper = [epoca_to_vect, lr_to_vect, bs_to_vect, mom_to_vect, tr_acc_to_vect, vl_acc_to_vect]

        # Store hyperparameter results every 10 epochs
        if i_train % 10 == 0:
            df_hyperparameters.loc[len(df_hyperparameters.index)] = vector_hyper

            # Append results to CSV file
            with open('hyperparameters_grid.csv', 'a', newline='') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(vector_hyper)
                csvfile.close()

    # Save the best model after training completion
    torch.save(model.state_dict(),'best_checkpoint.model')

# Compute total training time
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))

# Attach Random Forest

In [None]:
# Disable gradient computation for efficiency (no need to track gradients during validation)
with torch.no_grad():
    for val_inputs, val_labels in validation_loader: # Iterate over validation batches

        # Move input data and labels to the correct device (GPU or CPU)
        val_inputs = val_inputs.to(device)
        val_labels = val_labels.to(device)

        # Perform forward pass (get model predictions)
        val_outputs = model(val_inputs)

        # Get predicted class (highest probability)
        _, val_preds = torch.max(val_outputs, 1)

        # Compute validation loss
        val_loss = criterion(val_outputs, val_labels)

        # Accumulate total validation loss (scaled by batch size)
        loss_current_val += val_loss.item() * val_inputs.size(0)

        # Count correct predictions for accuracy calculation
        acc_current_val += torch.sum(val_preds == val_labels.data)

## Random Forest hyperparameters optimization methods

In [None]:
# List to store extracted CNN features and corresponding labels
cnn_features_train = []
labels_train = []

# Function to register a hook on layer1 (feature extraction)
def extract_layer1_features():
    """
    Hook function to extract features from CUSTOM LAYER 1 during forward pass.
    The extracted features are appended to the global list cnn_features_train.
    """
    global cnn_features_train # Ensure features are stored globally
    def hook(module, input, output):
        cnn_features_train.extend(output.cpu().detach().numpy()) # Store extracted features
    return hook

# Register the hook on CUSTOM LAYER 1
handle = model.fc.layer1.register_forward_hook(extract_layer1_features())

# Set model to evaluation mode (no dropout, batch norm uses running stats)
model.eval()

# Extract features from the training set
with torch.no_grad(): # Disable gradient computation for efficiency
    for inputs, labels in training_loader: # Iterate over training dataset batches
        inputs = inputs.to(device) # Move inputs to GPU/CPU
        labels = labels.to(device) # Move labels to GPU/CPU

        _ = model(inputs)# Forward pass (triggers hook function)

        labels_train.extend(labels.cpu().numpy()) # Store corresponding labels

# Remove the registered hook after feature extraction is complete
handle.remove()

# Convert extracted features and labels to numpy arrays for further processing
X_train = np.array(cnn_features_train) # Feature matrix
y_train = np.array(labels_train) # Label vector

In [None]:
# Assuming cnn_features_train and labels_train are extracted features and labels
X = X_train # Feature matrix (CNN-extracted features)
y = y_train # Label vector

# Define the hyperparameter grid for Random Forest
param_grid = {
    'n_estimators': [100, 200, 500],  # Number of trees in the forest
    'max_depth': [5, 10, 15],  # Maximum depth of each tree
    'min_samples_split': [2, 5, 10],  # Minimum samples required to split an internal node
    'min_samples_leaf': [1, 2, 4],  # Minimum samples required at a leaf node
    'max_features': ['auto', 'sqrt']  # Number of features to consider for best split
}

# Initialize the Random Forest classifier with class balancing
rf = RandomForestClassifier(class_weight='balanced') # Balances classes if they are imbalanced

# Setup GridSearchCV with k-Fold cross-validation (k=5)
cv = KFold(n_splits=5, shuffle=True, random_state=42) # Define cross-validation strategy
grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=cv, scoring='accuracy') # Grid search with accuracy metric

# Fit the model using grid search
grid_search.fit(X, y)

# Retrieve the best hyperparameters and the corresponding accuracy
best_params = grid_search.best_params_ # Optimal hyperparameter combination
best_score = grid_search.best_score_ # Best cross-validation accuracy

# Print the best hyperparameters and accuracy
print("Best Hyperparameters:", best_params)
print("Best CV Accuracy: {:.2%}".format(best_score)) # Print accuracy as percentage

In [None]:
# Clear CUDA memory cache to free up unused GPU memory
torch.cuda.empty_cache()

# Define layers to be evaluated (ResNet backbone + custom classification head)
layers_to_evaluate = {
    'resnet_layer1': model_ft.layer1,  # First ResNet block
    'resnet_layer2': model_ft.layer2,  # Second ResNet block
    'resnet_layer3': model_ft.layer3,  # Third ResNet block
    'resnet_layer4': model_ft.layer4,  # Fourth ResNet block
    'resnet_avgpool': model_ft.avgpool,  # Global average pooling layer
    'custom_linear1': model_ft.fc.get_layer('layer1'),  # First fully connected layer in custom head
    'custom_bn1': model_ft.fc.get_layer('norm'),  # Batch normalization layer
    'custom_relu': model_ft.fc.get_layer('relu'),  # ReLU activation function
    'custom_dropout': model_ft.fc.get_layer('dropout'),  # Dropout layer
    'custom_linear2': model_ft.fc.get_layer('layer2'),  # Final fully connected output layer
}

# Define hyperparameter grid for Random Forest tuning
param_grid = {
    'n_estimators': [100, 200, 500],  # Number of trees in the forest
    'max_depth': [5, 10, 15],  # Maximum depth of trees
    'min_samples_split': [2, 5, 10],  # Minimum samples required to split a node
    'min_samples_leaf': [1, 2, 4],  # Minimum samples required at a leaf node
    'max_features': ['auto', 'sqrt']  # Number of features considered for best split
}

In [None]:
!pip install h5py

In [None]:
# Idea for testing different layers for Random Forest feature extraction
# Uses h5py for disk storage due to limited VRAM

import h5py # Library for efficient disk-based data storage

def extract_features(model, dataloader, layer, device):
    """
    Extract features from a specific layer of the model using forward hooks.

    Args:
      model (torch.nn.Module): The neural network model.
      dataloader (torch.utils.data.DataLoader): DataLoader for processing the dataset.
      layer (torch.nn.Module): The specific layer to extract features from.
      device (str): The device to run inference on (CPU or GPU).

    Returns:
      np.ndarray: Extracted features.
      np.ndarray: Corresponding labels.
    """
    features = []
    labels = []

    # Hook function to capture layer outputs
    def hook(module, input, output):
        # Flatten the output to 2D (samples x features)
        output_flat = output.view(output.size(0), -1) # Flatten output to 2D (samples x features)
        features.append(output_flat.cpu().detach().numpy()) # Store features in CPU memory

    handle = layer.register_forward_hook(hook) # Register hook on the layer

    model.eval()  # Set model to evaluation mode

    with torch.no_grad(): # Disable gradient computation for efficiency
        for inputs, batch_labels in dataloader:
            inputs = inputs.to(device) # Move inputs to the selected device
            model(inputs) # Forward pass to trigger the hook
            labels.extend(batch_labels.numpy()) # Store labels

    handle.remove() # Remove the hook after feature extraction

    return np.concatenate(features), np.array(labels) # Convert to NumPy arrays

# Define path for saving extracted features
save_dir = 'F:\\ArchAIDE_nn\\test'
os.makedirs(save_dir, exist_ok=True) # Create directory if it doesn't exist

# Extract and save features for each selected layer
for layer_name, layer in layers_to_evaluate.items():
    print(f"Processing {layer_name}...")

    # Extract features from training and validation sets
    train_features, train_labels = extract_features(model_ft, training_loader, layer, device)
    val_features, val_labels = extract_features(model_ft, validation_loader, layer, device)

    # Save features to disk using h5py to avoid VRAM overuse
    train_features_path = os.path.join(save_dir, f'train_features_{layer_name}.h5')
    val_features_path = os.path.join(save_dir, f'val_features_{layer_name}.h5')

    with h5py.File(train_features_path, 'w') as h5file:
        h5file.create_dataset('features', data=train_features) # Save training features
        h5file.create_dataset('labels', data=train_labels) # Save training labels

    with h5py.File(val_features_path, 'w') as h5file:
        h5file.create_dataset('features', data=val_features) # Save validation features
        h5file.create_dataset('labels', data=val_labels) # Save validation labels

# Evaluate different layers with Random Forest
best_accuracy = 0 # Track best accuracy
best_layer = None # Store the best-performing layer

for layer_name in layers_to_evaluate.keys():
    # Load features from disk
    with h5py.File(os.path.join(save_dir, f'train_features_{layer_name}.h5'), 'r') as h5file:
        train_features = h5file['features'][:]
        train_labels = h5file['labels'][:]

    with h5py.File(os.path.join(save_dir, f'val_features_{layer_name}.h5'), 'r') as h5file:
        val_features = h5file['features'][:]
        val_labels = h5file['labels'][:]

    # Initialize Random Forest classifier with predefined hyperparameters
    rf_classifier = RandomForestClassifier(max_depth=15, n_estimators=200, min_samples_leaf=2, min_samples_split=2, criterion='gini', class_weight='balanced')

    # Train the Random Forest classifier
    rf_classifier.fit(train_features, train_labels)

    # Make predictions on the validation set
    val_predictions = rf_classifier.predict(val_features)
    val_accuracy = accuracy_score(val_labels, val_predictions) # Compute accuracy

    print(f"Layer {layer_name}: Validation Accuracy: {val_accuracy:.2%}")

    # Update best accuracy and best-performing layer
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        best_layer = layer_name

# Print the best-performing layer for feature extraction
print(f"The best layer for feature extraction is: {best_layer} with an accuracy of {best_accuracy:.2%}")

In [None]:
# Idea for testing different layers for Random Forest feature extraction with Grid Search
# Uses h5py for disk-based processing due to limited VRAM

#takes way too long
import h5py
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import numpy as np

def extract_features(model, dataloader, layer, device):
    """
    Extract features from a specified layer of the model using forward hooks.

    Args:
      model (torch.nn.Module): The trained neural network model.
      dataloader (torch.utils.data.DataLoader): DataLoader for processing the dataset.
      layer (torch.nn.Module): The specific layer to extract features from.
      device (str): The device to run inference on (CPU or GPU).

    Returns:
      np.ndarray: Extracted feature vectors.
      np.ndarray: Corresponding labels.
    """
    features = []
    labels = []

    # Hook function to capture layer outputs
    def hook(module, input, output):
        # Flatten the output to 2D (samples x features)
        output_flat = output.view(output.size(0), -1) # Flatten output to 2D (samples x features)
        features.append(output_flat.cpu().detach().numpy()) # Store features on CPU memory

    handle = layer.register_forward_hook(hook) # Register hook on the layer

    model.eval() # Set model to evaluation mode
    with torch.no_grad(): # Disable gradient computation for efficiency
        for inputs, batch_labels in dataloader:
            inputs = inputs.to(device) # Move inputs to the selected device
            model(inputs) # Forward pass triggers the hook
            labels.extend(batch_labels.numpy()) # Store corresponding labels

    handle.remove() # Remove the hook after feature extraction

    return np.concatenate(features), np.array(labels) # Convert lists to NumPy arrays

# Define path for saving extracted features
save_dir = 'F:\\ArchAIDE_nn\\test'
os.makedirs(save_dir, exist_ok=True) # Create directory if it doesn't exist

# Track best-performing layer and parameters
best_overall_accuracy = 0
best_overall_layer = None
best_overall_params = None

# Loop through different layers and evaluate their extracted features
for layer_name, layer in layers_to_evaluate.items():
    print(f"Processing {layer_name}...")
    # Extract features for training and validation
    train_features, train_labels = extract_features(model_ft, training_loader, layer, device)
    val_features, val_labels = extract_features(model_ft, validation_loader, layer, device)

    # Define the hyperparameter grid for Random Forest
    param_grid = {
        'n_estimators': [100, 200, 500],  # Number of trees
        'max_depth': [5, 10, 15],  # Tree depth
        'min_samples_split': [2, 5, 10],  # Minimum samples to split an internal node
        'min_samples_leaf': [1, 2, 4],  # Minimum samples required at a leaf node
        'max_features': ['auto', 'sqrt']  # Number of features considered for best split
    }

    # Initialize Random Forest classifier
    rf = RandomForestClassifier(class_weight='balanced') # Adjusts class imbalance automatically

    # Setup GridSearchCV with k-Fold cross-validation (k=5)
    cv = KFold(n_splits=5, shuffle=True, random_state=42) # Define cross-validation strategy
    grid_search = GridSearchCV(estimator=rf, param_grid=param_grid, cv=cv, scoring='accuracy') # Grid search with accuracy metric

    # Fit the model using Grid Search
    grid_search.fit(train_features, train_labels)

    # Retrieve best hyperparameters and accuracy for this layer
    best_params = grid_search.best_params_
    best_score = grid_search.best_score_

    print(f"Layer {layer_name} - Best Hyperparameters: {best_params}")
    print(f"Layer {layer_name} - Best CV Accuracy: {best_score:.2%}")

    # Update best layer if accuracy is the highest so far
    if best_score > best_overall_accuracy:
        best_overall_accuracy = best_score
        best_overall_layer = layer_name
        best_overall_params = best_params

# Print the best layer for feature extraction
print(f"The best layer for feature extraction is: {best_overall_layer} with an accuracy of {best_overall_accuracy:.2%} and parameters {best_overall_params}")

## CNN + Random Forest training

In [None]:
import itertools

criterion = nn.CrossEntropyLoss() # Define the loss function
epoch = 100 # Number of training epochs

# Define hyperparameter grid
batch_size = [16] # Batch size for training
learning_rate = [0.001, 0.01] # Learning rate options
momentum = [0.95] # Momentum for SGD optimizer

# Initialize lists for tracking training and validation statistics
summary_loss_train = []
summary_acc_train = []
summary_loss_val = []
summary_acc_val = []

best_accuracy = 0.0
best_hyperparameters = {}

best_model_wts = copy.deepcopy(model.state_dict()) # Store best model weights
best_acc = 0.0 # Store best validation accuracy

since = time.time() # Track training start time

i_train = 0

# Create DataFrame to store hyperparameter results
df_hyperparameters = pd.DataFrame(columns=['epoca', 'learning_rate', 'batch_size', 'momentum', 'training_acc', 'validation_acc'])

# Create a CSV file for storing hyperparameter search results
fields = ['epoca', 'learning_rate', 'batch_size', 'momentum', 'training_acc', 'validation_acc', 'training_acc_rf', 'validation_acc_rf']

with open('hyperparameters_grid.csv', 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames = fields)
    writer.writeheader()
    csvfile.close()

# Train model with different hyperparameter combinations
for lr, batch_size, momentum in itertools.product(learning_rate, batch_size, momentum):

    # Initialize optimizer with current hyperparameters
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=1e-4)
    scheduler = lr_scheduler.StepLR(optimizer, step_size = 6, gamma = 0.1) # Learning rate decay every 6 epochs

    # Reset model weights before training
    model.apply(init_weights)
    for e in range(epoch):

        i_train += 1
        ### TRAINING PHASE ###
        model.train() # Set model to training mode
        loss_current_train = 0.0
        acc_current_train = 0.0

        for inputs, labels in training_loader:
            model = model.to(device) # Ensure model is on the correct device
            inputs = inputs.to(device)

            labels = labels.to(device)
            outputs = model(inputs) # Forward pass
            optimizer.zero_grad() # Reset gradients
            loss = criterion(outputs, labels) # Compute loss
            loss.backward() # Backpropagation
            optimizer.step() # Update model weights

            _, preds = torch.max(outputs, 1) # Get predicted class
            loss_current_train += loss.item() * inputs.size(0)
            acc_current_train += torch.sum(preds == labels.data)

        epoch_acc = acc_current_train.float() / len(training_dset)

        # Update learning rate scheduler
        scheduler.step()


        ### CNN VALIDATION PHASE ###
        model.eval()
        loss_current_val = 0.0
        acc_current_val = 0.0

        with torch.no_grad(): # Disable gradient computation
            for val_inputs, val_labels in validation_loader:

                val_inputs = val_inputs.to(device)
                val_labels = val_labels.to(device)

                val_outputs = model(val_inputs)
                _, val_preds = torch.max(val_outputs, 1)
                val_loss = criterion(val_outputs, val_labels)

                loss_current_val += val_loss.item() * inputs.size(0)
                acc_current_val += torch.sum(val_preds == val_labels.data)

        # Track best validation accuracy
        if acc_current_val > best_accuracy:
            best_accuracy = acc_current_val / len(validation_dset)
            best_hyperparameters = {'learning_rate': lr, 'batch_size': batch_size, 'momentum': momentum}

        ### STORE TRAINING STATISTICS ###
        epoch_loss = loss_current_train / len(training_dset)
        summary_loss_train.append(epoch_loss)

        epoch_acc = acc_current_train.float() / len(training_dset)
        summary_acc_train.append(epoch_acc)

        val_epoch_loss = loss_current_val / len(validation_dset)
        summary_loss_val.append(val_epoch_loss)

        val_epoch_acc = acc_current_val.float() / len(validation_dset)
        summary_acc_val.append(val_epoch_acc)


        # Store best model weights if accuracy improves
        if val_epoch_acc > best_acc:
            best_acc = val_epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        print('Epoca:', (e+1))
        print('Training:   Loss {:.4f}, Acc {:.4f}'.format(epoch_loss, epoch_acc.item()))
        print('Validation: Loss {:.4f}, Acc {:.4f}'.format(val_epoch_loss, val_epoch_acc.item()))
        print("Best Hyperparameters:", best_hyperparameters) # Stampa i migliori iperparametri e l'accuratezza corrispondente
        print("Best Accuracy:", best_accuracy) # c'è qualcosa che non va nella best accuracy

        ### RANDOM FOREST FEATURE EXTRACTION ###
        # Extract CNN features for training set
        cnn_features_train = []
        labels_train = []

        with torch.no_grad():
            for inputs, labels in training_loader:

                model = model.to(device)
                inputs = inputs.to(device)

                labels = labels.to(device)
                outputs = model(inputs)

                features_train = model(inputs).cpu().detach().numpy()
                cnn_features_train.extend(features_train)
                labels_train.extend(labels.cpu().detach().numpy())


        # Extract CNN features for validation set
        cnn_features_val = []
        labels_val = []

        with torch.no_grad():
            for inputs, labels in validation_loader:

                model = model.to(device)
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = model(inputs)

                features_val = model(inputs).cpu().detach().numpy()
                cnn_features_val.extend(features_val)
                labels_val.extend(labels.cpu().detach().numpy())

        # Train a Random Forest classifier on extracted CNN features
        rf_classifier = RandomForestClassifier(
            max_depth=7, n_estimators=100, criterion='gini', class_weight='balanced'
        )
        rf_classifier.fit(cnn_features_train, labels_train)

        # Evaluate Random Forest on training set
        predictions_rf_train = rf_classifier.predict(cnn_features_train)
        accuracy_rf_train = accuracy_score(labels_train, predictions_rf_train)
        print("Random Forest train accuracy: {:.2%}".format(accuracy_rf_train))

        confusion_mat_train = metrics.confusion_matrix(labels_train, predictions_rf_train)
        print(confusion_mat_train)

        # Evaluate Random Forest on validation set
        predictions_rf = rf_classifier.predict(cnn_features_val)
        accuracy_rf = accuracy_score(labels_val, predictions_rf)
        print("Random Forest accuracy: {:.2%}".format(accuracy_rf))

        confusion_mat = metrics.confusion_matrix(labels_val, predictions_rf)
        print(confusion_mat)

        ### STORE RESULTS ###
        vector_hyper = [
            e+1, best_hyperparameters['learning_rate'], best_hyperparameters['batch_size'],
            best_hyperparameters['momentum'], "{:.2%}".format(epoch_acc.item()),
            "{:.2%}".format(val_epoch_acc.item()), "{:.2%}".format(accuracy_rf_train),
            "{:.2%}".format(accuracy_rf)
        ]

        # Append results to DataFrame every 10 epochs
        if i_train % 10 == 0:
            try:
                new_row_df = pd.DataFrame([vector_hyper], columns=['epoca', 'learning_rate', 'batch_size', 'momentum', 'training_acc', 'validation_acc', 'training_acc_rf', 'validation_acc_rf'])
                df_hyperparameters = pd.concat([df_hyperparameters, new_row_df], ignore_index=True)
            except:
                print("concat failed")

        # Append results to CSV file
            with open('hyperparameters_grid.csv', 'a', newline='') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(vector_hyper)
                csvfile.close()

    # Save best model weights after training completion
    torch.save(model.state_dict(),'best_checkpoint.model')

# Compute total training time
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))