<a href="https://colab.research.google.com/github/acost976/Raw_SAR_Pylon_detection/blob/main/Classifier_IAR_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import torch, torchvision
import torch.nn as nn
from torch.utils.data import DataLoader, random_split
from torchvision import models, transforms
from torch.utils.data import Dataset
import torchvision.datasets as datasets
import torch.optim as optim
import torch.utils.data.dataloader as dataloader

print("torch:", torch.__version__)
print("torchvision:", torchvision.__version__)
from tqdm.notebook import trange, tqdm

from PIL import Image
import os
import time
import random
import warnings

import matplotlib.pyplot as plt
import numpy as np

torch: 2.8.0+cu126
torchvision: 0.23.0+cu126


La classe ProcessedSARImageDatasetlegge immagini .png da directory strutturate come:


```
root_dir/
  Integri/
      Integri_subfolder1/
      Integri_subfolder2/
  Rimossi/
      Rimossi_subfolder1/
  Abbattuti/
      Abbattuti_subfolder1/
```


Le immagini sono in formato .png a 3 canali, dove il primo rappresenta il valore reale, il secondo il valore complesso e il terzo canale vuoto

Permette inoltre di:
1. selezionare specifiche classi o sottocartelle,
2. applicare trasformazioni torchvision,
3. opzionalmente mescolare le etichette (in caso si volesse fare un sanity check).

In [13]:
class ProcessedSARImageDataset(Dataset):
    """
    Dataset personalizzato per immagini SAR (Synthetic Aperture Radar) processate,
    organizzate in sottocartelle per classe.

    Parameters
    ----------
    root_dir : str
        Percorso alla directory principale che contiene le classi.
    transform : callable, optional
        Trasformazioni da applicare a ciascuna immagine (es. `transforms.Compose([...])`).
    shuffle_labels : bool, default=False
        Se True, mescola l'ordine di immagini ed etichette (utilizzato per sanity check).
    selected_classes : list of str, optional
        Lista di nomi di classi da includere (default: ['Integri', 'Rimossi', 'Abbattuti']).
    selected_subfolders : list of str, optional
        Lista di sottocartelle per ciascuna classe (default: [''] usa la root di ogni classe).

    Attributes
    ----------
    image_paths : list of str
        Percorsi completi alle immagini trovate.
    labels : list of int
        Etichette numeriche corrispondenti alle classi.
    classes : list of str
        Nomi delle classi considerate nel dataset.
    """

    def __init__(self, root_dir, transform=None, shuffle_labels=False,
                 selected_classes=None, selected_subfolders=None):
        """
        Inizializza il dataset e popola la lista di immagini ed etichette.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Define the class names and their corresponding labels
        if selected_classes is not None:
            self.classes = selected_classes
        else:
            self.classes = ['Integri', 'Rimossi', 'Abbattuti']

        if selected_subfolders is None:
            selected_subfolders = ['']  # Use the root directory for each class if no subfolders are specified

        for label, class_name in enumerate(self.classes):
            for subfolder in selected_subfolders:
                # Construct the full path to the subfolder within the class directory
                class_dir = os.path.join(root_dir, class_name, class_name + subfolder)
                print(f"Processing data from: {class_dir}")

                if not os.path.isdir(class_dir):
                    print(f"Warning: Class subfolder not found: {class_dir}")
                    continue

                for filename in os.listdir(class_dir):
                    if filename.endswith('.png'):
                        self.image_paths.append(os.path.join(class_dir, filename))
                        self.labels.append(label)

        if shuffle_labels:
            combined = list(zip(self.image_paths, self.labels))
            random.shuffle(combined)
            self.image_paths, self.labels = zip(*combined)
            print("Training labels and image paths have been shuffled.")

    def __len__(self):
        """Lunghezza del dataset"""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """
        Restituisce un'immagine e la relativa etichetta.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        image = Image.open(img_path)
        image_tensor = transforms.ToTensor()(image)

        # Convert back to PIL to allow further transforms
        image = transforms.ToPILImage()(image_tensor)

        # Apply transforms if any
        if self.transform:
            image = self.transform(image)

        # Ensure we only keep two channels
        if image_tensor.shape[0] == 2:
            image_tensor = image_tensor
        elif image_tensor.shape[0] == 3:
            ch3 = image_tensor[2]
            if not torch.allclose(ch3, torch.zeros_like(ch3), atol=1e-6):
                warnings.warn("Third channel is not empty", UserWarning)
            image_tensor = image_tensor[:2]
        else:
            warnings.warn(f"Unexpected number of channels ({image_tensor.shape[0]}).", UserWarning)
            image_tensor = image_tensor[:2]

        return image_tensor, label




Wrapper di training per un modello PyTorch: gestisce ottimizzatore, salvataggio/caricamento checkpoint, loop di training/val/test e logging di metriche.

In [16]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils import data as dataloader
from tqdm import tqdm


class ModelTrainer(nn.Module):
    def __init__(
        self,
        model,
        device,
        loss_fun,
        batch_size,
        learning_rate,
        save_dir,
        model_name,
        start_from_checkpoint: bool = False
    ):
        """
        Inizializza il trainer e imposta parametri, modello e ottimizzatore.

        Parameters
        ----------
        model : nn.Module
            Il modello neurale vero e proprio che vogliamo allenare.
        device : torch.device or str
            Il device su cui eseguire il training (ad es. "cuda" o "cpu").
        loss_fun : callable
            Funzione di loss che accetta (predictions, targets) e ritorna uno scalar Tensor.
        batch_size : int
            Dimensione del batch da usare nei DataLoader.
        learning_rate : float
            Learning rate per l'ottimizzatore.
        save_dir : str
            Directory dove salvare i checkpoint.
        model_name : str
            Nome base del file di checkpoint (verrà creato `model_name.pt`).
        start_from_checkpoint : bool, default=False
            Se True, prova a caricare un checkpoint esistente e riprendere l'allenamento.
            Se False, impedisce di sovrascrivere accidentalmente un checkpoint già esistente.
        """

        # Call the parent nn.Module constructor
        super(ModelTrainer, self).__init__()

        # core training objects
        self.model = model
        self.device = device
        self.loss_fun = loss_fun
        self.batch_size = batch_size
        self.learning_rate = learning_rate

        # logging / tracking
        self.start_epoch = 0
        self.best_valid_acc = 0.0

        self.train_loss_logger = []
        self.train_acc_logger = []
        self.val_acc_logger = []
        self.val_loss_logger = []
        self.test_acc_logger = []
        self.test_loss_logger = []

        # loaders will be attached later
        self.train_loader = None
        self.valid_loader = None
        self.test_loader = None

        # optimizer
        self.optimizer = None
        self.set_optimizer()

        # checkpoint stuff
        self.save_dir = save_dir
        self.save_path = os.path.join(save_dir, model_name + ".pt")

        if not os.path.isdir(self.save_dir):
            os.makedirs(self.save_dir)

        if start_from_checkpoint:
            # try to resume
            self.load_checkpoint()
        else:
            # safety check: don't silently overwrite an existing model
            if os.path.isfile(self.save_path):
                raise ValueError(f"Warning: checkpoint already exists at {self.save_path}")
            else:
                print("New model creation")

    def set_optimizer(self):
        # IMPORTANT: optimize the wrapped model's parameters, not the trainer's parameters()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def load_checkpoint(self):
        # Check if checkpoint exists
        if os.path.isfile(self.save_path):
            checkpoint = torch.load(self.save_path, map_location=self.device)

            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.set_optimizer()
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

            self.start_epoch = checkpoint['epoch']
            self.best_valid_acc = checkpoint['best_valid_acc']

            self.train_loss_logger = checkpoint.get('train_loss_logger', [])
            self.train_acc_logger = checkpoint.get('train_acc_logger', [])
            self.val_acc_logger = checkpoint.get('val_acc_logger', [])
            self.val_loss_logger = checkpoint.get('val_loss_logger', [])
            self.test_acc_logger = checkpoint.get('test_acc_logger', [])
            self.test_loss_logger = checkpoint.get('test_loss_logger', [])

            print(f"Checkpoint loaded, starting from epoch: {self.start_epoch}")
        else:
            # Raise Error if it does not exist
            raise ValueError("Checkpoint does not exist")

    def save_checkpoint(self, epoch, valid_acc):
        self.best_valid_acc = valid_acc

        torch.save({
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'best_valid_acc': valid_acc,
            'train_loss_logger': self.train_loss_logger,
            'train_acc_logger': self.train_acc_logger,
            'val_acc_logger': self.val_acc_logger,
            'val_loss_logger': self.val_loss_logger,
            'test_acc_logger': self.test_acc_logger,
            'test_loss_logger': self.test_loss_logger,
        }, self.save_path)

    def set_data(self, train_set, val_set, test_set=None):
        print(f'Number of training examples: {len(train_set)}')
        print(f'Number of validation examples: {len(val_set)}')
        if test_set is not None:
            print(f'Number of test examples: {len(test_set)}')

        self.train_loader = dataloader.DataLoader(
            train_set,
            shuffle=True,
            batch_size=self.batch_size,
            num_workers=4
        )
        self.valid_loader = dataloader.DataLoader(
            val_set,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=4
        )
        if test_set is not None:
            self.test_loader = dataloader.DataLoader(
                test_set,
                shuffle=False,
                batch_size=self.batch_size,
                num_workers=4
            )

    def forward(self, x):
        # delegate to the actual model
        return self.model(x)

    def train_model(self):
        """
        Esegue una singola epoca di addestramento.

        Scorre tutto il train_loader e:
        - calcola le predizioni
        - computa la loss
        - fa il backward
        - aggiorna i pesi
        """
        if self.train_loader is None:
            raise ValueError("Dataset not defined! Call set_data() first.")

        # Set model in train mode
        self.model.train()

        running_loss = 0.0
        running_correct = 0
        total_samples = 0

        for x, y in tqdm(self.train_loader, leave=False, desc="Training"):
            x = x.to(self.device)
            y = y.to(self.device)

            # Forward pass
            fx = self.model(x)

            # Loss
            loss = self.loss_fun(fx, y)

            # Zero grad
            self.optimizer.zero_grad()

            # Backward
            loss.backward()

            # Step
            self.optimizer.step()

            # Logging
            batch_size_now = x.size(0)
            running_loss += loss.item() * batch_size_now
            running_correct += (fx.argmax(1) == y).sum().item()
            total_samples += batch_size_now

        epoch_loss = running_loss / total_samples
        epoch_acc = running_correct / total_samples

        self.train_loss_logger.append(epoch_loss)
        self.train_acc_logger.append(epoch_acc)

        return epoch_loss, epoch_acc

    def evaluate_model(self, train_test_val="val"):
        """
        Valuta il modello su un dataset (train/val/test) senza aggiornare i pesi.
        Ritorna l'accuracy media.
        """
        if train_test_val == "train":
            loader = self.train_loader
            state = "Evaluating Train Set"
        elif train_test_val == "val":
            loader = self.valid_loader
            state = "Evaluating Validation Set"
        elif train_test_val == "test":
            loader = self.test_loader
            state = "Evaluating Test Set"
        else:
            raise ValueError("Invalid dataset, train_test_val should be 'train'/'val'/'test'")

        if loader is None:
            raise ValueError(f"{train_test_val} loader not defined! Did you pass that split to set_data()?")

        self.model.eval()

        epoch_acc = 0.0
        epoch_loss = 0.0
        total_samples = 0

        with torch.no_grad():
            for x, y in tqdm(loader, leave=False, desc=state):
                x = x.to(self.device)
                y = y.to(self.device)

                fx = self.model(x)

                loss = self.loss_fun(fx, y)
                bs = x.size(0)

                epoch_loss += loss.item() * bs
                total_samples += bs
                epoch_acc += (fx.argmax(1) == y).sum().item()

        avg_loss = epoch_loss / total_samples
        avg_acc = epoch_acc / total_samples

        if train_test_val == "train":
            self.train_acc_logger.append(avg_acc)
            # you normally wouldn't log train loss here because train_model already did,
            # but you could if you wanted symmetry
        elif train_test_val == "val":
            self.val_acc_logger.append(avg_acc)
            self.val_loss_logger.append(avg_loss)
        elif train_test_val == "test":
            self.test_acc_logger.append(avg_acc)
            self.test_loss_logger.append(avg_loss)

        return avg_acc



Definizione dei parametri di training



In [20]:
batch_size = 32
num_epochs = 3
learning_rate = 1e-5
start_epoch = 0
best_valid_acc = 0


In [18]:
# Set device to GPU_indx if GPU is avaliable
GPU_indx = 0
device = torch.device(GPU_indx if torch.cuda.is_available() else 'cpu')

Questa cella definisce il modello da utilizzare

- Architettura ResNet o Mobilenet
- Classificatore a 2 o 3 classi (la classe Rimossi è stata creata per fare dei test, ma difficilmente nel caso reale un traliccio viene completamente rimosso - più verosimilmente viene abbattuto )

In [None]:
#Load the dataset - Questo dataset include un crop dei 512 x 512 pixel centrali dell'immagine raw
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Si può scegliere se trainare un classificatore a due o a tre classi
two_classes = ['Integri', 'Abbattuti']
three_classes = ['Integri', 'Rimossi', 'Abbattuti']
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt', '_5x25_WithExtraTgt_Cls']


# Crea Dataset con i parametri selezzionati
two_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=two_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)
#three_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=three_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)


dataset_for_training = two_class_dataset
num_classes = len(dataset_for_training.classes)


# Scegli architettura
model_architecture = 'mobilenet' # Options: 'resnet18', 'resnet50', 'mobilenet'

if model_architecture == 'resnet18':
    if num_classes == 2:
        model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        num_ftrs = model.fc.in_features
        model.conv1 = nn.Conv2d(in_channels=2,
                                out_channels=model.conv1.out_channels,
                                kernel_size=model.conv1.kernel_size,
                                stride=model.conv1.stride,
                                padding=model.conv1.padding,
                                bias=model.conv1.bias is not None
        )
        model.fc = nn.Linear(num_ftrs, 2)
    elif num_classes == 3:
        model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        model.conv1 = nn.Conv2d(in_channels=2,
                                out_channels=model.conv1.out_channels,
                                kernel_size=model.conv1.kernel_size,
                                stride=model.conv1.stride,
                                padding=model.conv1.padding,
                                bias=model.conv1.bias is not None
        )
        num_ftrs = model.fc.in_features
        model.fc = nn.Linear(num_ftrs, 3)
    else:
        raise ValueError(f"Unsupported number of classes: {num_classes}")

