# Importing libraries 

In [1]:
import os
import json
import numpy as np
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,Subset, random_split
from torchvision import transforms
import matplotlib.pyplot as plt
from tqdm import tqdm
import torchvision.models as models
from sklearn.metrics.pairwise import cosine_distances
from sklearn.metrics import f1_score, precision_score, recall_score,classification_report
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Utility Functions

In [2]:
# Function to get number of classes from label_mapping.json
def get_num_classes(label_mapping_path):
    with open(label_mapping_path, 'r') as f:
        label_mapping = json.load(f)
    return len(set(label_mapping.values()))

# Function to extract file_index and labels from folder name
def parse_folder_name(folder_name):
    parts = folder_name.split('_pattern_')
    if len(parts) != 2:
        return None, None
    file_index = parts[0]
    pattern_indices = parts[1].split('_')
    return file_index, [int(idx) for idx in pattern_indices]

# Function to create one-hot encoded labels
def create_one_hot_labels(pattern_indices, num_classes):
    one_hot = np.zeros(num_classes, dtype=np.float32)
    for idx in pattern_indices:
        one_hot[idx - 1] = 1  # 1-based to 0-based
    return one_hot

# Function to get unique labels
def get_unique_labels(data_dir):
    unique_labels = set()
    for folder_name in os.listdir(data_dir):
        folder_path = os.path.join(data_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue
        parts = folder_name.split('_pattern_')
        if len(parts) != 2:
            continue
        labels = [int(label) for label in parts[1].split('_')]
        unique_labels.update(labels)
    return unique_labels

# Function to extract features
def extract_features(loader, model, device):
    model.eval()
    features_list = []
    with torch.no_grad():
        for images, _, folder_names in loader:  # Ignore the placeholder
            images = images.to(device)
            features = model(images, return_features=True)  # (batch_size, 512)
            features_list.append(features.cpu().numpy())
    return np.concatenate(features_list, axis=0)  # (num_samples, 512)

# Dataset

In [3]:
# Custom Dataset class
class MultiViewDataset(Dataset):
    def __init__(self, data_dirs, view_names, num_classes, img_size=(512, 512), transform=None, is_query=False, is_retrieval=False, white_views=None):
        self.data_dirs = data_dirs
        self.view_names = view_names
        self.num_classes = num_classes
        self.img_size = img_size
        self.transform = transform
        self.is_query = is_query
        self.is_retrieval = is_retrieval
        self.white_views = white_views if white_views is not None else {}
        self.samples = []

        for data_dir in data_dirs:
            for folder_name in os.listdir(data_dir):
                folder_path = os.path.join(data_dir, folder_name)
                if not os.path.isdir(folder_path):
                    continue
                file_index, pattern_indices = parse_folder_name(folder_name)
                all_views_exist = all(os.path.exists(os.path.join(folder_path, f"{view}.png")) for view in view_names)
                if not all_views_exist:
                    continue
                if self.is_retrieval or pattern_indices is None:
                    self.samples.append(folder_path)  # Just the path for retrieval
                else:
                    self.samples.append((folder_path, pattern_indices))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        if self.is_retrieval:
            folder_path = self.samples[idx]
            pattern_indices = None
        else:
            sample = self.samples[idx]
            if isinstance(sample, str):
                folder_path = sample
                pattern_indices = None
            else:
                folder_path, pattern_indices = sample
        
        folder_name = os.path.basename(folder_path)
        images = []
        for view in self.view_names:
            img_path = os.path.join(folder_path, f"{view}.png")
            img = Image.open(img_path).convert('RGB')
            if self.transform:
                img = self.transform(img)
            images.append(img)
        images = torch.stack(images)  # (num_views, 3, H, W)

        if self.is_retrieval:
            return images, torch.tensor([]), folder_name  
        elif self.is_query:
            label = create_one_hot_labels(pattern_indices, 19) if pattern_indices else np.zeros(19)
            return images, torch.tensor(label, dtype=torch.float32), folder_name
        else:
            label = create_one_hot_labels(pattern_indices, self.num_classes) if pattern_indices else np.zeros(self.num_classes)
            return images, torch.tensor(label, dtype=torch.float32), folder_name
        
# Wrapper dataset to filter labels to only the last 4 classes (indices 15-18)
class FilteredLabelDataset(Dataset):
    def __init__(self, dataset, original_num_classes, target_num_classes, start_class_idx):
        self.dataset = dataset
        self.original_num_classes = original_num_classes  # 19 classes
        self.target_num_classes = target_num_classes  # 4 classes
        self.start_class_idx = start_class_idx  # Start index for filtering (15)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        images, label, folder_name = self.dataset[idx]
        # Extract the last 4 classes (indices 15-18) from the 19-class one-hot vector
        filtered_label = label[self.start_class_idx:self.start_class_idx + self.target_num_classes]
        return images, filtered_label, folder_name        


# Loading the data 

In [4]:
# Paths
train_dir = "E:/Ahmed/IMT Nord Europe/3D/Training_Screenshots/train"
val_dir = "E:/Ahmed/IMT Nord Europe/3D/Training_Screenshots/val"
query_dir = "E:/Ahmed/IMT Nord Europe/3D/Query_Screenshots"
retrieval_dir = "E:/Ahmed/IMT Nord Europe/3D/Retrieval_Screenshots"
label_mapping_path = "label_mapping_2.json"

# Get the number of classes
num_training_query_classes = get_num_classes(label_mapping_path)
train_labels = get_unique_labels(train_dir)
num_classes = len(train_labels)
print(f"Number of classes (original training): {num_classes}")

# Number of classes in query dataset
num_classes_query = 19  # Updated to 19 classes
num_classes_finetune = 4  # Last 4 classes for fine-tuning (indices 15-18)

# Define view names
view_names = ["top", "bottom", "lateral_1", "lateral_2", "lateral_3", "lateral_4"]

# Image size
img_size = (512, 512)

# Define transform
transform = transforms.Compose([
    transforms.Resize(img_size),
    transforms.ToTensor(),
])

# Preprocessing datasets
print("Preprocessing training data...")
train_dataset = MultiViewDataset([train_dir], view_names, num_classes, img_size, transform)

print("Preprocessing validation data...")
val_dataset = MultiViewDataset([val_dir], view_names, num_classes, img_size, transform)

print("Preprocessing query data...")
query_dataset = MultiViewDataset([query_dir], view_names, num_classes_query, img_size, transform, is_query=True)

print("Preprocessing retrieval data...")
retrieval_dataset = MultiViewDataset([retrieval_dir], view_names, num_classes, img_size, transform, is_retrieval=True)

# Extract labels from query dataset and identify relevant samples
query_labels = []
relevant_indices = []  # Indices of samples with at least one of the last 4 classes active
for idx in range(len(query_dataset)):
    _, label, _ = query_dataset[idx]  # label is torch.tensor of dtype torch.float32 (one-hot encoded)
    active_classes = label.nonzero(as_tuple=True)[0].numpy()  # Indices where label is 1
    query_labels.append(active_classes)

    # Check if the sample has at least one of the last 4 classes (indices 15-18) active
    last_four = label[-4:].numpy()  # Last 4 indices (15-18)
    if last_four.sum() > 0:  # At least one of the last 4 classes is active
        relevant_indices.append(idx)

print(f"Total query samples: {len(query_dataset)}")
print(f"Relevant query samples (with at least one of classes 15-18 active): {len(relevant_indices)}")

# Perform stratified split on relevant samples
def stratified_split_relevant_samples(dataset, labels, relevant_indices, test_size=0.2, random_state=42):
    if not relevant_indices:
        print("No relevant samples found with classes 15-18 active. Cannot perform split.")
        return [], []

    # Extract labels for relevant samples
    relevant_labels = [labels[idx] for idx in relevant_indices]

    # For stratification, use the active classes within the last 4 (15-18) as the primary label
    primary_labels = []
    for label in relevant_labels:
        # Get active classes in the range 15-18
        active_last_four = [cls for cls in label if cls >= 15 and cls <= 18]
        if active_last_four:
            primary_labels.append(active_last_four[0])  # Use the first active class in 15-18
        else:
            primary_labels.append(-1)  # Shouldn't happen due to relevant_indices filtering

    # Perform stratified split on relevant indices
    train_indices_rel, val_indices_rel = train_test_split(
        relevant_indices,
        test_size=test_size,
        stratify=primary_labels,
        random_state=random_state
    )

    return train_indices_rel, val_indices_rel

# Perform the stratified split on relevant samples
train_indices, val_indices = stratified_split_relevant_samples(
    query_dataset, query_labels, relevant_indices, test_size=0.2, random_state=42
)


# Create subsets for fine-tuning
query_train_dataset = Subset(query_dataset, train_indices)
query_val_dataset = Subset(query_dataset, val_indices)

# Create filtered datasets for fine-tuning
query_train_dataset_filtered = FilteredLabelDataset(
    query_train_dataset,
    original_num_classes=num_classes_query,
    target_num_classes=num_classes_finetune,
    start_class_idx=15  # Start at index 15 (for classes 15-18)
)

query_val_dataset_filtered = FilteredLabelDataset(
    query_val_dataset,
    original_num_classes=num_classes_query,
    target_num_classes=num_classes_finetune,
    start_class_idx=15
)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=0)
query_loader = DataLoader(query_dataset, batch_size=16, shuffle=False, num_workers=0)
retrieval_loader = DataLoader(retrieval_dataset, batch_size=16, shuffle=False, num_workers=0)