elif model_architecture == 'resnet50':
    if num_classes == 2:
        model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        num_ftrs = model.fc.in_features
        model.conv1 = nn.Conv2d(in_channels=2,
                                out_channels=model.conv1.out_channels,
                                kernel_size=model.conv1.kernel_size,
                                stride=model.conv1.stride,
                                padding=model.conv1.padding,
                                bias=model.conv1.bias is not None
        )
        model.fc = nn.Linear(num_ftrs, 2)
    elif num_classes == 3:
        model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1) #
        model.conv1 = nn.Conv2d(in_channels=2,
                                out_channels=model.conv1.out_channels,
                                kernel_size=model.conv1.kernel_size,
                                stride=model.conv1.stride,
                                padding=model.conv1.padding,
                                bias=model.conv1.bias is not None
        )
        num_ftrs = model.fc.in_features
        model.fc = nn.Linear(num_ftrs, 3)
    else:
        raise ValueError(f"Unsupported number of classes: {num_classes}")

elif model_architecture == 'mobilenet':
    if num_classes == 2:
        model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
        model.features[0][0] = nn.Conv2d(2, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        num_ftrs = model.classifier[-1].in_features
        model.classifier[-1] = nn.Linear(num_ftrs, 2)
    elif num_classes == 3:
        model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
        model.features[0][0] = nn.Conv2d(2, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        num_ftrs = model.classifier[-1].in_features
        model.classifier[-1] = nn.Linear(num_ftrs, 3)
    else:
        raise ValueError(f"Unsupported number of classes: {num_classes}")

else:
    raise ValueError(f"Unsupported model architecture: {model_architecture}")


# Split training, validation, e test
train_size = int(0.8 * len(dataset_for_training))
val_size = int(0.1 * len(dataset_for_training))
test_size = len(dataset_for_training) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset_for_training, [train_size, val_size, test_size])


def compute_mean_std(dset):
    loader = DataLoader(dset, batch_size=batch_size, shuffle=False)
    mean = 0.0
    std = 0.0
    n = 0

    for images, _ in loader:
        images = images.view(images.size(0), -1)  # Flatten
        n += images.size(0)
        mean += images.mean(1).sum().item()
        std += images.std(1).sum().item()

    mean /= n
    std /= n
    return mean, std

mean, std = compute_mean_std(train_dataset)
print("mean: ", mean)
print("std: ", std)


train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
valid_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

normalized_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,))
])

train_dataset.dataset.transform = normalized_transform
val_dataset.dataset.transform = normalized_transform
test_dataset.dataset.transform = normalized_transform


print(f'Number of training examples: {len(train_dataset)}')
print(f'Number of validation examples: {len(val_dataset)}')
print(f'Number of test examples: {len(test_dataset)}')
print(f'Model with {num_classes} output classes and {model_architecture} architecture created.')

Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Integri/Integri_0
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Integri/Integri_3x20_NoExtraTgt
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Integri/Integri_5x25_NoExtraTgt
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Integri/Integri_5x25_WithExtraTgt_Cls
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Abbattuti/Abbattuti_0
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Abbattuti/Abbattuti_3x20_NoExtraTgt
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Abbattuti/Abbattuti_5x25_NoExtraTgt
Processing data from: /content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/Abbattuti/Abbattuti_5x25_WithExtraTgt_Cls


Date le performance molto alte del modello sia su set di training che di validazione, ho il dubbio che il dataset sintetico sia estremamente semplice. Per capire se ci sono dei pattern nelle immagini raw che il modello sta imparando, questa cella calcola la media e deviazione standard delle immagini di coascuna classe. la cella successiva calcola la differenza fra queste medie.

In [None]:

if 'train_dataset' not in globals():
    print("Error: 'train_dataset' not found. Please run the cell that creates the training dataset first.")
else:
    # Create a dictionary
    images_by_class = defaultdict(list)

    # Get the class names from the dataset (assuming train_dataset is a Subset)
    # Access the original dataset's classes attribute
    if hasattr(train_dataset, 'dataset') and hasattr(train_dataset.dataset, 'classes'):
        class_names = train_dataset.dataset.classes
    else:
        # Fallback if the structure is different
        class_names = [str(i) for i in range(len(np.unique([label for _, label in train_dataset])))]
        print("Warning: Could not access class names from the original dataset. Using numerical labels.")

    # Iterate through the training dataset and group images by class
    for img, label in train_dataset:
        images_by_class[label].append(img)

    # Iterate through each class to calculate and plot the average image and standard deviation
    for label, images in images_by_class.items():
        if not images:
            print(f"No images found for class with label {label}.")
            continue

        # Get the class name for the current label
        class_name = class_names[label]
        print(f"Processing class: {class_name}")

        # Stack images along a new dimension
        stacked_images = torch.stack(images)

        # Calculate the average image and standard deviation for each channel
        if stacked_images.shape[1] > 0:
            average_channel_0 = torch.mean(stacked_images[:, 0, :, :], dim=0)
            std_channel_0 = torch.std(stacked_images[:, 0, :, :], dim=0) # Calculate standard deviation

            # Determine the min and max values across both average and standard deviation for Channel 0
            vmin_ch0 = min(average_channel_0.min().item(), std_channel_0.min().item())
            vmax_ch0 = max(average_channel_0.max().item(), std_channel_0.max().item())


            # Plot average and standard deviation for Channel 0 side by side
            fig, axes = plt.subplots(1, 2, figsize=(12, 6))

            im0 = axes[0].imshow(average_channel_0.numpy(), cmap='viridis', vmin=vmin_ch0, vmax=vmax_ch0)
            axes[0].set_title(f'Average Channel 0: {class_name}')
            axes[0].axis('off')
            fig.colorbar(im0, ax=axes[0])


            im1 = axes[1].imshow(std_channel_0.numpy(), cmap='viridis', vmin=vmin_ch0, vmax=vmax_ch0)
            axes[1].set_title(f'Standard Deviation Channel 0: {class_name}')
            axes[1].axis('off')
            fig.colorbar(im1, ax=axes[1])

            plt.tight_layout()
            plt.show()

        else:
             print(f"No Channel 0 data for class '{class_name}'.")


        if stacked_images.shape[1] > 1:
            average_channel_1 = torch.mean(stacked_images[:, 1, :, :], dim=0)
            std_channel_1 = torch.std(stacked_images[:, 1, :, :], dim=0) # Calculate standard deviation

            # Determine the min and max values across both average and standard deviation for Channel 1
            vmin_ch1 = min(average_channel_1.min().item(), std_channel_1.min().item())
            vmax_ch1 = max(average_channel_1.max().item(), std_channel_1.max().item())

            # Plot average and standard deviation for Channel 1 side by side
            fig, axes = plt.subplots(1, 2, figsize=(12, 6))

            im0 = axes[0].imshow(average_channel_1.numpy(), cmap='viridis', vmin=vmin_ch1, vmax=vmax_ch1)
            axes[0].set_title(f'Average Channel 1: {class_name}')
            axes[0].axis('off')
            fig.colorbar(im0, ax=axes[0])

            im1 = axes[1].imshow(std_channel_1.numpy(), cmap='viridis', vmin=vmin_ch1, vmax=vmax_ch1)
            axes[1].set_title(f'Standard Deviation Channel 1: {class_name}')
            axes[1].axis('off')
            fig.colorbar(im1, ax=axes[1])

            plt.tight_layout()
            plt.show()
        else:
            print(f"No Channel 1 data for class '{class_name}'.")

In [None]:

integri_label = dataset_to_use.classes.index('Integri')
abbattuti_label = dataset_to_use.classes.index('Abbattuti')

if integri_label in images_by_class and abbattuti_label in images_by_class:
    # Get the average images for Integri
    integri_images = images_by_class[integri_label]
    if integri_images:
        stacked_integri = torch.stack(integri_images)
        avg_integri_ch0 = torch.mean(stacked_integri[:, 0, :, :], dim=0)
        avg_integri_ch1 = torch.mean(stacked_integri[:, 1, :, :], dim=0)
    else:
        avg_integri_ch0 = None
        avg_integri_ch1 = None
        print("No images found for class 'Integri'. Cannot perform subtraction.")


    # Get the average images for Abbattuti
    abbattuti_images = images_by_class[abbattuti_label]
    if abbattuti_images:
        stacked_abbattuti = torch.stack(abbattuti_images)
        avg_abbattuti_ch0 = torch.mean(stacked_abbattuti[:, 0, :, :], dim=0)
        avg_abbattuti_ch1 = torch.mean(stacked_abbattuti[:, 1, :, :], dim=0)
    else:
        avg_abbattuti_ch0 = None
        avg_abbattuti_ch1 = None
        print("No images found for class 'Abbattuti'. Cannot perform subtraction.")


    # Perform subtraction channel by channel and calculate magnitude if average images are available
    if avg_integri_ch0 is not None and avg_abbattuti_ch0 is not None:
        diff_channel_0 = avg_integri_ch0 - avg_abbattuti_ch0 # Real part difference
        diff_channel_1 = avg_integri_ch1 - avg_abbattuti_ch1 # Imaginary part difference

        print("Subtraction of average images performed channel by channel.")

        # Plot the difference images with color bars
        plt.figure(figsize=(12, 6))

        plt.subplot(1, 2, 1)
        im0 = plt.imshow(diff_channel_0.numpy(), cmap='seismic') # Use seismic colormap to show positive and negative differences
        plt.title('Difference (Integri - Abbattuti) - Channel 0')
        plt.axis('off')
        plt.colorbar(im0) # Add color bar for Channel 0 difference

        plt.subplot(1, 2, 2)
        im1 = plt.imshow(diff_channel_1.numpy(), cmap='seismic')
        plt.title('Difference (Integri - Abbattuti) - Channel 1')
        plt.axis('off')
        plt.colorbar(im1) # Add color bar for Channel 1 difference

        plt.tight_layout()
        plt.show()

    else:
        print("Average images for 'Integri' or 'Abbattuti' not found in images_by_class dictionary.")
        print("Please ensure the previous cell (cell_id: 3wWH7mE3GJaH) was run and successfully calculated the average images for these classes.")

Questa cella effettua il training del modello scelto su train_dataset

In [None]:
start_from_checkpoint = False
save_dir = '/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/'
model_name = model_architecture+"_cls_"+str(num_classes)

# Move the model to the device
model = model.to(device)

model_trainer = ModelTrainer(model=model, device=device, loss_fun=nn.CrossEntropyLoss(),
                             batch_size=batch_size, learning_rate=learning_rate,
                             save_dir=save_dir, model_name=model_name,
                             start_from_checkpoint=start_from_checkpoint)

model_trainer.set_data(train_set=train_dataset, val_set=val_dataset, test_set=test_dataset)


start_time = time.time()
valid_acc = 0
train_acc = 0

pbar = trange(start_epoch, num_epochs, leave=False, desc="Epoch")
for epoch in pbar:
    pbar.set_postfix_str('Accuracy: Train %.2f%%, Val %.2f%%' % (train_acc * 100, valid_acc * 100))

    #Train su singole epoca
    model_trainer.train_model()

    #Valutazionedel modello su val
    train_acc = model_trainer.evaluate_model(train_test_val="train")
    valid_acc = model_trainer.evaluate_model(train_test_val="val")

    #Salvataggio del miglior modello
    if valid_acc > model_trainer.best_valid_acc:
        model_trainer.save_checkpoint(epoch, valid_acc)

end_time = time.time()

print("Training time %.2f seconds" %(end_time - start_time))
print("The highest validation accuracy was %.2f%%" %(model_trainer.best_valid_acc*100))

#Valutazione del modello sul set di test
test_acc = model_trainer.evaluate_model(train_test_val="test")
print(f"\nAccuracy on the test set: {test_acc*100:.2f}%")


In [None]:
#Plots
_ = plt.figure(figsize = (10,5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_loss_logger))
_ = plt.plot(train_x, model_trainer.train_loss_logger, label="Training Loss")

if len(model_trainer.val_loss_logger) > 0:
    valid_x = np.linspace(0, num_epochs, len(model_trainer.val_loss_logger))
    _ = plt.plot(valid_x, model_trainer.val_loss_logger, label="Validation Loss")

_ = plt.title("Loss per Epoch")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("Loss")
_ = plt.legend()
plt.show()

_ = plt.figure(figsize = (10,5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_acc_logger))
_ = plt.plot(train_x, model_trainer.train_acc_logger, c = "y")
valid_x = np.linspace(0, num_epochs, len(model_trainer.val_acc_logger))
_ = plt.plot(valid_x, model_trainer.val_acc_logger, c = "k")

_ = plt.title("Accuracy")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("Accuracy")
_ = plt.legend(["Training accuracy", "Validation accuracy"])


In [None]:


from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

model_trainer.eval()
all_labels = []
all_preds = []
misclassified_images = []
misclassified_labels = []
misclassified_preds = []
misclassified_filepaths = [] # List to store file paths of misclassified images

with torch.no_grad():
    # Iterate through the test loader with index
    for batch_idx, (inputs, labels) in enumerate(model_trainer.test_loader):
        inputs = inputs.to(model_trainer.device)
        labels = labels.to(model_trainer.device)
        outputs = model_trainer(inputs)
        _, preds = torch.max(outputs, 1)

        # Identify misclassified examples in the current batch
        misclassified_mask = preds != labels
        misclassified_indices_in_batch = torch.where(misclassified_mask)[0]

        # Get the global indices of misclassified examples in the test dataset
        # The index in the test_loader is batch_idx, the index within the batch is misclassified_indices_in_batch
        # We need to map these back to the original dataset indices through the test_dataset Subset
        for idx_in_batch in misclassified_indices_in_batch:
            global_dataset_index = test_dataset.indices[batch_idx * model_trainer.batch_size + idx_in_batch]
            misclassified_filepaths.append(dataset_to_use.image_paths[global_dataset_index])


        misclassified_images.extend(inputs[misclassified_mask].cpu())
        misclassified_labels.extend(labels[misclassified_mask].cpu().numpy())
        misclassified_preds.extend(preds[misclassified_mask].cpu().numpy())


        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Calculate the confusion matrix
cm = confusion_matrix(all_labels, all_preds)

# Define class names based on the dataset used for validation
class_names = dataset_to_use.classes # Access the classes directly from the dataset

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Test Set)')
plt.show()

# Calculate and print per-class accuracy, precision, and recall
print("\nPer-Class Metrics (Test Set):")
for i, class_name in enumerate(class_names):
    true_positives = cm[i, i]
    false_negatives = np.sum(cm[i, :]) - true_positives
    false_positives = np.sum(cm[:, i]) - true_positives
    true_negatives = np.sum(cm) - true_positives - false_positives - false_negatives

    # Handle division by zero for precision and recall
    accuracy = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"  {class_name}:")
    print(f"    Accuracy: {accuracy:.2f}")
    print(f"    Precision: {precision:.2f}")
    print(f"    Recall: {recall:.2f}")
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"  {class_name}:")
    print(f"    Accuracy: {accuracy:.2f}")
    print(f"    Precision: {precision:.2f}")
    print(f"    Recall: {recall:.2f}")
    print(f"    F1-Score: {f1_score:.2f}")

# Print the file paths of misclassified examples
print("\nMisclassified Examples (File Paths):")
for filepath, true_lbl, pred_lbl in zip(misclassified_filepaths, misclassified_labels, misclassified_preds):
    true_class_name = dataset_to_use.classes[true_lbl]
    pred_class_name = dataset_to_use.classes[pred_lbl]
    print(f"  File: {filepath}, True Label: {true_class_name}, Predicted Label: {pred_class_name}")

In [None]:
import os
import torch
import torch.nn.functional as F
from torchvision import models

# ---- Build model exactly as in training ----
num_classes = 2
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features

model.conv1 = torch.nn.Conv2d(
    in_channels=2,
    out_channels=model.conv1.out_channels,
    kernel_size=model.conv1.kernel_size,
    stride=model.conv1.stride,
    padding=model.conv1.padding,
    bias=model.conv1.bias is not None
)
model.fc = torch.nn.Linear(num_ftrs, num_classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# ---- Load ONLY the model state dict from your checkpoint ----
ckpt_path = "/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/ResNet50_2classes_IA_extratgt.pt"
ckpt = torch.load(ckpt_path, map_location=device)
state_dict = ckpt["model_state_dict"]  # this exists in your file

# (Optional) strip 'module.' if model was saved from DataParallel
from collections import OrderedDict
clean_sd = OrderedDict((k.replace("module.", "", 1) if k.startswith("module.") else k, v)
                       for k, v in state_dict.items())

model.load_state_dict(clean_sd, strict=True)  # this should succeed as-is ✅
model.eval()

# ---- Minimal Grad-CAM for ResNet-50 (target layer = layer4[-1]) ----
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.activations, self.gradients = None, None
        self.h1 = target_layer.register_forward_hook(self._fwd_hook)
        self.h2 = target_layer.register_full_backward_hook(self._bwd_hook)

    def _fwd_hook(self, m, i, o): self.activations = o.detach()
    def _bwd_hook(self, m, gi, go): self.gradients = go[0].detach()

    def __call__(self, x, class_idx=None):
        H, W = x.shape[-2:]
        self.model.zero_grad()
        with torch.enable_grad():
            logits = self.model(x)
            if class_idx is None:
                class_idx = int(logits.argmax(dim=1))
            score = logits[0, class_idx]
        score.backward()
        w = self.gradients.mean(dim=(2, 3), keepdim=True)     # (1,C,1,1)
        cam = (w * self.activations).sum(dim=1, keepdim=True) # (1,1,h,w)
        cam = F.relu(cam)
        cam = F.interpolate(cam, size=(H, W), mode="bilinear", align_corners=False)
        cam = (cam - cam.min()) / (cam.max() + 1e-8)
        return cam  # (1,1,H,W)

    def close(self):
        self.h1.remove(); self.h2.remove()

target_layer = model.layer4[-1]  # best default for ResNet-50
gradcam = GradCAM(model, target_layer)

# ---- Example usage ----
# x: your preprocessed 2-channel tensor (1,2,H,W) using the SAME normalization as training
# x = ... .to(device)
# cam = gradcam(x)           # (1,1,H,W) in [0,1]
# heatmap = cam[0,0].cpu().numpy()
# gradcam.close()


In [None]:
# ==== CONFIG ====
DATASET_CHOICE = "test"  # one of: "train", "val", "test"
NUM_SAMPLES = 20
OUTPUT_DIR = "cam_outputs"
SEED = 42

# If you used per-channel normalization during training, put it here (length-2 lists).
# If unknown, leave as None and the script will min-max normalize channel 0 for display only.
TRAIN_MEAN = None  # e.g., [0.485, 0.456] for 2-ch
TRAIN_STD  = None  # e.g., [0.229, 0.224]

# ==== IMPORTS ====
import os, random
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import models
from collections import OrderedDict
import numpy as np # Import numpy

# ==== 1) PICK THE DATASET YOU WANT TO VISUALIZE ====
# Assumes you already created these via random_split:
#   train_dataset, val_dataset, test_dataset = random_split(dataset_to_use, [...])
if DATASET_CHOICE == "train":
    dataset_for_vis = train_dataset
elif DATASET_CHOICE == "val":
    dataset_for_vis = val_dataset
else:
    dataset_for_vis = test_dataset  # default

# ==== 2) BUILD THE MODEL EXACTLY AS TRAINED ====
num_classes = 2  # adjust if needed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features
model.conv1 = torch.nn.Conv2d(
    in_channels=2,
    out_channels=model.conv1.out_channels,
    kernel_size=model.conv1.kernel_size,
    stride=model.conv1.stride,
    padding=model.conv1.padding,
    bias=model.conv1.bias is not None,
)
model.fc = torch.nn.Linear(num_ftrs, num_classes)
model.to(device)

# ---- Load ONLY the model weights from your checkpoint ----
# (Update the path below if you’re not on Colab/Drive)
ckpt_path = "/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/ResNet50_2classes_IA_extratgt.pt"
ckpt = torch.load(ckpt_path, map_location=device)
state_dict = ckpt.get("model_state_dict", ckpt)  # fall back if plain SD

# strip "module." if saved with DataParallel
clean_sd = OrderedDict((k.replace("module.", "", 1) if k.startswith("module.") else k, v)
                       for k, v in state_dict.items())
model.load_state_dict(clean_sd, strict=True)
model.eval()

# ==== 3) GRAD-CAM IMPLEMENTATION (last conv block) ====
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.activations = None
        self.gradients = None
        self.h1 = target_layer.register_forward_hook(self._fwd_hook)
        self.h2 = target_layer.register_full_backward_hook(self._bwd_hook)

    def _fwd_hook(self, module, inp, out):
        self.activations = out.detach()           # (N, C, h, w)

    def _bwd_hook(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()  # (N, C, h, w)

    def __call__(self, x, class_idx=None):
        # x: (1, 2, H, W) already preprocessed as in training
        H, W = x.shape[-2:]
        self.model.zero_grad()
        with torch.enable_grad():
            logits = self.model(x)
            if class_idx is None:
                class_idx = int(logits.argmax(dim=1).item())
            score = logits[0, class_idx]
        score.backward()

        # weights: global-average gradients over spatial dims
        w = self.gradients.mean(dim=(2, 3), keepdim=True)     # (1, C, 1, 1)
        cam = (w * self.activations).sum(dim=1, keepdim=True) # (1, 1, h, w)
        cam = F.relu(cam)
        cam = F.interpolate(cam, size=(H, W), mode="bilinear", align_corners=False)

        # normalize to [0,1]
        cam = cam - cam.min()
        cam = cam / (cam.max() + 1e-8)
        return cam  # (1,1,H,W)

    def close(self):
        self.h1.remove(); self.h2.remove()

# Choose last conv block (best default for ResNet-50)
target_layer = model.layer4[-1] if not hasattr(model, "module") else model.module.layer4[-1]
gradcam = GradCAM(model, target_layer)

# ==== 4) SAMPLING 20 RANDOM SAMPLES ====
random.seed(SEED)
torch.manual_seed(SEED)

num_total = len(dataset_for_vis)
indices = random.sample(range(num_total), k=min(NUM_SAMPLES, num_total))

# Create output dir
os.makedirs(OUTPUT_DIR, exist_ok=True)

# ==== 5) VISUALIZATION HELPERS ====
import numpy as np

def denorm_channel0_for_display(x_2ch):
    """
    x_2ch: (1,2,H,W) tensor on any device (the SAME tensor fed to the model).
    Returns np array (H,W) scaled to [0,1] for display of channel 0 only.
    Uses TRAIN_MEAN/STD if provided, otherwise min-max.
    """
    x0 = x_2ch[0, 0].detach().float().cpu()
    if TRAIN_MEAN is not None and TRAIN_STD is not None:
        # de-normalize channel 0 only
        x0 = x0 * TRAIN_STD[0] + TRAIN_MEAN[0]
        # simple [0,1] clamp for display if those stats are in that range
        x0 = (x0 - x0.min()) / (x0.max() - x0.min() + 1e-8)
    else:
        # min-max normalize for display
        x0 = (x0 - x0.min()) / (x0.max() - x0.min() + 1e-8)
    return x0.cpu().numpy()

def overlay_and_save(x_2ch, cam, out_path, title=None, alpha=0.4, show=False):
    """
    x_2ch: (1,2,H,W) tensor
    cam: (1,1,H,W) tensor in [0,1]
    """
    bg = denorm_channel0_for_display(x_2ch)     # (H,W) in [0,1]
    heat = cam[0,0].detach().cpu().numpy()      # (H,W) in [0,1]

    plt.figure(figsize=(4,4))
    plt.imshow(bg, cmap="gray")
    plt.imshow(heat, cmap="jet", alpha=alpha)
    if title: plt.title(title)
    plt.axis("off")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    if show:
        plt.show()
    plt.close()

# ==== 6) RUN & PLOT (grid + individual files) ====
ncols = 4
nrows = int(np.ceil(len(indices) / ncols))
fig, axes = plt.subplots(nrows, ncols, figsize=(4*ncols, 4*nrows))
axes = np.atleast_2d(axes)

# Get class names from the dataset
class_names = dataset_for_vis.dataset.classes if hasattr(dataset_for_vis, 'dataset') and hasattr(dataset_for_vis.dataset, 'classes') else [str(i) for i in range(num_classes)]


for idx_plot, ds_idx in enumerate(indices):
    # dataset_for_vis[i] -> (image_tensor, label)
    sample = dataset_for_vis[ds_idx]
    if isinstance(sample, (tuple, list)):
        img, label = sample[0], int(sample[1])
    else:
        img, label = sample, None

    # Ensure shape (1,2,H,W)
    if img.dim() == 3:
        img_in = img.unsqueeze(0).to(device)
    else:
        raise ValueError(f"Expected image with shape (2,H,W); got {tuple(img.shape)}")

    # Get predicted label
    model.eval() # Ensure model is in evaluation mode
    with torch.no_grad():
        outputs = model(img_in)
        _, predicted_label = torch.max(outputs, 1)
        predicted_label = predicted_label.item()


    # Compute CAM for predicted class
    with torch.enable_grad():
        cam = gradcam(img_in)  # (1,1,H,W)

    # Save individual file
    out_file = os.path.join(OUTPUT_DIR, f"cam_idx{ds_idx}.png")
    # Include true and predicted labels in the title
    title = f"idx {ds_idx} | True: {class_names[label]}" + (f" | Pred: {class_names[predicted_label]}" if predicted_label is not None else "")
    overlay_and_save(img_in, cam, out_file, title=title, alpha=0.40, show=False)

    # Also draw in the grid
    r, c = divmod(idx_plot, ncols)
    ax = axes[r, c]
    bg = denorm_channel0_for_display(img_in)
    heat = cam[0,0].detach().cpu().numpy()
    ax.imshow(bg, cmap="gray")
    ax.imshow(heat, cmap="jet", alpha=0.40)
    # Include true and predicted labels in the grid title
    ax.set_title(f"True: {class_names[label]}\nPred: {class_names[predicted_label]}", fontsize=10)
    ax.axis("off")

# Hide any unused subplots
for k in range(len(indices), nrows*ncols):
    r, c = divmod(k, ncols)
    axes[r, c].axis("off")

plt.tight_layout()
plt.show()

# Cleanup hooks
gradcam.close()
print(f"Saved individual overlays to: {OUTPUT_DIR}")

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader
import os

# Assuming the necessary classes (ProcessedSARImageDataset, ModelTrainer) are defined in previous cells.

# --- Configuration ---
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"
save_dir = '/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/'
# Update model_name to match the model trained on the 2-class dataset with all subfolders
model_name = 'ResNet50_2classes_IA' # Name of the model to load (from cell Ks7Zms55xWuz)


# Subfolder to evaluate on
evaluation_subfolder = ['_5x25_WithExtraTgt'] # Changed to the desired evaluation subfolder

# Assuming the model was trained on 2 classes based on the model_name
num_classes = 2
selected_classes = ['Integri', 'Abbattuti'] # Define classes based on the trained model

# Batch size used during training (or a suitable batch size for evaluation)
batch_size = 64 # Adjust if necessary

# Set device to GPU_indx if GPU is available
GPU_indx = 0
device = torch.device(GPU_indx if torch.cuda.is_available() else 'cpu')

# --- Data Loading and Preparation for Evaluation ---

# Load the evaluation dataset with the specified subfolder
eval_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=transforms.ToTensor(), selected_subfolders=evaluation_subfolder)

# Assuming the model was trained with normalization, we need to apply the same normalization
# We need the mean and std that were used during training. These were calculated in cell Ks7Zms55xWuz.
# Based on the output of cell Ks7Zms55xWuz for 2 classes:
mean = 0.042781053980191545
std = 0.10918095029062695

normalized_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,))
])

eval_dataset.transform = normalized_transform

# Create a DataLoader for the evaluation dataset
eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

print(f'Number of evaluation examples: {len(eval_dataset)}')


# --- Model Loading ---

# Define the model architecture (ResNet50 modified for 2 input channels and num_classes output)
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
num_ftrs = model.fc.in_features
model.conv1 = nn.Conv2d(in_channels=2,
                        out_channels=model.conv1.out_channels,
                        kernel_size=model.conv1.kernel_size,
                        stride=model.conv1.stride,
                        padding=model.conv1.padding,
                        bias=model.conv1.bias is not None
)
model.fc = nn.Linear(num_ftrs, num_classes)

# Load the trained model state dictionary from the checkpoint
model_path = os.path.join(save_dir, model_name + ".pt")

if os.path.isfile(model_path):
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    print(f"Model loaded successfully from {model_path}")
else:
    raise FileNotFoundError(f"Model checkpoint not found at {model_path}")

# Move the model to the device
model = model.to(device)

# --- Model Evaluation ---

# Use the ModelTrainer class for evaluation
# We need to instantiate ModelTrainer with the loaded model and evaluation loader
# Although we are only evaluating, ModelTrainer requires some training parameters for initialization
# We can use dummy values for learning_rate, save_dir, model_name as they are not used in evaluation mode
model_trainer_eval = ModelTrainer(model=model, device=device, loss_fun=nn.CrossEntropyLoss(),
                                  batch_size=batch_size, learning_rate=1e-5,
                                  save_dir='./temp_eval', model_name='temp_eval_model',
                                  start_from_checkpoint=False) # Set to False as we load the model manually

# Set the evaluation loader in the model_trainer_eval instance
model_trainer_eval.valid_loader = eval_loader # Use valid_loader for evaluation dataset


# Evaluate the model on the evaluation dataset
print("\nEvaluating model on the specified subfolder...")
eval_acc = model_trainer_eval.evaluate_model(train_test_val="val") # Use "val" as evaluate_model expects it

print(f"\nAccuracy on {evaluation_subfolder[0]} dataset: {eval_acc*100:.2f}%")