# New data loaders for fine-tuning with filtered labels
query_train_loader = DataLoader(query_train_dataset_filtered, batch_size=16, shuffle=True, num_workers=0)
query_val_loader = DataLoader(query_val_dataset_filtered, batch_size=16, shuffle=False, num_workers=0)

# Verify class distribution in training and validation sets
def print_class_distribution(dataset, name, num_classes=19):
    class_counts = np.zeros(num_classes)
    for idx in range(len(dataset)):
        _, label, _ = dataset[idx]  # label is torch.tensor of dtype torch.float32
        active_classes = label.nonzero(as_tuple=True)[0].numpy()
        for cls in active_classes:
            class_counts[cls] += 1
    print(f"\nClass distribution in {name}:")
    for cls, count in enumerate(class_counts):
        print(f"Class {cls}: {int(count)} samples")

print_class_distribution(query_dataset, "Query Dataset", num_classes_query)
print_class_distribution(query_train_dataset, "Query Training Dataset (fine-tuning)", num_classes_query)
print_class_distribution(query_val_dataset, "Query Validation Dataset (fine-tuning)", num_classes_query)

# Print dataset sizes
print(f"\nTraining samples (original): {len(train_dataset)}")
print(f"Validation samples (original): {len(val_dataset)}")
print(f"Query samples: {len(query_dataset)}")
print(f"Query training samples (for fine-tuning): {len(query_train_dataset)}")
print(f"Query validation samples (for fine-tuning): {len(query_val_dataset)}")

# pretrained model

In [6]:
import torch
import torch.nn as nn
from torchvision import models
from torchsummary import summary

# Load pretrained VGG19
vgg19 = models.vgg19(pretrained=True)

# Unfreeze the last few layers of VGG19 for fine-tuning
for param in vgg19.parameters():
    param.requires_grad = False
for param in vgg19.features[-3:].parameters():
    param.requires_grad = True

# Remove the default classifier
vgg19.classifier = nn.Identity()