# Optional: Generate confusion matrix for the evaluation dataset
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Get predictions on the evaluation set
model_trainer_eval.eval()
all_labels = []
all_preds = []
with torch.no_grad():
    for inputs, labels in model_trainer_eval.valid_loader: # Use valid_loader
        inputs = inputs.to(model_trainer_eval.device)
        labels = labels.to(model_trainer_eval.device)
        outputs = model_trainer_eval(inputs)
        _, preds = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Calculate the confusion matrix
cm = confusion_matrix(all_labels, all_preds)

# Define class names based on the selected_classes for evaluation
class_names = selected_classes

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title(f'Confusion Matrix ({evaluation_subfolder[0]} Dataset)')
plt.show()

In [None]:
# --- Logistic Regression (Option 2: two-logit softmax) ---

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import time
from tqdm import trange

# Paths & classes
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"
two_classes   = ['Integri', 'Abbattuti']
three_classes = ['Integri', 'Rimossi', 'Abbattuti']
selected_subfolders = ['_0','_3x20_NoExtraTgt', '_5x25_NoExtraTgt']

# --- 1) Build datasets WITHOUT normalization first (ToTensor only) ---
base_transform = transforms.ToTensor()

two_class_dataset = ProcessedSARImageDataset(
    root_dir=data_dir,
    selected_classes=two_classes,
    transform=base_transform,
    selected_subfolders=selected_subfolders
)

three_class_dataset = ProcessedSARImageDataset(
    root_dir=data_dir,
    selected_classes=three_classes,
    transform=base_transform,
    selected_subfolders=selected_subfolders
)

# Choose which dataset to use
dataset_to_use = two_class_dataset  # or three_class_dataset
num_classes = len(dataset_to_use.classes)


# --- 2) Train/val split ---
train_size = int(0.9 * len(dataset_to_use))
val_size   = len(dataset_to_use) - train_size
train_dataset, val_dataset = random_split(dataset_to_use, [train_size, val_size])

# --- 3) Compute per-channel mean/std on the TRAIN split only ---
# IMPORTANT: do this while the underlying dataset transform is still ToTensor() only.
batch_size = 8  # set your batch size (define if not already)
def compute_mean_std(dataset):
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    mean = 0.0
    std = 0.0
    n = 0

    for images, _ in loader:
        images = images.view(images.size(0), -1)  # Flatten
        n += images.size(0)
        mean += images.mean(1).sum().item()
        std += images.std(1).sum().item()

    mean /= n
    std /= n
    return mean, std

    print(mean, std)

mean, std = compute_mean_std(train_dataset)
print("Per-channel mean:", mean)
print("Per-channel std :", std)

# --- 4) Set NORMALIZATION transform (no flatten here!) ---
normalized_transform = transforms.Compose([
    transforms.ToTensor(),                 # -> [2, 512, 512]
    transforms.Normalize(mean=mean,std=std),
])

# Apply the new transform to the *underlying* dataset (affects both splits)
train_dataset.dataset.transform = normalized_transform
val_dataset.dataset.transform   = normalized_transform


print("aa", train_dataset[0][0].shape)
for i in range(len(train_dataset)):

    img, label = train_dataset[i]
    if isinstance(img, torch.Tensor) and img.dim() == 3:
        if img.shape[0] != 2:   # first dimension is channels
            print(f"Index {i}: shape {img.shape}, label {label}")
    else:
        print(f"Index {i}: unexpected type/shape -> {type(img)}, {getattr(img, 'shape', None)}")


# --- 5) Define the logistic regression model (flattens inside) ---
class LogisticTwoLogits(nn.Module):
    def __init__(self, C=2, H=512, W=512, num_classes=2):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(C * H * W, num_classes)  # two logits for 2 classes

    def forward(self, x):              # x: [N, 2, 512, 512]
        x = self.flatten(x)            # -> [N, 524288]
        return self.fc(x)              # -> [N, num_classes]


# Infer C,H,W from a sample (after transform) just to be safe
img0=train_dataset[0]
C, H, W = img0[0].shape
model = LogisticTwoLogits(C=C, H=H, W=W, num_classes=num_classes)

# --- 6) Dataloaders ---
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,  num_workers=4)
valid_loader = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False, num_workers=4)

# Quick sanity check
print(train_loader)
xb, yb = next(iter(train_loader))
print("Batch images:", xb.shape)     # [B, 2, 512, 512]
print("Batch labels:", yb.shape)     # [B]

# --- 7) Training setup ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

learning_rate = 1e-5       # set if not already
start_epoch = 0                # set if not already
num_epochs = 10                # set if not already

# Use CrossEntropyLoss for 2-class softmax
criterion = nn.CrossEntropyLoss()     # y must be LongTensor in {0,1}

# If your ModelTrainer expects the loss in its ctor, pass criterion there.
# Otherwise, we keep your original signature and ensure labels are long in the trainer.
model_trainer = ModelTrainer(
    model=model,
    device=device,
    loss_fun=criterion,
    batch_size=batch_size,
    learning_rate=learning_rate,
    save_dir='/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/',
    model_name=f'LogisticRegression_{num_classes}classes_v4',
    start_from_checkpoint=False
)

# Provide data to trainer
model_trainer.set_data(train_set=train_dataset, val_set=val_dataset)

# --- 8) Train loop (unchanged logic) ---
start_time = time.time()
valid_acc = 0.0
train_acc = 0.0

pbar = trange(start_epoch, num_epochs, leave=False, desc="Epoch")
for epoch in pbar:
    pbar.set_postfix_str('Accuracy: Train %.2f%%, Val %.2f%%' % (train_acc * 100, valid_acc * 100))

    # Train one epoch
    model_trainer.train_model()   # Ensure inside this method: labels are cast to long: y = y.long()

    # Evaluate
    train_acc = model_trainer.evaluate_model(train_test_val="train")
    valid_acc = model_trainer.evaluate_model(train_test_val="val")

    # Save best
    if valid_acc > model_trainer.best_valid_acc:
        model_trainer.save_checkpoint(epoch, valid_acc)

end_time = time.time()

print("Training time %.2f seconds" %(end_time - start_time))
print("The highest validation accuracy was %.2f%%" %(model_trainer.best_valid_acc*100))

import matplotlib.pyplot as plt
import numpy as np

_ = plt.figure(figsize = (10,5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_loss_logger))
_ = plt.plot(train_x, model_trainer.train_loss_logger, label="Training Loss")

if len(model_trainer.val_loss_logger) > 0: # Only plot validation loss if it's logged
    valid_x = np.linspace(0, num_epochs, len(model_trainer.val_loss_logger))
    _ = plt.plot(valid_x, model_trainer.val_loss_logger, label="Validation Loss")

_ = plt.title("Loss per Epoch")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("Loss")
_ = plt.legend()
plt.show() # Added to display the plot

_ = plt.figure(figsize = (10,5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_acc_logger))
_ = plt.plot(train_x, model_trainer.train_acc_logger, c = "y")
valid_x = np.linspace(0, num_epochs, len(model_trainer.val_acc_logger))
_ = plt.plot(valid_x, model_trainer.val_acc_logger, c = "k")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("Accuracy")
_ = plt.title("Accuracy")
_ = plt.legend(["Training accuracy", "Validation accuracy"])

from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Get predictions on the validation set
model_trainer.eval()
all_labels = []
all_preds = []
with torch.no_grad():
    for inputs, labels in model_trainer.valid_loader:
        inputs = inputs.to(model_trainer.device)
        labels = labels.to(model_trainer.device)
        outputs = model_trainer(inputs)
        _, preds = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Calculate the confusion matrix
cm = confusion_matrix(all_labels, all_preds)

# Define class names based on the dataset used for validation
class_names = dataset_to_use.classes # Access the classes directly from the dataset

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Validation Set)')
plt.show()



In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch

# Assuming 'dataset' is already loaded and transformed with ToTensor()
# If not, you would need to load it like in previous cells:
# data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"
# selected_classes = ['Integri', 'Rimossi', 'Abbattuti']
# selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']
# dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=transforms.ToTensor(), selected_subfolders=selected_subfolders)

# Create lists to store the average intensity, average channel 0 values, and labels
avg_intensity = []
avg_channel_0 = []
image_labels = []

# Iterate through the dataset to calculate average intensity and channel 0 values
for img, label in dataset:
    # Ensure the image has at least two channels (real and imaginary)
    if img.shape[0] >= 2:
        # Assuming Channel 0 is real and Channel 1 is imaginary
        real_part = img[0]
        imag_part = img[1]

        # Calculate intensity: sqrt(real^2 + imag^2)
        intensity = torch.sqrt(real_part**2 + imag_part**2)

        # Calculate the average intensity for the image
        avg_intensity.append(torch.mean(intensity).item())

        # Calculate the average of channel 0 (real part) for plotting against intensity
        avg_channel_0.append(torch.mean(real_part).item())

        image_labels.append(label)
    else:
        print(f"Skipping image with unexpected shape: {img.shape}")


# Convert lists to NumPy arrays for plotting
avg_intensity_np = np.array(avg_intensity)
avg_channel_0_np = np.array(avg_channel_0)
image_labels_np = np.array(image_labels)

# Create the scatter plot
plt.figure(figsize=(10, 8))
scatter = plt.scatter(avg_channel_0_np, avg_channel_1_np, c=image_labels_np, cmap='viridis', alpha=0.6)

# Add labels and title
plt.xlabel('Average Value of Channel 0 (Real Part)')
plt.ylabel('Average Intensity (sqrt(Real^2 + Imag^2))')
plt.title('Scatter Plot of Average Channel 0 vs. Average Intensity per Image')

# Add a legend based on the class names from the dataset
legend_elements = []
for i, class_name in enumerate(dataset.classes):
    # Get the color from the colormap corresponding to the class index
    color = plt.cm.viridis(i / (len(dataset.classes) - 1))
    legend_elements.append(plt.Line2D([0], [0], marker='o', color='w', label=class_name,
                                      markerfacecolor=color, markersize=10))

plt.legend(handles=legend_elements, title="Classes")

# Show the plot
plt.grid(True)
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch

# Assuming 'dataset' is already loaded and transformed with ToTensor()
# If not, you would need to load it like in previous cells:
# data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"
# selected_classes = ['Integri', 'Rimossi', 'Abbattuti']
# selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']
# dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=transforms.ToTensor(), selected_subfolders=selected_subfolders)


# Create a dictionary to store average intensities for each class
average_intensities_by_class = {class_name: [] for class_name in dataset.classes}

# Iterate through the dataset to calculate average intensity and group by class
for img, label in dataset:
    # Ensure the image has at least two channels (real and imaginary)
    if img.shape[0] >= 2:
        # Assuming Channel 0 is real and Channel 1 is imaginary
        real_part = img[0]
        imag_part = img[1]

        # Calculate intensity: sqrt(real^2 + imag^2)
        intensity = torch.sqrt(real_part**2 + imag_part**2)

        # Calculate the average intensity for the image
        avg_intensity = torch.mean(intensity).item()

        # Get the class name for the current label
        class_name = dataset.classes[label]

        # Append the average intensity to the corresponding class list
        average_intensities_by_class[class_name].append(avg_intensity)
    else:
        print(f"Skipping image with unexpected shape: {img.shape}")


# Plot histograms for each class
plt.figure(figsize=(12, 8))

# Determine the overall range of average intensities to set bins
all_avg_intensities = [item for sublist in average_intensities_by_class.values() for item in sublist]

print(num_bins)
for class_name, avg_intensities in average_intensities_by_class.items():
  min_intensity = min(avg_intensities)
  max_intensity = max(avg_intensities)
  bin_size = 0.005
  num_bins = int((max_intensity - min_intensity) / bin_size)
  plt.hist(avg_intensities, bins=num_bins, alpha=0.7, label=class_name)

plt.xlabel('Average Intensity')
plt.ylabel('Frequency')
plt.title('Distribution of Average Intensity per Class')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# === Visualize learned weights as images ===
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn # Import nn for defining the model structure
import torch.nn.functional as F # Import functional for the linear layer

# Define the structure of the Logistic Regression model used for training
class LogisticTwoLogits(nn.Module):
    def __init__(self, C=2, H=512, W=512, num_classes=2):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(C * H * W, num_classes)  # two logits for 2 classes

    def forward(self, x):              # x: [N, 2, 512, 512]
        x = self.flatten(x)            # -> [N, 524288]
        return self.fc(x)              # -> [N, num_classes]


# Load the model state dictionary from the checkpoint file
model_path = '/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/LogisticRegression_2classes_v2.pt' # Use the correct model name
checkpoint = torch.load(model_path)

# Instantiate the model with the correct dimensions and load the state dictionary
# Infer C, H, W from the flattened dimension in the state_dict's fc.weight
num_classes, input_dim = checkpoint['model_state_dict']['fc.weight'].shape
C, H, W = 2, 512, 512 # Assuming C=2, H=512, W=512 based on data processing

loaded_model = LogisticTwoLogits(C=C, H=H, W=W, num_classes=num_classes)
loaded_model.load_state_dict(checkpoint['model_state_dict'])

# Grab weights from the linear layer of the loaded model
weights = loaded_model.fc.weight.data.cpu().numpy()  # shape: [num_classes, input_dim]
print(weights.shape)

# Reshape each class's weights into (C, H, W)
weights_img = weights.reshape(num_classes, C, H, W)


fig, axes = plt.subplots(1, num_classes, figsize=(5*num_classes, 5))

# Define class names based on the dataset used for validation (assuming the same classes as in cell 76f043d6)
class_names = ['Integri', 'Rimossi'] # Based on cell 76f043d6 which trained a 2-class model


for i in range(num_classes):
    ax = axes[i] if num_classes > 1 else axes

    if C == 1:
        # Single-channel input (grayscale)
        ax.imshow(weights_img[i, 0], cmap="seismic")
    elif C == 2:
        # Two-channel input -> show as two heatmaps side by side
        # We can plot one of the channels, or both side by side
        # Let's plot the second channel as in the original code cell KjCpzrIiPT-H
        ax.imshow(weights_img[i, 1], cmap="seismic", alpha=1)
    else:
        # If >2 channels, collapse to RGB with normalization
        w = weights_img[i]
        w = (w - w.min()) / (w.max() - w.min() + 1e-8)  # normalize 0-1
        if w.shape[0] >= 3:
            ax.imshow(np.transpose(w[:3], (1,2,0)))  # first 3 channels → RGB
        else:
            ax.imshow(w[0], cmap="seismic")


    ax.set_title(f"Weights for class: {class_names[i]}")
    ax.axis("off")

plt.tight_layout()
plt.show()



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Assuming weights_img is available from a previous cell (e.g., cell_id: VVboVs6oaLo2)
# If not, you would need to load the model and extract weights as done in that cell.

# Ensure weights_img has the expected shape (num_classes, C, H, W)
# and that weights_img[1, 0] is a valid slice.
if 'weights_img' in globals() and weights_img.shape[0] > 1 and weights_img.shape[1] > 0:
    # Get the weights for the second class (index 1), first channel (index 0)
    weights_channel_0_class_1 = weights_img[1, 0]

    # Create a boolean mask for positive values
    positive_mask = weights_channel_0_class_1 > 0

    # Visualize the mask
    plt.figure(figsize=(6, 6))
    plt.imshow(positive_mask, cmap='gray') # Using a grayscale colormap for the mask
    plt.title('Mask of Positive Weights (Class 1, Channel 0)')
    plt.axis('off')
    plt.show()

    print("Mask of positive values created and visualized.")
else:
    print("Error: weights_img variable not found or does not have the expected shape.")
    print("Please run the cell that generates weights_img first (e.g., cell_id: VVboVs6oaLo2).")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Assuming weights_img is available from a previous cell (e.g., cell_id: VVboVs6oaLo2)
# If not, you would need to load the model and extract weights as done in that cell.

# Ensure weights_img has the expected shape (num_classes, C, H, W)
# and that the required slices are valid.
if 'weights_img' in globals() and weights_img.shape[0] >= 2 and weights_img.shape[1] > 0:
    # Get the weights for the first class (index 0), first channel (index 0)
    weights_channel_0_class_0 = weights_img[0, 0]

    # Get the weights for the second class (index 1), first channel (index 0)
    weights_channel_0_class_1 = weights_img[1, 0]

    # Create boolean masks for positive values
    positive_mask_class_0 = weights_channel_0_class_0 > 0
    positive_mask_class_1 = weights_channel_0_class_1 > 0

    # Visualize the masks side-by-side
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    axes[0].imshow(positive_mask_class_0, cmap='gray')
    axes[0].set_title('Mask of Positive Weights (Class 0, Channel 0)')
    axes[0].axis('off')

    axes[1].imshow(positive_mask_class_1, cmap='gray')
    axes[1].set_title('Mask of Positive Weights (Class 1, Channel 0)')
    axes[1].axis('off')

    plt.tight_layout()
    plt.show()

    print("Masks of positive values for Channel 0 of Class 0 and Class 1 created and visualized side-by-side.")
else:
    print("Error: weights_img variable not found or does not have the expected shape.")
    print("Please run the cell that generates weights_img first (e.g., cell_id: VVboVs6oaLo2).")

In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import os

# Assuming ProcessedSARImageDataset class is defined in a previous cell

# Define the data directory
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Define the class lists to include
selected_classes = ['Integri', 'Rimossi', 'Abbattuti']

# Define the subfolders to include within each class
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']

# Instantiate the dataset with ToTensor transform to get tensors
dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=transforms.ToTensor(), selected_subfolders=selected_subfolders)

# Iterate through each class
for class_name in dataset.classes:
    class_label = dataset.classes.index(class_name)

    # Filter images belonging to the current class
    class_images = [img for img, label in dataset if label == class_label]

    if not class_images:
        print(f"No images found for class '{class_name}'.")
        continue

    # Separate channels
    channel_0_images = [img[0] for img in class_images if img.shape[0] > 0]
    channel_1_images = [img[1] for img in class_images if img.shape[0] > 1]

    # Calculate and plot the average image for Channel 0
    if channel_0_images:
        channel_0_stacked = torch.stack(channel_0_images)
        average_channel_0 = torch.mean(channel_0_stacked, dim=0)

        plt.figure(figsize=(6, 6))
        plt.imshow(average_channel_0.numpy(), cmap='viridis')
        plt.title(f'Average Channel 0 Image for Class: {class_name}')
        plt.axis('off')
        plt.show()
    else:
        print(f"No images with Channel 0 found for class '{class_name}'.")

    # Calculate and plot the average image for Channel 1
    if channel_1_images:
        channel_1_stacked = torch.stack(channel_1_images)
        average_channel_1 = torch.mean(channel_1_stacked, dim=0)

        plt.figure(figsize=(6, 6))
        plt.imshow(average_channel_1.numpy(), cmap='viridis')
        plt.title(f'Average Channel 1 Image for Class: {class_name}')
        plt.axis('off')
        plt.show()
    else:
        print(f"No images with Channel 1 found for class '{class_name}'.")

In [None]:
import numpy as np
import torch

# Assuming 'dataset' is loaded and has the ToTensor() transform applied
# Assuming 'positive_mask_class_1' is available from a previous cell (e.g., cell_id: 15e64c81)

if 'positive_mask_class_1' in globals() and 'dataset' in globals():
    # Ensure the mask is a PyTorch tensor for easier element-wise operations
    if not isinstance(positive_mask_class_1, torch.Tensor):
        positive_mask_class_1_tensor = torch.from_numpy(positive_mask_class_1).bool()
    else:
        positive_mask_class_1_tensor = positive_mask_class_1.bool()

    # Create a list to store the average masked values for each image
    average_masked_values_class_1 = []
    image_labels = []

    # Iterate through the dataset
    for img, label in dataset:
        # Ensure the image has at least the channel we are masking (channel 0)
        if img.shape[0] > 0:
            # Get the first channel of the image
            channel_0_image = img[0] # Assuming Channel 0

            # Apply the mask to the first channel
            masked_image_channel_0 = channel_0_image * positive_mask_class_1_tensor

            # Calculate the average of the masked values
            # To avoid division by zero if the mask is all False, we can check if there are any positive mask values
            if torch.sum(positive_mask_class_1_tensor) > 0:
                average_masked_value = torch.sum(masked_image_channel_0) / torch.sum(positive_mask_class_1_tensor)
            else:
                average_masked_value = 0.0 # Or np.nan, depending on how you want to handle this case

            average_masked_values_class_1.append(average_masked_value.item())
            image_labels.append(label)
        else:
            print(f"Skipping image with unexpected shape: {img.shape}")

    # Convert the list of average masked values and labels to NumPy arrays
    average_masked_values_class_1_np = np.array(average_masked_values_class_1)
    image_labels_np = np.array(image_labels)

    print("Average masked values for Class 1 calculated for each example.")
    print(f"Shape of average masked values array: {average_masked_values_class_1_np.shape}")
    print(f"Shape of image labels array: {image_labels_np.shape}")

else:
    print("Error: 'positive_mask_class_1' or 'dataset' not found.")
    print("Please ensure 'positive_mask_class_1' is generated first (e.g., by running cell_id: 15e64c81) and 'dataset' is loaded.")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Assuming 'average_masked_values_class_1_np' and 'image_labels_np' are available
# from a previous cell (e.g., cell_id: 8f43fe18) and 'dataset' is loaded

if 'average_masked_values_class_1_np' in globals() and 'image_labels_np' in globals() and 'dataset' in globals():
    # Create a dictionary to store average masked values for each class
    average_masked_values_by_class = {class_name: [] for class_name in dataset.classes}

    # Populate the dictionary with the calculated average masked values, grouped by class
    for i in range(len(image_labels_np)):
        label = image_labels_np[i]
        class_name = dataset.classes[label]
        average_masked_values_by_class[class_name].append(average_masked_values_class_1_np[i])

    # Plot histograms for each class
    plt.figure(figsize=(12, 8))

    for class_name, avg_masked_values in average_masked_values_by_class.items():
        plt.hist(avg_masked_values, bins=50, alpha=0.7, label=class_name) # Adjust bins as needed

    plt.xlabel('Average Masked Value (Class 1, Channel 0 Mask)')
    plt.ylabel('Frequency')
    plt.title('Distribution of Average Masked Values per Class')
    plt.legend()
    plt.grid(True)
    plt.show()

else:
    print("Error: Required variables ('average_masked_values_class_1_np', 'image_labels_np', or 'dataset') not found.")
    print("Please ensure the cell calculating average masked values (e.g., cell_id: 8f43fe18) and the cell loading the dataset have been run.")

In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt # Import matplotlib.pyplot

# Assuming 'dataset' is loaded and has the ToTensor() transform applied
# Assuming 'positive_mask_class_0' is available from a previous cell (e.g., cell_id: 15e64c81)

if 'positive_mask_class_0' in globals() and 'dataset' in globals():
    # Ensure the mask is a PyTorch tensor for easier element-wise operations
    if not isinstance(positive_mask_class_0, torch.Tensor):
        positive_mask_class_0_tensor = torch.from_numpy(positive_mask_class_0).bool()
    else:
        positive_mask_class_0_tensor = positive_mask_class_0.bool()

    # Create a list to store the average masked values for each image
    average_masked_values_class_0 = []
    image_labels = []

    # Iterate through the dataset
    for img, label in dataset:
        # Ensure the image has at least the channel we are masking (channel 0)
        if img.shape[0] > 0:
            # Get the first channel of the image
            channel_0_image = img[0] # Assuming Channel 0

            # Apply the mask to the first channel
            masked_image_channel_0 = channel_0_image * positive_mask_class_0_tensor

            # Calculate the average of the masked values
            # To avoid division by zero if the mask is all False, we can check if there are any positive mask values
            if torch.sum(positive_mask_class_0_tensor) > 0:
                average_masked_value = torch.sum(masked_image_channel_0) / torch.sum(positive_mask_class_0_tensor)
            else:
                average_masked_value = 0.0 # Or np.nan, depending on how you want to handle this case

            average_masked_values_class_0.append(average_masked_value.item())
            image_labels.append(label)
        else:
            print(f"Skipping image with unexpected shape: {img.shape}")

    # Convert the list of average masked values and labels to NumPy arrays
    average_masked_values_class_0_np = np.array(average_masked_values_class_0)
    image_labels_np = np.array(image_labels)

    print("Average masked values for Class 0 calculated for each example.")
    print(f"Shape of average masked values array: {average_masked_values_class_0_np.shape}")
    print(f"Shape of image labels array: {image_labels_np.shape}")

    # --- Plot histogram divided by class ---
    # Create a dictionary to store average masked values for each class for plotting
    average_masked_values_by_class_plot = {class_name: [] for class_name in dataset.classes}

    # Populate the dictionary with the calculated average masked values, grouped by class
    for i in range(len(image_labels_np)):
        label = image_labels_np[i]
        class_name = dataset.classes[label]
        average_masked_values_by_class_plot[class_name].append(average_masked_values_class_0_np[i])

    # Determine the overall range of average masked values to set bins
    all_avg_masked_values = [item for sublist in average_masked_values_by_class_plot.values() for item in sublist]
    min_value = min(all_avg_masked_values)
    max_value = max(all_avg_masked_values)
    bin_width = 0.005 # Desired bin width
    bins = np.arange(min_value, max_value + bin_width, bin_width) # Create bins with fixed width

    plt.figure(figsize=(12, 8))

    for class_name, avg_masked_values in average_masked_values_by_class_plot.items():
        plt.hist(avg_masked_values, bins=bins, alpha=0.7, label=class_name) # Use the defined bins

    plt.xlabel('Average Masked Value (Class 0, Channel 0 Mask)')
    plt.ylabel('Frequency')
    plt.title('Distribution of Average Masked Values per Class (Class 0 Mask)')
    plt.legend()
    plt.grid(True)
    plt.show()

else:
    print("Error: 'positive_mask_class_0' or 'dataset' not found.")
    print("Please ensure 'positive_mask_class_0' is generated first (e.g., by running cell_id: 15e64c81) and 'dataset' is loaded.")

In [None]:
# Load the model state dictionary
model_path = '/content/drive/MyDrive/Colab Notebooks/Towercheck/Model/LogisticRegression_2classes.pt'
checkpoint = torch.load(model_path)

# Extract the model state dictionary
# The state_dict directly contains 'weight' and 'bias' keys, not 'linear.weight' and 'linear.bias'
model_state_dict = checkpoint['model_state_dict']

# Create a dummy model to load the weights into (assuming the structure is the same as LogisticRegressionTorch)
# We need the input_dim and num_classes from the training setup
# Based on previous cells, the saved weights have input_dim 786432
input_dim = 786432 # Input dimension from the saved checkpoint
num_classes = 2 # Number of classes in the saved model (based on filename)

class LogisticRegressionTorch(nn.Module):
    def __init__(self, in_dim, n_classes):
        super().__init__()
        # The keys in the saved state_dict are 'weight' and 'bias'
        self.weight = nn.Parameter(torch.empty(n_classes, in_dim))
        self.bias = nn.Parameter(torch.empty(n_classes))

    def forward(self, x):
        return F.linear(x.view(x.size(0), -1), self.weight, self.bias)

# We also need to import the functional module
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import torch


loaded_model = LogisticRegressionTorch(input_dim, num_classes)
loaded_model.load_state_dict(model_state_dict)
print (loaded_model)
# Grab weights from the linear layer of the loaded model
weights = loaded_model.weight.data.cpu().numpy()  # shape: [num_classes, input_dim]
print(weights.shape)

# Reshape each class's weights into (C, H, W)
# C, H, W are the channel, height, and width of the original images used for training
# Assuming C=2, H=512, W=512 based on previous cells and the total input_dim
C, H, W = 3, 512, 512
weights_img = weights.reshape(num_classes, C, H, W)


fig, axes = plt.subplots(1, num_classes, figsize=(5*num_classes, 5))

# Define class names based on the saved model
class_names = ['Integri', 'Rimossi'] # Based on cell 'a5e99ba0'

for i in range(num_classes):
    ax = axes[i] if num_classes > 1 else axes

    if C == 1:
        # Single-channel input (grayscale)
        ax.imshow(weights_img[i, 0], cmap="seismic")
    elif C == 2:
        # Two-channel input -> show as two heatmaps side by side
        # We can plot one of the channels, or both side by side
        # Let's plot the second channel as in the original code cell KjCpzrIiPT-H
        ax.imshow(weights_img[i, 1], cmap="seismic", alpha=1)
    elif C == 3:
        # Three-channel input -> plot the first 2 channels
        # We can plot them side by side or overlay them
        # For simplicity, let's plot the first channel as a heatmap
        # You could also plot weights_img[i, 1] for the second channel
        ax.imshow(weights_img[i, 0], cmap="seismic")
        ax.set_title(f"Weights for class: {class_names[i]} (Channel 1)")
    else:
        # If >3 channels or unexpected, collapse to grayscale or show first channel
        w = weights_img[i]
        # For visualization, take the mean across channels or select one channel
        w_gray = np.mean(w, axis=0) # Take mean across channels

        ax.imshow(w_gray, cmap='viridis') # Use a suitable colormap
        ax.set_title(f"Weights for class: {class_names[i]} (Mean Channels)")

    ax.axis("off")

plt.tight_layout()
plt.show()

In [None]:
print(img.shape)
import matplotlib.pyplot as plt

plt.imshow(img[1])
plt.show()

In [None]:
# Plot weight maps for each class
import matplotlib.pyplot as plt
import numpy as np
import os# === Flexible Logistic Regression (PyTorch, 2 or 3 classes) ===
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns


#Load the dataset
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Define the class lists
two_classes = ['Integri', 'Rimossi']
three_classes = ['Integri', 'Rimossi', 'Abbattuti']
selected_subfolders = ['_0']#, '_3x20_NoExtraTgt', '_5x25_NoExtraTgt'] # Updated subfolder names