class MultiView(nn.Module):
    def __init__(self, num_views, num_classes, model, svms=None, scaler=None, device=None):
        super(MultiView, self).__init__()
        self.num_views = num_views
        self.model = model
        self.svms = svms
        self.scaler = scaler
        self.feature_dim = self.model.features(torch.randn(1, 3, 512, 512)).shape[1]
        self.device = device
        self.classifier = nn.Sequential(
            nn.Linear(num_views * self.feature_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
            nn.Sigmoid()
        )

    def forward(self, views, return_features=False, svm=False):
        batch_size = views.size(0)
        view_features = []
        for i in range(self.num_views):
            view = views[:, i]
            feat = self.model.features(view)
            feat = nn.AdaptiveAvgPool2d(1)(feat)
            feat = feat.view(batch_size, -1)
            view_features.append(feat)
        concatenated_features = torch.cat(view_features, dim=1)
        if return_features:
            features_512 = self.classifier[0](concatenated_features)
            features_512 = self.classifier[1](features_512)
            return features_512
        if svm:
            # SVM path (not used in summary)
            pass
        output = self.classifier(concatenated_features)
        return output

# Instantiate the model
num_views = 6  # Example: 6 views
num_classes = 10  # Example: 10 classes
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultiView(num_views=num_views, num_classes=num_classes, model=vgg19).to(device)

# Print model summary
# Input shape: (channels, height, width) per view, but we need to account for num_views
# torchsummary expects input shape as (channels, height, width), so we simulate one view
print("Model Summary for one view (repeated internally for num_views):")
summary(model.model.features, input_size=(3, 512, 512))  # Summarize VGG19 features
print("\nCustom Classifier Summary:")
summary(model.classifier, input_size=(num_views * 512,))  # Summarize classifier

# training the main model

In [None]:
# Integrated Training and Evaluation Function
def train_and_evaluate_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=15, device='cuda', threshold=0.5, model_path_loss="best_model_loss.pth", model_path_f1="best_model_f1.pth"):
    best_val_loss = float('inf')
    best_val_f1 = 0.0
    train_losses, val_losses = [], []
    train_f1_scores, val_f1_scores = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        # Training Phase
        model.train()
        train_loss = 0.0
        train_preds = []
        train_labels = []
        train_outputs = []

        for batch_idx, (images, labels, _) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Train")):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)  # Logits or probabilities depending on model
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * images.size(0)

            # Collect outputs and labels
            batch_outputs = outputs.detach().cpu().numpy()
            batch_labels = labels.detach().cpu().numpy()
            train_outputs.append(batch_outputs)
            train_labels.append(batch_labels)

            # Fixed threshold predictions
            preds = (outputs > threshold).float()
            train_preds.append(preds.detach().cpu().numpy())

        # Compute Training Metrics
        train_loss /= len(train_loader.dataset)
        train_outputs = np.concatenate(train_outputs)
        train_labels = np.concatenate(train_labels)
        train_preds = np.concatenate(train_preds)

        train_f1 = f1_score(train_labels, train_preds, average='micro', zero_division=0)
        train_accuracy = np.mean(np.all(train_preds == train_labels, axis=1))  # Strict accuracy

        # Store training metrics
        train_losses.append(train_loss)
        train_f1_scores.append(train_f1)
        train_accuracies.append(train_accuracy)

        # Validation Phase
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_labels = []
        val_outputs = []

        with torch.no_grad():
            for batch_idx, (images, labels, _) in enumerate(tqdm(val_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Val")):
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)  
                loss = criterion(outputs, labels)
                val_loss += loss.item() * images.size(0)

                batch_outputs = outputs.detach().cpu().numpy()
                batch_labels = labels.detach().cpu().numpy()
                val_outputs.append(batch_outputs)
                val_labels.append(batch_labels)

                # Fixed threshold predictions
                preds = (outputs > threshold).float()
                val_preds.append(preds.detach().cpu().numpy())

                # Print first few batches
                if batch_idx < 3:
                    print(f"\nVal Batch {batch_idx + 1}")
                    print("Raw Outputs (first sample):", batch_outputs[0])
                    print("Predicted Labels (first sample):", preds[0].cpu().numpy())
                    print("True Labels (first sample):", batch_labels[0])

        # Compute Validation Metrics
        val_loss /= len(val_loader.dataset)
        val_outputs = np.concatenate(val_outputs)
        val_labels = np.concatenate(val_labels)
        val_preds = np.concatenate(val_preds)

        val_output_mean, val_output_std = np.mean(val_outputs), np.std(val_outputs)
        val_f1 = f1_score(val_labels, val_preds, average='micro', zero_division=0)
        val_precision = precision_score(val_labels, val_preds, average='micro', zero_division=0)
        val_recall = recall_score(val_labels, val_preds, average='micro', zero_division=0)
        val_accuracy = np.mean(np.all(val_preds == val_labels, axis=1))  # Strict accuracy

        # Store validation metrics
        val_losses.append(val_loss)
        val_f1_scores.append(val_f1)
        val_accuracies.append(val_accuracy)

        # Scheduler step
        scheduler.step()

        # Print Epoch Results
        print(f"\nEpoch {epoch + 1}/{num_epochs}")
        print(f"Train - Loss: {train_loss:.4f}, F1: {train_f1:.4f}, Accuracy: {train_accuracy:.4f}")
        print(f"Val - Loss: {val_loss:.4f}, F1: {val_f1:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, Accuracy: {val_accuracy:.4f}")
        print(f"Val Output Distribution - Mean: {val_output_mean:.4f}, Std: {val_output_std:.4f}")

        # Save Best Model Based on Validation Loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_path_loss)
            print(f"Best model (loss) saved with Val Loss: {best_val_loss:.4f}")

        # Save Best Model Based on Validation F1 Score
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            torch.save(model.state_dict(), model_path_f1)
            print(f"Best model (F1) saved with Val F1: {best_val_f1:.4f}")

    # Return all metrics for potential analysis
    return {
        'train_losses': train_losses, 'val_losses': val_losses,
        'train_f1_scores': train_f1_scores, 'val_f1_scores': val_f1_scores,
        'train_accuracies': train_accuracies, 'val_accuracies': val_accuracies
    }

# Setup and Run
view_names = ["top", "bottom", "lateral_1", "lateral_2", "lateral_3", "lateral_4"]
num_views = len(view_names)  
num_classes = 15
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_vgg19_15 = MultiView(
    num_views=num_views,
    num_classes=num_classes,
    model=vgg19  
).to(device)

criterion = nn.BCELoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model_vgg19_15.parameters()), lr=0.001)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=1e-5)

# Train and Evaluate
metrics = train_and_evaluate_model(
    model=model_vgg19_15,
    train_loader=train_loader, 
    val_loader=val_loader,      
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=50,
    device=device,
    threshold=0.5,
    model_path_loss="best_model_loss.pth",
    model_path_f1="best_model_f1.pth"
)

# training 4-class model


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np
from tqdm import tqdm