# Create the datasets first, before normalization
two_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=two_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)
three_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=three_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)


# Choose the dataset (either two_class_dataset or three_class_dataset)
dataset_to_use = two_class_dataset  # Or three_class_dataset

# Define the model based on the number of classes in the chosen dataset
num_classes = len(dataset_to_use.classes)

# Split data into training and validation sets
train_size = int(0.9 * len(dataset_to_use))
val_size = len(dataset_to_use) - train_size
train_dataset, val_dataset = random_split(dataset_to_use, [train_size, val_size])

def compute_mean_std(dataset):
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    mean = 0.0
    std = 0.0
    n = 0

    for images, _ in loader:
        images = images.view(images.size(0), -1)  # Flatten
        n += images.size(0)
        mean += images.mean(1).sum().item()
        std += images.std(1).sum().item()

    mean /= n
    std /= n
    return mean, std

mean, std = compute_mean_std(train_dataset)
print("mean: ", mean)
print("std: ", std)


# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
valid_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

#Redefine transforms with normalization
normalized_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,))
])

#Reload datasets with normalization
train_dataset.dataset.transform = normalized_transform
val_dataset.dataset.transform = normalized_transform


# Infer input shape
with torch.no_grad():
    sample_batch = next(iter(train_loader))[0]
C, H, W = sample_batch.shape[1], sample_batch.shape[2], sample_batch.shape[3]
input_dim = C * H * W

num_classes = len(dataset_to_use.classes)

class LogisticRegressionTorch(nn.Module):
    def __init__(self, in_dim, n_classes):
        super().__init__()
        self.linear = nn.Linear(in_dim, n_classes)
    def forward(self, x):
        return self.linear(x.view(x.size(0), -1))

logreg_model = LogisticRegressionTorch(input_dim, num_classes).to(device)

# Optional class weights
def compute_class_weights(loader, n_classes):
    counts = torch.zeros(n_classes, dtype=torch.long)
    for _, y in loader:
        counts += torch.bincount(y, minlength=n_classes)
    w = 1.0 / torch.clamp(counts.float(), min=1.0)
    return (w / w.sum() * n_classes).to(device)

use_class_weights = True
weights = compute_class_weights(train_loader, num_classes) if use_class_weights else None

criterion = nn.CrossEntropyLoss(weight=weights)
optimizer = optim.SGD(logreg_model.parameters(), lr=1e-6, momentum=0.9, weight_decay=1e-4)

num_epochs = 30
best_val_acc, best_state = 0.0, None

# Logs
train_losses, train_accuracies, val_accuracies = [], [], []

for epoch in range(1, num_epochs + 1):
    # --- Training ---
    logreg_model.train()
    running_loss, correct, total = 0.0, 0, 0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)

        optimizer.zero_grad()
        logits = logreg_model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * xb.size(0)
        preds = logits.argmax(1)
        correct += (preds == yb).sum().item()
        total += yb.size(0)

    train_loss = running_loss / len(train_dataset)
    train_acc = correct / max(total, 1)

    # --- Validation ---
    logreg_model.eval()
    v_correct, v_total = 0, 0
    with torch.no_grad():
        for xb, yb in valid_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = logreg_model(xb).argmax(1)
            v_correct += (preds == yb).sum().item()
            v_total += yb.size(0)

    val_acc = v_correct / max(v_total, 1)

    # Store logs
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

    # Save best model
    if val_acc > best_val_acc:
        best_val_acc, best_state = val_acc, {k: v.cpu().clone() for k, v in logreg_model.state_dict().items()}

    print(f"Epoch {epoch:02d}/{num_epochs} | "
          f"Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}% | Val Acc: {val_acc*100:.2f}%")

# Restore best
if best_state is not None:
    logreg_model.load_state_dict(best_state)

# --- Plot curves ---
plt.figure(figsize=(10,4))
plt.plot(range(1, num_epochs+1), train_losses, label="Train Loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training Loss"); plt.legend(); plt.show()

plt.figure(figsize=(10,4))
plt.plot(range(1, num_epochs+1), [a*100 for a in train_accuracies], label="Train Acc")
plt.plot(range(1, num_epochs+1), [a*100 for a in val_accuracies], label="Val Acc")
plt.xlabel("Epoch"); plt.ylabel("Accuracy (%)"); plt.title("Accuracy per Epoch"); plt.legend(); plt.show()

# --- Final eval ---
all_preds, all_labels = [], []
logreg_model.eval()
with torch.no_grad():
    for xb, yb in valid_loader:
        preds = logreg_model(xb.to(device)).argmax(1).cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

print(f"\nBest Validation Accuracy: {best_val_acc*100:.2f}%")
cm = confusion_matrix(all_labels, all_preds)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=dataset_to_use.classes,
            yticklabels=dataset_to_use.classes)
plt.xlabel("Predicted"); plt.ylabel("True"); plt.title("Confusion Matrix (Validation)"); plt.show()

print("\nClassification Report:")
print(classification_report(all_labels, all_preds, target_names=dataset_to_use.classes, digits=3))

In [None]:
# === Visualize learned weights as images ===
import matplotlib.pyplot as plt

# Grab weights from the linear layer
weights = logreg_model.linear.weight.data.cpu().numpy()  # shape: [num_classes, input_dim]
print(weights.shape)
# Reshape each class's weights into (C, H, W)
weights_img = weights.reshape(num_classes, C, H, W)


fig, axes = plt.subplots(1, num_classes, figsize=(5*num_classes, 5))

for i in range(num_classes):
    ax = axes[i] if num_classes > 1 else axes

    if C == 1:
        # Single-channel input (grayscale)
        ax.imshow(weights_img[i, 0], cmap="seismic")
    elif C == 2:
        # Two-channel input → show as two heatmaps side by side
        #ax.imshow(weights_img[i, 0], cmap="seismic", alpha=0.7)
        ax.imshow(weights_img[i, 1], cmap="seismic", alpha=1)
    else:
        # If >2 channels, collapse to RGB with normalization
        w = weights_img[i]
        w = (w - w.min()) / (w.max() - w.min() + 1e-8)  # normalize 0-1
        if w.shape[0] >= 3:
            ax.imshow(np.transpose(w[:3], (1,2,0)))  # first 3 channels → RGB
        else:
            ax.imshow(w[0], cmap="seismic")

    ax.set_title(f"Weights for class: {dataset_to_use.classes[i]}")
    ax.axis("off")

plt.tight_layout()
plt.show()


In [None]:
# === Flexible Logistic Regression (PyTorch, 2 or 3 classes) ===
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import numpy as np
import os


#Load the dataset
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Define the class lists
two_classes = ['Integri', 'Rimossi']
three_classes = ['Integri', 'Rimossi', 'Abbattuti']
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt'] # Updated subfolder names


# Create the datasets first, before normalization
two_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=two_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)
three_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=three_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)


# Choose the dataset (either two_class_dataset or three_class_dataset)
dataset_to_use = two_class_dataset  # Or three_class_dataset

# Define the model based on the number of classes in the chosen dataset
num_classes = len(dataset_to_use.classes)

# Split data into training and validation sets
train_size = int(0.9 * len(dataset_to_use))
val_size = len(dataset_to_use) - train_size
train_dataset, val_dataset = random_split(dataset_to_use, [train_size, val_size])

def compute_mean_std(dataset):
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    mean = 0.0
    std = 0.0
    n = 0

    for images, _ in loader:
        images = images.view(images.size(0), -1)  # Flatten
        n += images.size(0)
        mean += images.mean(1).sum().item()
        std += images.std(1).sum().item()

    mean /= n
    std /= n
    return mean, std

mean, std = compute_mean_std(train_dataset)
print("mean: ", mean)
print("std: ", std)


# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
valid_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

#Redefine transforms with normalization
normalized_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,))
])

#Reload datasets with normalization
train_dataset.dataset.transform = normalized_transform
val_dataset.dataset.transform = normalized_transform


# Infer input shape
with torch.no_grad():
    sample_batch = next(iter(train_loader))[0]
C, H, W = sample_batch.shape[1], sample_batch.shape[2], sample_batch.shape[3]
input_dim = C * H * W

num_classes = len(dataset_to_use.classes)

class LogisticRegressionTorch(nn.Module):
    def __init__(self, in_dim, n_classes):
        super().__init__()
        self.linear = nn.Linear(in_dim, n_classes)
    def forward(self, x):
        return self.linear(x.view(x.size(0), -1))

logreg_model = LogisticRegressionTorch(input_dim, num_classes).to(device)

# Optional class weights
def compute_class_weights(loader, n_classes):
    counts = torch.zeros(n_classes, dtype=torch.long)
    for _, y in loader:
        counts += torch.bincount(y, minlength=n_classes)
    w = 1.0 / torch.clamp(counts.float(), min=1.0)
    return (w / w.sum() * n_classes).to(device)

use_class_weights = True
weights = compute_class_weights(train_loader, num_classes) if use_class_weights else None

criterion = nn.CrossEntropyLoss(weight=weights)
optimizer = optim.SGD(logreg_model.parameters(), lr=1e-6, momentum=0.9, weight_decay=1e-4)

num_epochs = 30
best_val_acc, best_state = 0.0, None

# Logs
train_losses, train_accuracies, val_accuracies = [], [], []

for epoch in range(1, num_epochs + 1):
    # --- Training ---
    logreg_model.train()
    running_loss, correct, total = 0.0, 0, 0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)

        optimizer.zero_grad()
        logits = logreg_model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * xb.size(0)
        preds = logits.argmax(1)
        correct += (preds == yb).sum().item()
        total += yb.size(0)

    train_loss = running_loss / len(train_dataset)
    train_acc = correct / max(total, 1)

    # --- Validation ---
    logreg_model.eval()
    v_correct, v_total = 0, 0
    with torch.no_grad():
        for xb, yb in valid_loader:
            xb, yb = xb.to(device), yb.to(device)
            preds = logreg_model(xb).argmax(1)
            v_correct += (preds == yb).sum().item()
            v_total += yb.size(0)

    val_acc = v_correct / max(v_total, 1)

    # Store logs
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

    # Save best model
    if val_acc > best_val_acc:
        best_val_acc, best_state = val_acc, {k: v.cpu().clone() for k, v in logreg_model.state_dict().items()}

    print(f"Epoch {epoch:02d}/{num_epochs} | "
          f"Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}% | Val Acc: {val_acc*100:.2f}%")

# Restore best
if best_state is not None:
    logreg_model.load_state_dict(best_state)

# --- Plot curves ---
plt.figure(figsize=(10,4))
plt.plot(range(1, num_epochs+1), train_losses, label="Train Loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training Loss"); plt.legend(); plt.show()

plt.figure(figsize=(10,4))
plt.plot(range(1, num_epochs+1), [a*100 for a in train_accuracies], label="Train Acc")
plt.plot(range(1, num_epochs+1), [a*100 for a in val_accuracies], label="Val Acc")
plt.xlabel("Epoch"); plt.ylabel("Accuracy (%)"); plt.title("Accuracy per Epoch"); plt.legend(); plt.show()

# --- Final eval ---
all_preds, all_labels = [], []
logreg_model.eval()
with torch.no_grad():
    for xb, yb in valid_loader:
        preds = logreg_model(xb.to(device)).argmax(1).cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(yb.numpy())

print(f"\nBest Validation Accuracy: {best_val_acc*100:.2f}%")
cm = confusion_matrix(all_labels, all_preds)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=dataset_to_use.classes,
            yticklabels=dataset_to_use.classes)
plt.xlabel("Predicted"); plt.ylabel("True"); plt.title("Confusion Matrix (Validation)"); plt.show()

print("\nClassification Report:")
print(classification_report(all_labels, all_preds, target_names=dataset_to_use.classes, digits=3))

# Task
Run a PCA on the images loaded using the `ProcessedSARImageDataset` class.

## Load the image data

### Subtask:
Load the images from the dataset using the `ProcessedSARImageDataset` class.


**Reasoning**:
The first step is to load the images using the `ProcessedSARImageDataset` class as per the instructions. This involves defining the data directory, selected classes, and selected subfolders, and then instantiating the dataset object.



In [None]:
# Define the data directory
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Define the class lists to include
selected_classes = ['Integri', 'Rimossi', 'Abbattuti']

# Define the subfolders to include within each class
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']

# Create the dataset without any transforms yet
dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=None, selected_subfolders=selected_subfolders)

print(f'Number of images loaded: {len(dataset)}')

## Preprocess the images

### Subtask:
Apply necessary transformations (like ToTensor and normalization) and flatten the images into a 1D vector.


**Reasoning**:
Apply the necessary transformations and flatten the images.



In [None]:
# Define the transformation pipeline
mean = 0.02739770549039046
std = 0.09120287481281493

transform_pipeline = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,)),
    transforms.Lambda(lambda x: x.view(-1)) # Flatten the image tensor
])

# Update the transform attribute of the dataset
dataset.transform = transform_pipeline

# Create a list to store the preprocessed and flattened image tensors
flattened_images = []

# Iterate through the dataset and apply the transformations
for img, label in dataset:
    flattened_images.append(img)

print(f'Number of flattened images: {len(flattened_images)}')
# Optionally, check the shape of the first flattened image
if flattened_images:
    print(f'Shape of the first flattened image: {flattened_images[0].shape}')

## Prepare data for pca

### Subtask:
Collect the flattened image vectors into a single NumPy array where each row is an image and each column is a pixel feature.


**Reasoning**:
Convert the list of flattened image tensors to a NumPy array and print its shape.



In [None]:
import numpy as np

# Convert the list of flattened image tensors into a single NumPy array
images_np = np.array([img.numpy() for img in flattened_images])

# Print the shape of the NumPy array
print(f'Shape of the NumPy array: {images_np.shape}')

## Perform pca

### Subtask:
Apply PCA to the prepared data to reduce dimensionality and find the principal components.


**Reasoning**:
Apply PCA to the prepared NumPy array to reduce dimensionality and find the principal components, as per the instructions.



In [None]:
from sklearn.decomposition import PCA

# Instantiate PCA
# Using a number of components to start with, can be adjusted
n_components = 50
pca = PCA(n_components=n_components)

# Fit PCA on the data
pca.fit(images_np)

# Transform the data
images_pca = pca.transform(images_np)

# Print the explained variance ratio
print(f'Explained variance ratio by each component: {pca.explained_variance_ratio_}')
print(f'Total explained variance by {n_components} components: {np.sum(pca.explained_variance_ratio_):.4f}')
print(f'Shape of the PCA-transformed data: {images_pca.shape}')

## Visualize pca results

### Subtask:
Plot the images in the reduced-dimensional space (e.g., a 2D scatter plot) and potentially visualize the principal components themselves.


**Reasoning**:
Create a scatter plot of the first two principal components, coloring points by class labels, and optionally visualize the principal components as images.



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Get the original labels from the dataset
labels = [label for _, label in dataset]
labels_np = np.array(labels)