# Integrated Training and Evaluation Function
def fine_tune_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=15, device='cuda', threshold=0.5, model_path_loss="best_model_loss.pth", model_path_f1="best_model_f1.pth"):
    best_val_loss = float('inf')
    best_val_f1 = 0.0
    train_losses, val_losses = [], []
    train_f1_scores, val_f1_scores = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        # Training Phase
        model.train()
        train_loss = 0.0
        train_preds = []
        train_labels = []
        train_outputs = []

        for batch_idx, (images, labels, _) in enumerate(tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Train")):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)  # Logits or probabilities depending on model
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * images.size(0)

            # Collect outputs and labels
            batch_outputs = outputs.detach().cpu().numpy()
            batch_labels = labels.detach().cpu().numpy()
            train_outputs.append(batch_outputs)
            train_labels.append(batch_labels)

            # Fixed threshold predictions
            preds = (outputs > threshold).float()
            train_preds.append(preds.detach().cpu().numpy())

        # Compute Training Metrics
        train_loss /= len(train_loader.dataset)
        train_outputs = np.concatenate(train_outputs)
        train_labels = np.concatenate(train_labels)
        train_preds = np.concatenate(train_preds)

        train_f1 = f1_score(train_labels, train_preds, average='micro', zero_division=0)
        train_accuracy = np.mean(np.all(train_preds == train_labels, axis=1))  # Strict accuracy

        # Store training metrics
        train_losses.append(train_loss)
        train_f1_scores.append(train_f1)
        train_accuracies.append(train_accuracy)

        # Validation Phase
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_labels = []
        val_outputs = []

        with torch.no_grad():
            for batch_idx, (images, labels, _) in enumerate(tqdm(val_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Val")):
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)  
                loss = criterion(outputs, labels)
                val_loss += loss.item() * images.size(0)

                batch_outputs = outputs.detach().cpu().numpy()
                batch_labels = labels.detach().cpu().numpy()
                val_outputs.append(batch_outputs)
                val_labels.append(batch_labels)

                # Fixed threshold predictions
                preds = (outputs > threshold).float()
                val_preds.append(preds.detach().cpu().numpy())

                # Print first few batches
                if batch_idx < 3:
                    print(f"\nVal Batch {batch_idx + 1}")
                    print("Raw Outputs (first sample):", batch_outputs[0])
                    print("Predicted Labels (first sample):", preds[0].cpu().numpy())
                    print("True Labels (first sample):", batch_labels[0])

        # Compute Validation Metrics
        val_loss /= len(val_loader.dataset)
        val_outputs = np.concatenate(val_outputs)
        val_labels = np.concatenate(val_labels)
        val_preds = np.concatenate(val_preds)

        val_output_mean, val_output_std = np.mean(val_outputs), np.std(val_outputs)
        val_f1 = f1_score(val_labels, val_preds, average='micro', zero_division=0)
        val_precision = precision_score(val_labels, val_preds, average='micro', zero_division=0)
        val_recall = recall_score(val_labels, val_preds, average='micro', zero_division=0)
        val_accuracy = np.mean(np.all(val_preds == val_labels, axis=1))  # Strict accuracy

        # Store validation metrics
        val_losses.append(val_loss)
        val_f1_scores.append(val_f1)
        val_accuracies.append(val_accuracy)

        # Scheduler step
        scheduler.step()

        # Print Epoch Results
        print(f"\nEpoch {epoch + 1}/{num_epochs}")
        print(f"Train - Loss: {train_loss:.4f}, F1: {train_f1:.4f}, Accuracy: {train_accuracy:.4f}")
        print(f"Val - Loss: {val_loss:.4f}, F1: {val_f1:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, Accuracy: {val_accuracy:.4f}")
        print(f"Val Output Distribution - Mean: {val_output_mean:.4f}, Std: {val_output_std:.4f}")

        # Save Best Model Based on Validation Loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_path_loss)
            print(f"Best model (loss) saved with Val Loss: {best_val_loss:.4f}")

        # Save Best Model Based on Validation F1 Score
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            torch.save(model.state_dict(), model_path_f1)
            print(f"Best model (F1) saved with Val F1: {best_val_f1:.4f}")

    # Return all metrics for potential analysis
    return {
        'train_losses': train_losses, 'val_losses': val_losses,
        'train_f1_scores': train_f1_scores, 'val_f1_scores': val_f1_scores,
        'train_accuracies': train_accuracies, 'val_accuracies': val_accuracies
    }

# Setup and Run
view_names = ["top", "bottom", "lateral_1", "lateral_2", "lateral_3", "lateral_4"]
num_views = len(view_names)  

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_fine_tuned = MultiView(
    num_views=num_views,
    num_classes=4,
    model=vgg19,
    device=device
).to(device)

# Load pretrained weights
state_dict = torch.load("best_model_f1_VGG.pth")

# Keep only the weights for classifier[0] (first layer producing 512 channels)
# Remove weights for other classifier layers
keys_to_remove = [key for key in state_dict.keys() if key.startswith('classifier') and key not in ['classifier.0.weight', 'classifier.0.bias']]
for key in keys_to_remove:
    state_dict.pop(key)

# Load the modified state dictionary into the model
# Use strict=False since we're intentionally missing some classifier weights
model_fine_tuned.load_state_dict(state_dict, strict=False)

# Freeze the backbone (VGG19 features)
for param in model_fine_tuned.model.parameters():
    param.requires_grad = False

# Reinitialize the classifier, preserving classifier[0] weights
num_classes_finetune = 4
in_features = model_fine_tuned.num_views * model_fine_tuned.feature_dim  # Should be 6 * 512 = 3072

# Save the pre-trained weights of classifier[0]
classifier_0_weight = model_fine_tuned.classifier[0].weight.clone()
classifier_0_bias = model_fine_tuned.classifier[0].bias.clone()

# Create a new classifier with the same architecture
model_fine_tuned.classifier = nn.Sequential(
    nn.Linear(in_features, 512),  # classifier[0]: 3072 -> 512
    nn.ReLU(),                    # classifier[1]
    nn.Dropout(0.5),              # classifier[2]
    nn.Linear(512, 128),          # classifier[3]: 512 -> 128
    nn.ReLU(),                    # classifier[4]
    nn.Dropout(0.5),              # classifier[5]
    nn.Linear(128, num_classes_finetune),  # classifier[6]: 128 -> 4
    nn.Sigmoid()                  # classifier[7]
).to(device)

# Restore the pre-trained weights for classifier[0]
model_fine_tuned.classifier[0].weight.data = classifier_0_weight
model_fine_tuned.classifier[0].bias.data = classifier_0_bias

# Initialize the remaining linear layers with random weights
# classifier[3] (Linear: 512 -> 128)
nn.init.kaiming_normal_(model_fine_tuned.classifier[3].weight, mode='fan_out', nonlinearity='relu')
nn.init.zeros_(model_fine_tuned.classifier[3].bias)

# classifier[6] (Linear: 128 -> 4)
nn.init.kaiming_normal_(model_fine_tuned.classifier[6].weight, mode='fan_out', nonlinearity='relu')
nn.init.zeros_(model_fine_tuned.classifier[6].bias)

# Ensure the entire classifier is trainable
for param in model_fine_tuned.classifier.parameters():
    param.requires_grad = True

# Verify which parameters are trainable
print("Trainable parameters after setup:")
for name, param in model_fine_tuned.named_parameters():
    if param.requires_grad:
        print(name)

# Optimizer and training
criterion = nn.BCELoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model_fine_tuned.parameters()), lr=0.001)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=1e-4)

metrics = fine_tune_model(
    model=model_fine_tuned,
    train_loader=query_train_loader,
    val_loader=query_val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=50,
    device=device,
    threshold=0.5,
    model_path_loss="finetuned_model_loss_VGG_4_class_classifier_unfrozen_first_layer_loaded.pth",
    model_path_f1="finetuned_model_f1_VGG_4_class_classifier_unfrozen_first_layer_loaded.pth"
)

# Optionally, load the best fine-tuned models
print("\nLoading best fine-tuned model based on validation loss...")
model_fine_tuned.load_state_dict(torch.load("finetuned_model_loss_VGG_4_class_classifier_unfrozen.pth"))
model_fine_tuned.eval()

print("\nLoading best fine-tuned model based on F1 score...")
model_fine_tuned.load_state_dict(torch.load("finetuned_model_f1_VGG_4_class_classifier_unfrozen.pth"))
model_fine_tuned.eval()

# model loading

In [None]:
num_views = len(view_names)  
num_classes = 4  
num_classes_finetune = 4

# Initialize the model with the new naming convention (self.model)
# model = MultiView(
#     num_views=num_views,
#     num_classes=num_classes,
#     model=vgg19
# ).to(device)

# # Load the checkpoint and modify the state dictionary keys
# checkpoint_path = "finetuned_model_f1_VGG_4_class.pth"
# state_dict = torch.load(checkpoint_path)

# # Create a new state dictionary with updated keys
# new_state_dict = {}
# # for key, value in state_dict.items():
# #     # Replace 'mobilenet' with 'model' in the key names
# #     new_key = key.replace('mobilenet', 'model')
# #     new_state_dict[new_key] = value

# # Load the modified state dictionary into the model
# model.load_state_dict(state_dict)

# Set the model to evaluation mode
#model.eval()

model_4class = MultiView(
    num_views=num_views,
    num_classes=num_classes_finetune,
    model=vgg19
).to(device)

model_4class.load_state_dict(torch.load("finetuned_model_f1_VGG_4_class_classifier_unfrozen.pth"))

# evaluation

In [None]:
def evaluate_model(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    val_preds = []
    val_labels = []
    val_outputs = []

    with torch.no_grad():
        for batch_idx, (images, labels, _) in enumerate(tqdm(val_loader)):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)  
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)
            batch_outputs = outputs.detach().cpu().numpy()
            batch_labels = labels.detach().cpu().numpy()
            val_outputs.append(batch_outputs)
            val_labels.append(batch_labels)
            # Fixed threshold predictions
            preds = (outputs > 0.5).float()
            val_preds.append(preds.detach().cpu().numpy())

    # Compute validation metrics
    val_loss /= len(val_loader.dataset)
    val_outputs = np.concatenate(val_outputs)
    val_labels = np.concatenate(val_labels)
    val_preds = np.concatenate(val_preds)

    # Analyze validation output distribution
    val_output_mean, val_output_std = np.mean(val_outputs), np.std(val_outputs)
    print(f"Validation Output Distribution - Mean: {val_output_mean:.4f}, Std: {val_output_std:.4f}")

    # Compute per-class metrics using classification_report
    class_names = [str(i) for i in range(val_labels.shape[1])]  # Class names as 0 to 14
    report = classification_report(
        val_labels,
        val_preds,
        target_names=class_names,
        zero_division=0,
        output_dict=False  # Set to True if you want a dictionary instead
    )
    print("\nPer-class Classification Report:")
    print(report)

    # Compute overall metrics for adaptive thresholds
    val_f1 = f1_score(val_labels, val_preds, average='micro', zero_division=0)
    val_precision = precision_score(val_labels, val_preds, average='micro', zero_division=0)
    val_recall = recall_score(val_labels, val_preds, average='micro', zero_division=0)
    val_accuracy = np.mean(np.all(val_preds == val_labels, axis=1))

    # Print overall metrics
    print("\nEvaluation Results (Classifier)")
    print(f"Val Loss: {val_loss:.4f}, Val F1: {val_f1:.4f}, Val Precision: {val_precision:.4f}, "
          f"Val Recall: {val_recall:.4f}, Val Accuracy: {val_accuracy:.4f}")

    return val_loss, val_f1, val_accuracy



# Optionally, load the best models after training (choose one or both based on your needs)
# print("\nLoading best model based on validation loss...")
# model_vgg19_15.load_state_dict(torch.load("best_model_loss.pth"))
# model_vgg19_15.eval()

# print("\nLoading best model based on F1 score...")
# model_vgg19_15.load_state_dict(torch.load("best_model_f1_VGG.pth"))
# model_vgg19_15.eval()

# Evaluate the classifier
val_loss_classifier, val_f1_classifier, val_accuracy_classifier = evaluate_model(
    model=model_4class,
    val_loader=query_val_loader,
    criterion=criterion,
    device=device
)

# SVM (not used)

In [None]:
# Extract features for training and validation sets
train_features, train_labels = extract_features(train_loader, model_classifier, num_views, device)
val_features, val_labels = extract_features(val_loader, model_classifier, num_views, device)

# Standardize the features (important for SVM)
scaler = StandardScaler()
train_features = scaler.fit_transform(train_features)
val_features = scaler.transform(val_features)

# Train an SVM with RBF kernel for each class (one-vs-rest for multi-label)
svms = []
for class_idx in range(num_classes):
    print(f"Training SVM for class {class_idx + 1}/{num_classes}...")
    svm = SVC(kernel='rbf', probability=True, C=1.0, gamma='scale')
    svm.fit(train_features, train_labels[:, class_idx])
    svms.append(svm)

# Instantiate a new MultiViewModel for SVM-based classification
model_svm = MultiView(
    num_views=num_views,
    num_classes=num_classes,
    model=mobilenet_v3,
    svms=svms,
    scaler=scaler,
    device=device
).to(device)

In [None]:
val_loss_svm, val_f1_svm, val_accuracy_svm = evaluate_model(
    model=model_svm,
    val_loader=val_loader,
    criterion=criterion,
    device=device,
    svm = True,
    k=0.5
)

# CNN model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import f1_score
import numpy as np

# CNN Feature Extractor (unchanged)
class CNNFeatureExtractor(nn.Module):
    def __init__(self):
        super(CNNFeatureExtractor, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )
        self.fc = nn.Linear(256 * 32 * 32, 512)  # For 512x512 input

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# Multi-View CNN Classifier (unchanged)
class MultiViewCNN(nn.Module):
    def __init__(self, num_classes=15):
        super(MultiViewCNN, self).__init__()
        self.feature_extractor = CNNFeatureExtractor()
        self.final_fc = nn.Sequential(
            nn.Linear(512 * 6, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, num_classes)
        )

    def forward(self, x):
        batch_size, num_views, channels, height, width = x.shape
        x = x.view(batch_size * num_views, channels, height, width)
        features = self.feature_extractor(x)
        features = features.view(batch_size, num_views, -1)
        features = features.view(batch_size, -1)
        out = self.final_fc(features)
        return out

# Updated Training Function
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20, model_path="best_model.pth", threshold=0.5):
    best_val_loss = float("inf")

    for epoch in range(num_epochs):
        # Training Phase
        model.train()
        total_train_loss = 0.0
        for images, labels, _ in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Train"):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)  # Logits
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        # Validation Phase
        model.eval()
        total_val_loss = 0.0
        all_val_labels = []
        all_val_preds = []
        all_val_probs = []
        batch_count = 0

        with torch.no_grad():
            for images, labels, _ in tqdm(val_loader, desc=f"Epoch {epoch + 1}/{num_epochs} - Val"):
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)  # Logits
                loss = criterion(outputs, labels)
                total_val_loss += loss.item()

                # Convert logits to probabilities
                probs = torch.sigmoid(outputs)
                # Threshold predictions
                preds = (probs > threshold).float()

                # Store for metrics
                all_val_labels.append(labels.cpu().numpy())
                all_val_preds.append(preds.cpu().numpy())
                all_val_probs.append(probs.cpu().numpy())

                # Visualize first three batches
                if batch_count < 3:
                    print(f"\nValidation Batch {batch_count + 1}:")
                    print("True Labels:", labels.cpu().numpy()[:2])  # Show first 2 samples
                    print("Raw Probabilities:", probs.cpu().numpy()[:2])
                    print("Thresholded Predictions (>{threshold}):", preds.cpu().numpy()[:2])
                batch_count += 1

        # Compute Metrics
        avg_train_loss = total_train_loss / len(train_loader)
        avg_val_loss = total_val_loss / len(val_loader)

        # Concatenate all predictions and labels
        all_val_labels = np.concatenate(all_val_labels, axis=0)  # (num_samples, num_classes)
        all_val_preds = np.concatenate(all_val_preds, axis=0)    # (num_samples, num_classes)
        all_val_probs = np.concatenate(all_val_probs, axis=0)    # (num_samples, num_classes)

        # Validation Accuracy (strict: all labels must match)
        correct_samples = np.all(all_val_preds == all_val_labels, axis=1)  # True if all labels match
        val_accuracy = np.mean(correct_samples) * 100  # Percentage

        # F1 Score (micro: element-wise across all labels)
        val_f1 = f1_score(all_val_labels, all_val_preds, average='micro', zero_division=0)

        # Print Results
        print(f"Epoch [{epoch+1}/{num_epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
        print(f"Val Accuracy (strict): {val_accuracy:.2f}%, Val F1 (micro): {val_f1:.4f}")

        # Save Best Model (based on loss, not accuracy)
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), model_path)
            print("Best model saved!")

        scheduler.step()