# Create a scatter plot of the first two principal components
plt.figure(figsize=(10, 8))
scatter = plt.scatter(images_pca[:, 8], images_pca[:, 49], c=labels_np, cmap='viridis', alpha=0.6)
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.title('PCA of Image Dataset')

# Add a legend
legend_elements = []
for i, class_name in enumerate(dataset.classes):
    legend_elements.append(plt.Line2D([0], [0], marker='o', color='w', label=class_name,
                                      markerfacecolor=plt.cm.viridis(i / (len(dataset.classes) - 1)), markersize=10))

plt.legend(handles=legend_elements, title="Classes")
plt.show()

# Optional: Visualize the first few principal components as images
print("\nVisualizing the first few principal components:")
num_components_to_visualize = min(5, n_components) # Visualize up to 5 components

# Reshape the principal components back to the original image dimensions
# Assuming original image dimensions are 512x512x3 (after converting to RGB)
original_shape = (3, 512, 512) # Channels x Height x Width

plt.figure(figsize=(15, 5))
for i in range(num_components_to_visualize):
    pc_image = pca.components_[i].reshape(original_shape)

    # For visualization, take the mean across channels or select one channel
    pc_image_gray = np.mean(pc_image, axis=0) # Take mean across channels

    plt.subplot(1, num_components_to_visualize, i + 1)
    plt.imshow(pc_image_gray, cmap='viridis') # Use a suitable colormap
    plt.title(f'PC {i+1}')
    plt.axis('off')

plt.tight_layout()
plt.show()

## Summary:

### Data Analysis Key Findings

*   1200 images were loaded from the specified data directory, classes, and subfolders.
*   Each image was successfully transformed using `ToTensor` and `Normalize` and then flattened into a 1D vector of size 786432.
*   The flattened images were successfully converted into a NumPy array with a shape of (1200, 786432).
*   PCA was applied to the data, reducing the dimensionality to 50 components.
*   The first 50 principal components captured approximately 97.36% of the total variance in the dataset.
*   A scatter plot of the first two principal components showed the distribution of images in the reduced-dimensional space, colored by their original classes.
*   The first few principal components were successfully visualized as images, providing insight into the spatial patterns captured by the PCA.

### Insights or Next Steps

*   The PCA analysis effectively reduced the dimensionality of the image data while retaining a high percentage of the variance, suggesting that the first few principal components capture the most significant information.
*   The scatter plot of the first two principal components can help assess the separability of different image classes in the reduced-dimensional space, which could inform the choice of subsequent classification or clustering algorithms.


# Task
Train an SVM model to classify images from the dataset located in "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512/" which is split into subfolders for different classes and sub-subfolders for different data variations. The model should be trained on a subset of the data based on user-selected sub-subfolders and evaluated on a test set.

## Load and prepare data for svm

### Subtask:
Load the image data using the `ProcessedSARImageDataset` class and preprocess it into a format suitable for scikit-learn's SVM. This will likely involve flattening the images and organizing them into a feature matrix and a label vector.


**Reasoning**:
Load the data using the `ProcessedSARImageDataset` class, apply the transformations, and prepare the data and labels as NumPy arrays for SVM training.



In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# 1. Define the data directory path.
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# 2. Define the list of class names to be included in the dataset.
selected_classes = ['Integri', 'Rimossi', 'Abbattuti']

# 3. Define the list of subfolder names to be included within each class directory.
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']

# 4. Instantiate the ProcessedSARImageDataset class with the defined data directory, selected classes, and selected subfolders. Set transform to None initially.
dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=None, selected_subfolders=selected_subfolders)

# 5. Define a transformation pipeline that includes converting images to tensors, normalizing them using the previously calculated mean and standard deviation (`mean` and `std`), and flattening the tensors into a 1D vector.
# The mean and std were calculated in a previous cell (cell_id: VIrQlTU-YGpl and cell_id: 76f043d6).
# Using the mean and std from the 3-class experiment with all subfolders
mean = 0.02739770549039046
std = 0.09120287481281493

transform_pipeline = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,)),
    transforms.Lambda(lambda x: x.view(-1)) # Flatten the image tensor
])

# 6. Update the transform attribute of the instantiated dataset object with the defined transformation pipeline.
dataset.transform = transform_pipeline

# 7. Create empty lists to store the flattened image tensors and their corresponding labels.
flattened_images = []
labels = []

# 8. Iterate through the dataset, apply the transformations to each image, and append the flattened image tensor and its label to the respective lists.
for img, label in dataset:
    flattened_images.append(img)
    labels.append(label)

# 9. Convert the list of flattened image tensors into a NumPy array.
images_np = np.array([img.numpy() for img in flattened_images])

# 10. Convert the list of labels into a NumPy array.
labels_np = np.array(labels)

print(f'Shape of the image feature matrix: {images_np.shape}')
print(f'Shape of the label vector: {labels_np.shape}')

**Reasoning**:
Split the data into training and testing sets, then train an SVM classifier on the training data.



In [None]:
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(images_np, labels_np, test_size=0.2, random_state=42, stratify=labels_np)

# Instantiate and train the SVM classifier
svm_model = SVC(kernel='linear') # Using a linear kernel for simplicity, can be adjusted
svm_model.fit(X_train, y_train)

print("SVM model trained successfully.")

**Reasoning**:
Evaluate the trained SVM model on the test set and print the classification report and confusion matrix.



In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Make predictions on the test set
y_pred = svm_model.predict(X_test)

# Print the classification report
print("Classification Report:")
print(classification_report(y_test, y_pred, target_names=dataset.classes))

# Calculate the confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=dataset.classes, yticklabels=dataset.classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Test Set)')
plt.show()

## Summary:

### Data Analysis Key Findings

*   The image data was loaded and preprocessed, resulting in a feature matrix of shape (1200, 786432) and a label vector of shape (1200,).
*   The data was split into training and testing sets, with 960 samples for training and 240 samples for testing.
*   The trained linear SVM model achieved high classification performance on the test set. The classification report shows precision, recall, and f1-scores of 1.00 for 'Rimossi' and 'Abbattuti', and 0.99 for 'Integri'.
*   The confusion matrix indicates only one misclassification on the test set, where an 'Integri' image was predicted as 'Rimossi'.

### Insights or Next Steps

*   The linear SVM model performed exceptionally well on this dataset subset. Consider exploring the model's performance on other subsets of the data variations or the full dataset.
*   Investigate the single misclassification to understand if there are specific characteristics of that image that led to the incorrect prediction and if any data augmentation or feature engineering could address it.


# Task
Implement and evaluate an SVM classifier with cross-validation for the image classification task using the data loaded with `ProcessedSARImageDataset`.

## Load and prepare data for svm

### Subtask:
Load the image data using the `ProcessedSARImageDataset` class and preprocess it into a format suitable for scikit-learn's SVM. This will likely involve flattening the images and organizing them into a feature matrix and a label vector.


**Reasoning**:
Load the data using the ProcessedSARImageDataset class, apply the transformations, and prepare the data and labels as NumPy arrays for SVM training.



In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# 1. Define the data directory path.
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# 2. Define the list of class names to be included in the dataset.
selected_classes = ['Integri', 'Rimossi']

# 3. Define the list of subfolder names to be included within each class directory.
selected_subfolders = ['_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt']

# 4. Instantiate the ProcessedSARImageDataset class with the defined data directory, selected classes, and selected subfolders. Set transform to None initially.
dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=selected_classes, transform=None, selected_subfolders=selected_subfolders)

# 5. Define a transformation pipeline that includes converting images to tensors, normalizing them using the previously calculated mean and standard deviation (`mean` and `std`), and flattening the tensors into a 1D vector.
# The mean and std were calculated in a previous cell (cell_id: VIrQlTU-YGpl and cell_id: 76f043d6).
# Using the mean and std from the 3-class experiment with all subfolders
mean = 0.02739770549039046
std = 0.09120287481281493

transform_pipeline = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((mean,), (std,)),
    transforms.Lambda(lambda x: x.view(-1)) # Flatten the image tensor
])

# 6. Update the transform attribute of the instantiated dataset object with the defined transformation pipeline.
dataset.transform = transform_pipeline

# 7. Create empty lists to store the flattened image tensors and their corresponding labels.
flattened_images = []
labels = []

# 8. Iterate through the dataset, apply the transformations to each image, and append the flattened image tensor and its label to the respective lists.
for img, label in dataset:
    flattened_images.append(img)
    labels.append(label)

# 9. Convert the list of flattened image tensors into a NumPy array.
images_np = np.array([img.numpy() for img in flattened_images])

# 10. Convert the list of labels into a NumPy array.
labels_np = np.array(labels)

# 11. Print the shapes of the image feature matrix and the label vector to verify the data preparation.
print(f'Shape of the image feature matrix: {images_np.shape}')
print(f'Shape of the label vector: {labels_np.shape}')

## Split data

### Subtask:
Split the prepared data into training and testing sets.


**Reasoning**:
Split the prepared data into training and testing sets using train_test_split, ensuring stratification and setting a random_state for reproducibility, and then print the shapes of the resulting arrays.



In [None]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(images_np, labels_np, test_size=0.2, random_state=42, stratify=labels_np)

# Print the shapes of the resulting training and testing sets
print(f'Shape of X_train: {X_train.shape}')
print(f'Shape of X_test: {X_test.shape}')
print(f'Shape of y_train: {y_train.shape}')
print(f'Shape of y_test: {y_test.shape}')

## Define svm model

### Subtask:
Instantiate an SVM classifier.


**Reasoning**:
Import the SVC class and instantiate the SVM model.



In [None]:
from sklearn.svm import SVC

# Instantiate the SVM classifier
svm_model = SVC(kernel='linear', random_state=42) # Using a linear kernel and setting random_state for reproducibility

# Print a confirmation message
print("SVM model instantiated successfully.")

## Implement cross-validation

### Subtask:
Perform cross-validation on the training data to evaluate the model's performance.


**Reasoning**:
Perform cross-validation on the training data to evaluate the model's performance.



In [None]:
from sklearn.model_selection import cross_val_score

# Perform cross-validation on the training data
cv_scores = cross_val_score(svm_model, X_train, y_train, cv=5, scoring='accuracy')

# Print the cross-validation scores
print("Cross-validation scores:", cv_scores)

# Print the mean and standard deviation of the cross-validation scores
print(f"Mean cross-validation accuracy: {np.mean(cv_scores):.4f}")
print(f"Standard deviation of cross-validation accuracy: {np.std(cv_scores):.4f}")

In [None]:
# Train the SVM classifier
svm_model.fit(X_train, y_train)

# Make predictions on the test set
y_pred = svm_model.predict(X_test)

# Print the classification report
print("Classification Report:")
print(classification_report(y_test, y_pred, target_names=dataset.classes))

# Calculate the confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=dataset.classes, yticklabels=dataset.classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Test Set)')
plt.show()

## Summary:

### Data Analysis Key Findings

*   The image data was successfully loaded and preprocessed, resulting in a feature matrix of shape (1200, 786432) and a label vector of shape (1200,).
*   The data was split into training (960 samples) and testing (240 samples) sets with a 80/20 ratio, maintaining the class distribution through stratification.
*   Cross-validation on the training data with 5 folds yielded a mean accuracy of 0.9984 and a standard deviation of 0.0031, indicating high and consistent performance.
*   The SVM model, after training on the training data, achieved perfect precision, recall, f1-score, and accuracy (1.00) on the test set.
*   The confusion matrix confirmed the perfect classification on the test set, with all 80 samples of 'Integri' and all 80 samples of 'Rimossi' being correctly predicted.

### Insights or Next Steps

*   The current SVM model with a linear kernel performs exceptionally well on this specific dataset and class subset.
*   Explore alternative kernels (e.g., RBF) and hyperparameters for the SVM model to see if performance can be maintained or improved with potentially lower complexity or better generalization to unseen data.


In [None]:
import numpy as np
from tqdm import tqdm
from skimage.feature import hog, local_binary_pattern
from sklearn.preprocessing import StandardScaler
import joblib
import os

#Load the dataset
data_dir = "/content/drive/MyDrive/Colab Notebooks/Towercheck/ProcessedDataset_512"

# Define the class lists
two_classes = ['Integri', 'Abbattuti']
three_classes = ['Integri', 'Rimossi', 'Abbattuti']
selected_subfolders = ['_0', '_3x20_NoExtraTgt']#, '_5x25_NoExtraTgt'] # Updated subfolder names


# Create the datasets first, before normalization
two_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=two_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)
three_class_dataset = ProcessedSARImageDataset(root_dir=data_dir, selected_classes=three_classes, transform=transforms.ToTensor(),  selected_subfolders=selected_subfolders)


# Choose the dataset (either two_class_dataset or three_class_dataset)
dataset_to_use = two_class_dataset  # Or three_class_dataset

# Define the model based on the number of classes in the chosen dataset
num_classes = len(dataset_to_use.classes)


# -------- HOG / LBP helpers --------
def compute_hog_per_channel(img_ch,
                            pixels_per_cell=(8,8),
                            cells_per_block=(2,2),
                            orientations=9,
                            block_norm="L2-Hys"):
    """
    img_ch: 2D numpy array (H, W) float in [0,1]
    returns: 1D hog feature vector for this channel
    """
    return hog(img_ch,
               orientations=orientations,
               pixels_per_cell=pixels_per_cell,
               cells_per_block=cells_per_block,
               block_norm=block_norm,
               visualize=False,
               feature_vector=True)

def compute_lbp_hist_per_channel(img_ch, P=8, R=1, method="uniform", normalize=True):
    """
    img_ch: 2D numpy array (H, W) float in [0,1]
    returns: normalized histogram of LBP codes for this channel
    """
    lbp = local_binary_pattern(img_ch, P=P, R=R, method=method)
    # 'uniform' bins = P+2
    n_bins = P + 2 if method == "uniform" else int(lbp.max() + 1)
    hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=False)
    if normalize:
        hist = hist.astype(np.float32)
        s = hist.sum()
        if s > 0:
            hist /= s
    return hist

def tensor_to_numpy(img_tensor):
    """
    img_tensor: torch.Tensor shaped (C, H, W), dtype float, range [0,1]
    returns: numpy array (C, H, W) float32
    """
    arr = img_tensor.detach().cpu().numpy().astype(np.float32)
    # If any NaNs/Infs slipped in, clean:
    arr = np.nan_to_num(arr, nan=0.0, posinf=1.0, neginf=0.0)
    return arr