# Load Model Function (unchanged)
def load_best_model(model, model_path="best_model.pth"):
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

# Hyperparameters
num_classes = 15
num_epochs = 20
learning_rate = 0.001
threshold = 0.5  # For thresholding predictions

# Model Setup
model = MultiViewCNN(num_classes).to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# Train and Save Best Model
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, model_path="best_model.pth", threshold=threshold)

# Load the Best Model
model = load_best_model(model, "best_model.pth")

# Inference Example
test_images = torch.randn(1, 6, 3, 512, 512).to(device)  # Fixed channels from 1 to 3
with torch.no_grad():
    logits = model(test_images)
    probs = torch.sigmoid(logits)
    print("Predicted class probabilities:", probs)

# Validation in membership matrix (main approach)

# sec 1
generating the text files of the features, and model predictions

In [None]:
import os
import pandas as pd
import numpy as np
import torch
from sklearn.metrics.pairwise import cosine_distances

# Paths
query_csv_path = "query_labels_new_2.csv"
obj_dir = "E:/Ahmed/IMT Nord Europe/3D/Retrieval_Screenshots_2"
num_classes = 15  # Number of trained classes
num_classes_finetune = 4  # Number of fine-tuned classes (15-18, re-indexed as 0-3)

# Step 1: Get number of validation objects (M) and sort them ascendingly
objects = [f for f in os.listdir(obj_dir) if os.path.isdir(os.path.join(obj_dir, f))]
objects = sorted(objects, key=lambda x: int(x))  # Sort by integer value
M = len(objects)
object_indices = {obj: i for i, obj in enumerate(objects)}
print(f"Number of objects (M): {M}")
print(f"Sorted objects: {objects[:10]}...")

# Step 2: Read query CSV and preserve exact pattern sequence
query_df = pd.read_csv(query_csv_path)
query_patterns = []  # List of (file_index, pattern) tuples

for _, row in query_df.iterrows():
    file_index = str(int(row['file_index']))
    if pd.notna(row['pattern_1']):
        pattern = int(row['pattern_1'])
        query_patterns.append((file_index, pattern))
    if pd.notna(row['pattern_2']):
        pattern = int(row['pattern_2'])
        query_patterns.append((file_index, pattern))

N = len(query_patterns)
print(f"Number of pattern instances (N): {N}")
print(f"Query patterns (first 10): {query_patterns[:10]}...")
if N != 94:
    print(f"Warning: Expected N = 94, but got N = {N}. Check query_labels_new_2.csv.")

# Step 3: Extract features and predictions using the original 15-class model
retrieval_features = extract_features(retrieval_loader, model, device)  # (M, 512)
query_features = extract_features(query_loader, model, device)  # (N, 512)

# # Precompute predictions for the original 15 classes
# predictions = np.zeros((M, num_classes))
# with torch.no_grad():
#     for images, _, folder_names in retrieval_loader:
#         images = images.to(device)
#         outputs = model(images)  # (batch_size, num_classes)
#         batch_outputs = outputs.cpu().numpy()
#         for i, folder_name in enumerate(folder_names):
#             col_idx = object_indices[folder_name]
#             predictions[col_idx] = batch_outputs[i]

# # Step 4: Save predictions to text file (300 lines, 15 columns)
# with open("predictionsVGG.txt", "w") as f:
#     for i in range(M):
#         pred_line = " ".join([f"{x:.3f}" for x in predictions[i]])
#         f.write(pred_line + "\n")
# print("Predictions saved to predictionsVGG.txt")

# Step 5: Precompute predictions for the fine-tuned 4-class model
# Assume model_4class is the fine-tuned model loaded elsewhere
predictions_4class = np.zeros((M, num_classes_finetune))
with torch.no_grad():
    for images, _, folder_names in retrieval_loader:
        images = images.to(device)
        outputs = model_4class(images)  # (batch_size, 4)
        batch_outputs = outputs.cpu().numpy()
        for i, folder_name in enumerate(folder_names):
            col_idx = object_indices[folder_name]
            predictions_4class[col_idx] = batch_outputs[i]

# Save 4-class predictions to text file (300 lines, 4 columns)
with open("predictions_4class.txt", "w") as f:
    for i in range(M):
        pred_line = " ".join([f"{x:.3f}" for x in predictions_4class[i]])
        f.write(pred_line + "\n")
print("4-class predictions saved to predictions_4class.txt")

# # Step 6: Save retrieval features to text file (300 lines, 512 columns)
# with open("retrieval_features.txt", "w") as f:
#     for i in range(M):
#         feat_line = " ".join([f"{x:.6f}" for x in retrieval_features[i]])
#         f.write(feat_line + "\n")
# print("Retrieval features saved to retrieval_features.txt")

# # Step 7: Save query features to text file (94 lines, 512 columns)
# query_folder_to_idx = {os.path.basename(folder_path): i for i, (folder_path, _) in enumerate(query_loader.dataset.samples)}
# with open("query_featuresVGG.txt", "w") as f:
#     for file_index, pattern in query_patterns:
#         patterns_for_file = [p for fi, p in query_patterns if fi == file_index]
#         query_folder = (f"{file_index}_pattern_{pattern}" if len(patterns_for_file) == 1 
#                         else f"{file_index}_pattern_{'_'.join(map(str, patterns_for_file))}")
#         if query_folder not in query_folder_to_idx:
#             print(f"Warning: Query folder {query_folder} not found. Skipping...")
#             continue
#         query_idx = query_folder_to_idx[query_folder]
#         feat_line = " ".join([f"{x:.6f}" for x in query_features[query_idx]])
#         f.write(feat_line + "\n")
# print("Query features saved to query_featuresVGG.txt")

# sec 2
generating the matrix using feature similarity

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
import matplotlib.pyplot as plt
import seaborn as sns

# Paths
query_csv_path = "query_labels_new_2.csv"
obj_dir = "E:/Ahmed/IMT Nord Europe/3D/Retrieval_Screenshots_2"
num_classes = 15

# Step 1: Get number of validation objects (M) and sort them ascendingly
objects = [f for f in os.listdir(obj_dir) if os.path.isdir(os.path.join(obj_dir, f))]
objects = sorted(objects, key=lambda x: int(x))
M = len(objects)
object_indices = {obj: i for i, obj in enumerate(objects)}
print(f"Number of objects (M): {M}")

# Step 2: Read query CSV and preserve exact pattern sequence
query_df = pd.read_csv(query_csv_path)
query_patterns = []
pattern_labels = []