def extract_features_from_dataset(dataset,
                                  hog_ppc=(8,8),
                                  hog_cpb=(2,2),
                                  hog_orient=9,
                                  lbp_P=8,
                                  lbp_R=1,
                                  lbp_method="uniform",
                                  standardize=True,
                                  save_path=None):
    """
    Iterates over the dataset (no DataLoader needed) and computes
    HOG and LBP features per channel; concatenates channels.
    Returns: X_hog, X_lbp, X_all, y, (scaler if standardize else None)
    """
    X_hog = []
    X_lbp = []
    y = []

    for i in tqdm(range(len(dataset)), desc="Extracting HOG/LBP"):
        img_t, label = dataset[i]          # img_t: torch.Tensor (C,H,W)
        arr = tensor_to_numpy(img_t)       # (C,H,W)
        C, H, W = arr.shape

        # Per-channel HOG + LBP, then concat
        hog_feats = []
        lbp_feats = []
        for c in range(C):
            ch = arr[c]
            hog_vec = compute_hog_per_channel(
                ch,
                pixels_per_cell=hog_ppc,
                cells_per_block=hog_cpb,
                orientations=hog_orient
            )
            lbp_hist = compute_lbp_hist_per_channel(
                ch,
                P=lbp_P,
                R=lbp_R,
                method=lbp_method,
                normalize=True
            )
            hog_feats.append(hog_vec)
            lbp_feats.append(lbp_hist)

        hog_feats = np.concatenate(hog_feats, axis=0)
        lbp_feats = np.concatenate(lbp_feats, axis=0)

        X_hog.append(hog_feats)
        X_lbp.append(lbp_feats)
        y.append(label)

    X_hog = np.vstack(X_hog).astype(np.float32)
    X_lbp = np.vstack(X_lbp).astype(np.float32)
    y = np.array(y, dtype=np.int64)
    X_all = np.hstack([X_hog, X_lbp]).astype(np.float32)

    scaler = None
    if standardize:
        scaler = StandardScaler(with_mean=False)  # sparse-friendly; HOG/LBP are nonnegative
        X_all = scaler.fit_transform(X_all)

    if save_path is not None:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        np.savez_compressed(save_path, X_hog=X_hog, X_lbp=X_lbp, X_all=X_all, y=y)
        if scaler is not None:
            joblib.dump(scaler, save_path + ".scaler.joblib")
        print(f"Saved features to: {save_path} (+ scaler if used)")

    return X_hog, X_lbp, X_all, y, scaler

# -------- Run it on your chosen dataset_to_use --------
# Tip: if your dataset is large, you could also iterate via a DataLoader,
# but here we read directly to respect your custom __getitem__ logic.
X_hog, X_lbp, X_all, y, scaler = extract_features_from_dataset(
    dataset_to_use,
    hog_ppc=(16,16),
    hog_cpb=(2,2),
    hog_orient=9,
    lbp_P=48,
    lbp_R=1,
    lbp_method="uniform",
    standardize=True,
    save_path="/content/drive/MyDrive/Colab Notebooks/Towercheck/features/towercheck_hog_lbp.npz"
)

print("Shapes:")
print("X_hog:", X_hog.shape)
print("X_lbp:", X_lbp.shape)
print("X_all:", X_all.shape)
print("y:", y.shape)


In [None]:
# ====== Stratified K-Fold CV on saved HOG+LBP features ======
# Requirements: scikit-learn, numpy
# pip install scikit-learn numpy

import os
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_validate, cross_val_predict
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# ---------- 1) Load saved features ----------
npz_path = "/content/drive/MyDrive/Colab Notebooks/Towercheck/features/towercheck_hog_lbp.npz"
if not os.path.exists(npz_path):
    raise FileNotFoundError(f"NPZ not found at: {npz_path}")

data = np.load(npz_path)
X_hog = data["X_hog"]
X_lbp = data["X_lbp"]
X_all = data["X_all"]
y     = data["y"]

print("Loaded shapes:")
print("  X_hog:", X_hog.shape)
print("  X_lbp:", X_lbp.shape)
print("  X_all:", X_all.shape)
print("  y:", y.shape)

# ---------- 2) Choose feature matrix ----------
# Options: X_all (HOG+LBP), X_hog (HOG only), X_lbp (LBP only)
X = X_lbp

# ---------- 3) Class names (auto for 2/3 classes; customize if needed) ----------
n_classes = len(np.unique(y))
if n_classes == 2:
    # Adjust if your label order differs
    class_names = ["Integri", "Abbattuti"]
elif n_classes == 3:
    class_names = ["Integri", "Rimossi", "Abbattuti"]
else:
    class_names = [f"class_{i}" for i in range(n_classes)]
print("Classes:", class_names)

# ---------- 4) Define CV and model ----------
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

rf = RandomForestClassifier(
    n_estimators=500,
    max_depth=None,          # set e.g. 60 if you see overfitting
    max_features="sqrt",
    class_weight="balanced", # helpful if classes are imbalanced
    n_jobs=-1,
    random_state=42
)

# ---------- 5) Cross-validate with multiple metrics ----------
scoring = {
    "acc": "accuracy",
    "f1_macro": "f1_macro",
    "f1_weighted": "f1_weighted",
}

cv_res = cross_validate(
    rf, X, y,
    cv=cv,
    scoring=scoring,
    n_jobs=-1,
    return_train_score=False
)

print("\nPer-fold metrics:")
print("  Accuracy   :", np.round(cv_res["test_acc"], 4))
print("  Macro-F1   :", np.round(cv_res["test_f1_macro"], 4))
print("  Weighted-F1:", np.round(cv_res["test_f1_weighted"], 4))

print("\nAveraged metrics (mean ± std):")
print(f"  Accuracy    : {cv_res['test_acc'].mean():.4f} ± {cv_res['test_acc'].std():.4f}")
print(f"  Macro-F1    : {cv_res['test_f1_macro'].mean():.4f} ± {cv_res['test_f1_macro'].std():.4f}")
print(f"  Weighted-F1 : {cv_res['test_f1_weighted'].mean():.4f} ± {cv_res['test_f1_weighted'].std():.4f}")

# ---------- 6) Out-of-fold predictions for a single overall report ----------
# Each sample is predicted by a model that did NOT train on it.
y_oof = cross_val_predict(rf, X, y, cv=cv, n_jobs=-1)

print("\n=== Out-of-fold performance (aggregated) ===")
print(f"OOF Accuracy: {accuracy_score(y, y_oof):.4f}")

try:
    print("\nClassification report (OOF):")
    print(classification_report(y, y_oof, target_names=class_names, digits=4))
except Exception:
    # Fallback in case class_names length/order doesn't match y's labels
    print("\nClassification report (OOF):")
    print(classification_report(y, y_oof, digits=4))

cm = confusion_matrix(y, y_oof)
print("Confusion matrix (OOF):\n", cm)

# ---------- 7) (Optional) Fit on full data and save model ----------
# from joblib import dump
# rf.fit(X, y)
# dump(rf, "/content/drive/MyDrive/Colab Notebooks/Towercheck/models/rf_cv_final.joblib")
# print("Saved final RF model.")


In [None]:
# === Visualize HOG & LBP: one example per class ===
# Works with your existing `dataset_to_use` (expects (C,H,W) tensors, 2 channels OK)

import numpy as np
import matplotlib.pyplot as plt
from skimage.feature import hog, local_binary_pattern

# ---- Params (tweak as you like) ----
CHANNEL = 0              # which channel to visualize
P, R = 48, 1              # LBP neighbors & radius
LBP_METHOD = "uniform"   # "uniform" keeps bins = P+2
ORIENT = 9               # HOG orientations
PPC = (16,16)            # HOG pixels per cell (you chose 16x16)
CPB = (2,2)              # HOG cells per block
BLOCK_NORM = "L2-Hys"
SEED = 0                 # for reproducible sampling

# ---- Pick one index per class (random but reproducible) ----
rng = np.random.default_rng()
labels_seen = {}
idxs = np.arange(len(dataset_to_use))
rng.shuffle(idxs)

class_names = getattr(dataset_to_use, "classes", None)
n_classes = len(class_names) if class_names is not None else None

for i in idxs:
    _, lab = dataset_to_use[i]
    lab = int(lab)
    if lab not in labels_seen:
        labels_seen[lab] = i
        if n_classes is not None and len(labels_seen) == n_classes:
            break

# If class names are missing, fall back to numeric strings
if class_names is None:
    class_names = [f"class_{k}" for k in sorted(labels_seen.keys())]

# ---- Plot: one row per class, 4 columns ----
rows = len(labels_seen)
cols = 4
fig, axes = plt.subplots(rows, cols, figsize=(4*cols, 4*rows))
if rows == 1:
    axes = axes[None, :]  # make 2D for consistent indexing

for r, lab in enumerate(sorted(labels_seen.keys())):
    idx = labels_seen[lab]
    img_t, _ = dataset_to_use[idx]     # torch tensor (C,H,W)
    arr = img_t.numpy()
    ch = arr[min(CHANNEL, arr.shape[0]-1)]

    # HOG (with visualization)
    hog_vec, hog_img = hog(
        ch,
        orientations=ORIENT,
        pixels_per_cell=PPC,
        cells_per_block=CPB,
        visualize=True,
        feature_vector=True,
        block_norm=BLOCK_NORM
    )

    # LBP map + histogram
    lbp = local_binary_pattern(ch, P=P, R=R, method=LBP_METHOD)
    n_bins = P + 2 if LBP_METHOD == "uniform" else int(lbp.max() + 1)
    hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=True)

    # ----- Plots for this class -----
    axes[r, 0].imshow(ch, cmap="gray")
    axes[r, 0].set_title(f"{class_names[lab]}  (idx={idx})")
    axes[r, 0].axis("off")

    axes[r, 1].imshow(hog_img, cmap="gray")
    axes[r, 1].set_title("HOG visualization")
    axes[r, 1].axis("off")

    im = axes[r, 2].imshow(lbp, cmap="magma")
    axes[r, 2].set_title(f"LBP code map (P={P}, R={R})")
    axes[r, 2].axis("off")

    axes[r, 3].bar(np.arange(n_bins), hist, width=0.9)
    axes[r, 3].set_title("LBP histogram")
    axes[r, 3].set_xlabel("LBP code")
    axes[r, 3].set_ylabel("frequency")

plt.tight_layout()
plt.show()


In [None]:
# === Total intensity distribution per class (histograms only, counts) ===
# Uses dataset_to_use (expects (C,H,W) tensors with >=2 channels)

import torch
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from tqdm import tqdm

# ---- CONFIG ----
USE_SQRT   = True   # True: sum sqrt(ch1^2+ch2^2); False: sum (ch1^2+ch2^2)
LOG_XAXIS  = False  # True -> log-scaled x-axis with log-spaced bins
BINS_COUNT = 40
CHANNELS   = (0, 1) # which two channels to use

def collect_total_intensities(dataset, use_sqrt=True, ch_idx=(0,1)):
    vals = defaultdict(list)
    for i in tqdm(range(len(dataset)), desc="Total intensities"):
        img, lab = dataset[i]  # (C,H,W)
        ch1 = img[min(ch_idx[0], img.shape[0]-1)].float()
        ch2 = img[min(ch_idx[1], img.shape[0]-1)].float()
        mag = torch.sqrt(ch1**2 + ch2**2) if use_sqrt else (ch1**2 + ch2**2)
        vals[int(lab)].append(mag.sum().item())
    return {k: np.asarray(v, dtype=np.float64) for k, v in vals.items()}

# ---- Compute totals ----
totals = collect_total_intensities(dataset_to_use, use_sqrt=USE_SQRT, ch_idx=CHANNELS)

# ---- Class labels ----
class_names = getattr(dataset_to_use, "classes", None)
keys = sorted(totals.keys())
labels = [class_names[k] if (class_names and k < len(class_names)) else str(k) for k in keys]

# ---- Quick summary (optional) ----
print("Counts per class:", {labels[i]: totals[k].size for i,k in enumerate(keys)})
for lab, k in zip(labels, keys):
    arr = totals[k]
    print(f"{lab:>12s}: mean={arr.mean():.6f}, std={arr.std(ddof=1) if arr.size>1 else 0.0:.6f}, "
          f"min={arr.min():.6f}, max={arr.max():.6f}")

# ---- Plot: Overlaid histograms (counts) ----
plt.figure(figsize=(7,4))
all_vals = np.concatenate([totals[k] for k in keys])
vmin, vmax = float(all_vals.min()), float(all_vals.max())
if vmax <= vmin:  # guard against degenerate ranges
    vmax = vmin + 1e-9

if LOG_XAXIS:
    vmin = max(vmin, 1e-12)
    bins = np.logspace(np.log10(vmin), np.log10(vmax), BINS_COUNT)
    for k, lab in zip(keys, labels):
        plt.hist(totals[k], bins=bins, density=False, alpha=0.5, label=lab)
    plt.xscale("log")
    plt.xlabel("Total intensity (log scale)")
else:
    bins = np.linspace(vmin, vmax, BINS_COUNT)
    for k, lab in zip(keys, labels):
        plt.hist(totals[k], bins=bins, density=False, alpha=0.5, label=lab)
    plt.xlabel("Total intensity")

plt.ylabel("Count")
#plt.title("Per-class total intensity — histograms")
plt.legend()
plt.tight_layout()
plt.show()


# Task
Plot the confusion matrix for the test set, and then plot four additional confusion matrices, each considering only the images contained in the following subfolders: '_0', '_3x20_NoExtraTgt', '_5x25_NoExtraTgt', '_5x25_WithExtraTgt_Cls'.

## Identify test set images by subfolder

### Subtask:
Iterate through the `test_dataset` to identify which images belong to each subfolder. Store the indices and labels for images from each subfolder.


**Reasoning**:
Iterate through the test dataset to identify which images belong to each subfolder and store their indices and labels.



In [None]:
# Create dictionaries to store indices and labels per subfolder
subfolder_indices = {subfolder: [] for subfolder in selected_subfolders + ['_5x25_WithExtraTgt_Cls']}
subfolder_labels = {subfolder: [] for subfolder in selected_subfolders + ['_5x25_WithExtraTgt_Cls']}

# Iterate through the test_dataset using its indices
for i in test_dataset.indices:
    # Get the image path from the original dataset
    image_path = dataset_to_use.image_paths[i]
    true_label = dataset_to_use.labels[i]

    # Determine the subfolder name from the image path
    found_subfolder = None
    # Include the additional subfolder for evaluation
    all_subfolders_to_check = selected_subfolders + ['_5x25_WithExtraTgt_Cls']
    for subfolder in all_subfolders_to_check:
        # Check if the subfolder name is present in the image path
        if subfolder in image_path:
            found_subfolder = subfolder
            break

    # If a subfolder is identified, store the index and label
    if found_subfolder:
        subfolder_indices[found_subfolder].append(i)
        subfolder_labels[found_subfolder].append(true_label)
    else:
        # Handle cases where an image might not belong to any of the specified subfolders
        print(f"Warning: Image path {image_path} does not belong to any of the specified subfolders.")


# Print the number of images found for each subfolder
print("Number of images per subfolder in the test set:")
for subfolder, indices in subfolder_indices.items():
    print(f"  {subfolder}: {len(indices)}")