for _, row in query_df.iterrows():
    file_index = str(int(row['file_index']))
    if pd.notna(row['pattern_1']):
        pattern = int(row['pattern_1'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")
    if pd.notna(row['pattern_2']):
        pattern = int(row['pattern_2'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")

N = len(query_patterns)
print(f"Number of pattern instances (N): {N}")
if N != 94:
    print(f"Warning: Expected N = 94, but got N = {N}.")

# Step 3: Load precomputed data from files
# Load predictions (M x num_classes)
predictions = np.loadtxt("predictionsVGG.txt", dtype=float)  # (300, 15)
assert predictions.shape == (M, num_classes), f"Expected predictions shape ({M}, {num_classes}), got {predictions.shape}"

# Load retrieval features (M x 512)
retrieval_features = np.loadtxt("retrieval_features.txt", dtype=float)  # (300, 512)
assert retrieval_features.shape[1] == 512, f"Expected 512 features, got {retrieval_features.shape[1]}"

# Load query features (N x 512)
query_features = np.loadtxt("query_featuresVGG.txt", dtype=float)  # (94, 512)
assert query_features.shape == (N, 512), f"Expected query features shape ({N}, 512), got {query_features.shape}"

# Reorder query_features to match the order of query_patterns
# Create the same mapping as in the first script
query_folder_to_idx = {os.path.basename(folder_path): i for i, (folder_path, _) in enumerate(query_loader.dataset.samples)}
reordered_query_features = np.zeros_like(query_features)  # (N, 512)

for row_idx, (file_index, pattern) in enumerate(query_patterns):
    patterns_for_file = [p for fi, p in query_patterns if fi == file_index]
    query_folder = (f"{file_index}_pattern_{pattern}" if len(patterns_for_file) == 1 
                    else f"{file_index}_pattern_{'_'.join(map(str, patterns_for_file))}")
    if query_folder not in query_folder_to_idx:
        print(f"Warning: Query folder {query_folder} not found in query_loader. Using zeros for this row.")
        continue
    query_idx = query_folder_to_idx[query_folder]
    reordered_query_features[row_idx] = query_features[query_idx]

# Precompute the maximum Euclidean distance for normalization
all_distances = euclidean_distances(reordered_query_features, retrieval_features)  # (N, M)
max_distance = np.max(all_distances)  # Maximum Euclidean distance for normalization
if max_distance == 0:
    print("Warning: Maximum Euclidean distance is 0. Setting similarities to 0.")
    max_distance = 1e-10  # Avoid division by zero

# Step 4: Initialize membership matrix
membership_matrix = np.zeros((N, M))

# Step 5: Process each query pattern in sequence
for row_idx, (file_index, pattern) in enumerate(query_patterns):
    print(f"Processing pattern {pattern} (row {row_idx}) for file_index {file_index}...")

    if pattern <= num_classes:
        # Use precomputed predictions directly (no thresholding)
        for col_idx in range(M):
            membership_matrix[row_idx, col_idx] = predictions[col_idx, pattern - 1]
    else:
        # Feature-based similarity using normalized Euclidean distance
        query_feature = reordered_query_features[row_idx:row_idx+1]  # (1, 512)
        distances = euclidean_distances(query_feature, retrieval_features)[0]  # (M,)
        # Normalize distances and compute similarities
        similarities = 1 - (distances / max_distance)
        similarities = np.clip(similarities, 0, 1)  # Ensure similarities are in [0, 1]
        membership_matrix[row_idx] = similarities

# Step 6: Save and generate files
np.save("membership_matrix.npy", membership_matrix)
print("Membership matrix saved to membership_matrix.npy")
print(f"Matrix shape: {membership_matrix.shape}")

# Generate membership_matrix.txt
output_file = "membership_matrix_VGG_Features.txt"
with open(output_file, 'w') as f:
    f.write("Membership Matrix (Pattern vs. Object Confidence Scores):\n")
    header = "Query   | " + " | ".join([f"{obj[:5]:<5}" for obj in objects])
    f.write(header + "\n")
    f.write("-" * len(header) + "\n")
    for i, (file_index, pattern) in enumerate(query_patterns):
        row_values = membership_matrix[i]
        row_str = f"{file_index}_{pattern:<5} | " + " | ".join([f"{score:.3f}" for score in row_values])
        f.write(row_str + "\n")
print(f"Membership matrix saved to {output_file}")

# Generate competition submission file (NameParticipant_run1.txt)
submission_file = "NameParticipant_run1_VGG_Features.txt"
with open(submission_file, 'w') as f:
    for i in range(N):
        row_values = membership_matrix[i]
        f.write(" ".join([f"{score:.3f}" for score in row_values]) + "\n")
print(f"Competition submission file saved to {submission_file}")


# sec 3
generating the matrix using the fine tuned model

In [None]:
# This part is for the second approach of producing the matrice from the two classes.

import os
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_distances
import matplotlib.pyplot as plt
import seaborn as sns

# Paths
query_csv_path = "query_labels_new_2.csv"
obj_dir = "D:/3D/Retrieval_Screenshots_2"
num_classes = 15  # Original classes
num_classes_finetune = 4  # Fine-tuned classes (patterns 16-19, re-indexed as 0-3)

# Step 1: Get number of validation objects (M) and sort them ascendingly
objects = [f for f in os.listdir(obj_dir) if os.path.isdir(os.path.join(obj_dir, f))]
objects = sorted(objects, key=lambda x: int(x))
M = len(objects)
object_indices = {obj: i for i, obj in enumerate(objects)}
print(f"Number of objects (M): {M}")

# Step 2: Read query CSV and preserve exact pattern sequence
query_df = pd.read_csv(query_csv_path)
query_patterns = []
pattern_labels = []

for _, row in query_df.iterrows():
    file_index = str(int(row['file_index']))
    if pd.notna(row['pattern_1']):
        pattern = int(row['pattern_1'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")
    if pd.notna(row['pattern_2']):
        pattern = int(row['pattern_2'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")

N = len(query_patterns)
print(f"Number of pattern instances (N): {N}")
if N != 94:
    print(f"Warning: Expected N = 94, but got N = {N}.")

# Step 3: Load precomputed data from files
# Load predictions for original 15 classes (M x num_classes)
predictions = np.loadtxt("predictions.txt", dtype=float)  # (300, 15)
assert predictions.shape == (M, num_classes), f"Expected predictions shape ({M}, {num_classes}), got {predictions.shape}"

# Load predictions for fine-tuned 4 classes (M x 4)
predictions_4class = np.loadtxt("predictions_4class.txt", dtype=float)  # (300, 4)
assert predictions_4class.shape == (M, num_classes_finetune), f"Expected 4-class predictions shape ({M}, {num_classes_finetune}), got {predictions_4class.shape}"

# Load retrieval features (M x 512)
retrieval_features = np.loadtxt("retrieval_features.txt", dtype=float)  # (300, 512)
assert retrieval_features.shape[1] == 512, f"Expected 512 features, got {retrieval_features.shape[1]}"

# Load query features (N x 512)
query_features = np.loadtxt("query_features.txt", dtype=float)  # (94, 512)
assert query_features.shape == (N, 512), f"Expected query features shape ({N}, 512), got {query_features.shape}"

# Step 4: Initialize membership matrix
membership_matrix = np.zeros((N, M))

# Step 5: Process each query pattern in sequence
for row_idx, (file_index, pattern) in enumerate(query_patterns):
    print(f"Processing pattern {pattern} (row {row_idx}) for file_index {file_index}...")

    if pattern <= num_classes:  # Patterns 1-15 (0-14 in zero-based indexing)
        # Use precomputed predictions from the original 15-class model
        for col_idx in range(M):
            membership_matrix[row_idx, col_idx] = predictions[col_idx, pattern - 1]
    else:  # Patterns 16-19 (fine-tuned classes 0-3)
        # Use predictions from the fine-tuned 4-class model
        class_idx = pattern - 16  # Map pattern 16→0, 17→1, 18→2, 19→3
        for col_idx in range(M):
            membership_matrix[row_idx, col_idx] = predictions_4class[col_idx, class_idx]

# Step 6: Save and generate files
np.save("membership_matrix.npy", membership_matrix)
print("Membership matrix saved to membership_matrix.npy")
print(f"Matrix shape: {membership_matrix.shape}")

# Generate membership_matrix.txt with compact header
output_file = "membership_matrix.txt"
with open(output_file, 'w') as f:
    f.write("Membership Matrix (Pattern vs. Object Confidence Scores):\n")
    header = "Query   | " + " | ".join([f"{obj[:5]:<5}" for obj in objects])
    f.write(header + "\n")
    f.write("-" * len(header) + "\n")
    for i, (file_index, pattern) in enumerate(query_patterns):
        row_values = membership_matrix[i]
        row_str = f"{file_index}_{pattern:<5} | " + " | ".join([f"{score:.3f}" for score in row_values])
        f.write(row_str + "\n")
print(f"Membership matrix saved to {output_file}")

# Generate competition submission file (NameParticipant_run1.txt)
submission_file = "NameParticipant_run1.txt"
with open(submission_file, 'w') as f:
    for i in range(N):
        row_values = membership_matrix[i]
        f.write(" ".join([f"{score:.3f}" for score in row_values]) + "\n")
print(f"Competition submission file saved to {submission_file}")


# Validation in membership matrix (not used anymore)

In [None]:
import os
import pandas as pd
import numpy as np
import torch
from sklearn.metrics.pairwise import cosine_distances
import matplotlib.pyplot as plt
import seaborn as sns

# Paths
query_csv_path = "query_labels_new_2.csv"
obj_dir = "D:/3D/Retrieval_Screenshots"
num_classes = 15  # From your setup

# Step 1: Get number of validation objects (M) and sort them ascendingly
objects = [f for f in os.listdir(obj_dir) if os.path.isdir(os.path.join(obj_dir, f))]
# Sort objects by integer value of their names
objects = sorted(objects, key=lambda x: int(x))
M = len(objects)
object_indices = {obj: i for i, obj in enumerate(objects)}
print(f"Number of objects (M): {M}")
print(f"Sorted objects: {objects[:10]}...")  # Debug: Show first 10 objects

# Step 2: Read query CSV and preserve exact pattern sequence
query_df = pd.read_csv(query_csv_path)
query_patterns = []  # List of (file_index, pattern) tuples in order of appearance
pattern_labels = []  # For labeling rows in visualization

for _, row in query_df.iterrows():
    file_index = str(int(row['file_index']))
    if pd.notna(row['pattern_1']):
        pattern = int(row['pattern_1'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")
    if pd.notna(row['pattern_2']):
        pattern = int(row['pattern_2'])
        query_patterns.append((file_index, pattern))
        pattern_labels.append(f"{file_index}_{pattern}")

N = len(query_patterns)
print(f"Number of pattern instances (N): {N}")
print(f"Query patterns (first 10): {query_patterns[:10]}...")  # Debug: Show first 10

# Verify N matches expected value
if N != 94:
    print(f"Warning: Expected N = 94, but got N = {N}. Check query_labels_new_2.csv for consistency.")

# Step 3: Initialize membership matrix
membership_matrix = np.zeros((N, M))

# Step 4: Extract features and predictions once
retrieval_features = extract_features(retrieval_loader, model, device)  # (M, 512)
query_features = extract_features(query_loader, model, device)  # (num_queries, 512)

# Precompute predictions for trained classes
predictions = np.zeros((M, num_classes))
with torch.no_grad():
    offset = 0
    for images, _, folder_names in retrieval_loader:
        images = images.to(device)
        outputs = model(images)  # (batch_size, num_classes)
        batch_size = outputs.size(0)
        batch_outputs = outputs.cpu().numpy()
        batch_preds = (batch_outputs > 0.5).astype(float)
        for i, folder_name in enumerate(folder_names):
            col_idx = object_indices[folder_name]
            predictions[col_idx] = batch_outputs[i]
            if batch_preds[i].any():
                membership_matrix[:, col_idx] = 0  # Reset for later update
        offset += batch_size

# Step 5: Process each query pattern in sequence
query_folder_to_idx = {os.path.basename(folder_path): i for i, (folder_path, _) in enumerate(query_loader.dataset.samples)}

for row_idx, (file_index, pattern) in enumerate(query_patterns):
    print(f"Processing pattern {pattern} (row {row_idx}) for file_index {file_index}...")

    if pattern <= num_classes:
        # Use precomputed predictions
        for col_idx in range(M):
            membership_matrix[row_idx, col_idx] = predictions[col_idx, pattern - 1]

    else:
        # Feature-based similarity
        # Construct folder name based on all patterns for this file_index
        patterns_for_file = [p for fi, p in query_patterns if fi == file_index]
        query_folder = (f"{file_index}_pattern_{pattern}" if len(patterns_for_file) == 1 
                        else f"{file_index}_pattern_{'_'.join(map(str, patterns_for_file))}")
        if query_folder not in query_folder_to_idx:
            print(f"Warning: Query folder {query_folder} not found in query_loader. Skipping...")
            continue
        query_idx = query_folder_to_idx[query_folder]
        query_feature = query_features[query_idx:query_idx+1]  # (1, 512)
        similarities = 1 - (distances / max_distance)
        similarities = np.clip(similarities, 0, 1)  # Ensure similarities are in [0, 1]
        membership_matrix[row_idx] = similarities

# Step 6: Save and generate files
np.save("membership_matrix.npy", membership_matrix)
print("Membership matrix saved to membership_matrix.npy")
print(f"Matrix shape: {membership_matrix.shape}")

# Generate membership_matrix.txt with compact header
output_file = "membership_matrix.txt"
with open(output_file, 'w') as f:
    f.write("Membership Matrix (Pattern vs. Object Confidence Scores):\n")
    header = "Query   | " + " | ".join([f"{obj[:5]:<5}" for obj in objects])  # Compact header
    f.write(header + "\n")
    f.write("-" * len(header) + "\n")
    for i, (file_index, pattern) in enumerate(query_patterns):
        row_values = membership_matrix[i]
        row_str = f"{file_index}_{pattern:<5} | " + " | ".join([f"{score:.3f}" for score in row_values])
        f.write(row_str + "\n")
print(f"Membership matrix saved to {output_file}")

# Generate competition submission file (NameParticipant_run1.txt)
submission_file = "NameParticipant_run1.txt"
with open(submission_file, 'w') as f:
    for i in range(N):
        row_values = membership_matrix[i]
        f.write(" ".join([f"{score:.3f}" for score in row_values]) + "\n")
print(f"Competition submission file saved to {submission_file}")