# 1. EDA

# 2. Define utils of dataset

In [179]:
def pair_comparison(a, b):
    """
    This function compares two values a and b.
    If they are equal, it returns 1.
    If they are not equal, it returns 0.
    """
    if a == b:
        # If a is equal to b, return 1
        return 1
    else:
        # If a is not equal to b, return 0
        return 0


In [180]:
import random
def gram_matrix(list_of_score):
    """
    This function computes the Gram matrix for a list of scores.
    
    The Gram matrix is a matrix of pairwise comparisons of scores.
    Each element [i][j] in the matrix represents the result of
    comparing list_of_score[i] with list_of_score[j] using the
    pair_comparison function.
    
    Args:
    - list_of_score: A list of scores
    
    Returns:
    - gram_matrix: The Gram matrix computed from the pairwise comparisons
    """
    # Get the length of the list of scores
    n = len(list_of_score)
    
    # Initialize the Gram matrix with zeros
    gram_matrix = [[0 for _ in range(n)] for _ in range(n)]

    # Iterate through each pair of scores
    for i in range(n):
        for j in range(n):
            # Compute the pairwise comparison using the pair_comparison function
            gram_matrix[i][j] = pair_comparison(list_of_score[i], list_of_score[j])
    
    # Return the computed Gram matrix
    return gram_matrix


In [181]:
# Test
list_of_score = [1,2,3,2,3,4,2,1,4]
gram_matrix(list_of_score)

[[1, 0, 0, 0, 0, 0, 0, 1, 0],
 [0, 1, 0, 1, 0, 0, 1, 0, 0],
 [0, 0, 1, 0, 1, 0, 0, 0, 0],
 [0, 1, 0, 1, 0, 0, 1, 0, 0],
 [0, 0, 1, 0, 1, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 1, 0, 0, 1],
 [0, 1, 0, 1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 0, 0, 1, 0],
 [0, 0, 0, 0, 0, 1, 0, 0, 1]]

In [182]:
def split_data_randomly(data, labels, k):
    """
    Split the data and labels randomly into groups of size k.

    Args:
    - data: List of data elements
    - labels: List of corresponding labels
    - k: Size of each group

    Returns:
    - image_groups: List of groups containing data elements
    - label_groups: List of groups containing corresponding labels
    """
    # Shuffle data and labels in sync
    combined_data = list(zip(data, labels))
    random.shuffle(combined_data)
    # Split the shuffled data into groups of size k
    num_groups = len(combined_data) // k
    image_groups = [data[i * k : (i + 1) * k] for i in range(num_groups)]
    label_groups = [labels[i * k : (i + 1) * k] for i in range(num_groups)]
    return image_groups, label_groups


In [183]:
from torch.utils.data import Dataset
import pandas as pd
import cv2
import os
import torch

class SeveritySimilarityDataset(Dataset):
    def __init__(self, annotation_df: pd.DataFrame, dataset_dir: str, phase: str = "training", num_per_cluster: int = 5) -> None:
        """
        Dataset class for severity similarity task.

        Args:
        - annotation_df: DataFrame containing annotations
        - dataset_dir: Directory containing image data
        - phase: Phase of the dataset (e.g., "training", "validation", "testing")
        - num_per_cluster: Number of images per cluster
        """
        super(SeveritySimilarityDataset, self).__init__()
        self.dataset_dir = dataset_dir

        # Filter data based on the specified phase
        data = annotation_df[annotation_df["split"] == phase]
        # Concatenate study_id and image_id to get image paths
        image_paths_df = data["study_id"] + "/" + data["image_id"]
        image_paths = image_paths_df.tolist()
    
        # Get labels
        labels_df = data["breast_birads"]
        labels = labels_df.to_list()

        # Split data into clusters
        self.image_cluster_list, self.label_cluster_list = split_data_randomly(image_paths, labels, num_per_cluster)

    def __len__(self):
        """
        Returns the number of clusters in the dataset.
        """
        return len(self.label_cluster_list)
    
    def __getitem__(self, index):
        """
        Retrieves a cluster of images and its corresponding label cluster.

        Args:
        - index: Index of the cluster to retrieve

        Returns:
        - images: List of images in the cluster
        - gram_matrix: Gram matrix computed from the label cluster
        """
        image_cluster = self.image_cluster_list[index]
        label_cluster = self.label_cluster_list[index]
        images = []
        for image_path in image_cluster:
            abs_image_path = os.path.join(self.dataset_dir, image_path)
            # Read and preprocess image
            image = cv2.imread(abs_image_path)
            image = cv2.resize(image, (224, 224))
            image_tensor = torch.from_numpy(image)
            image_tensor_transposed = image_tensor.permute(2, 0, 1).float()  # Transpose image tensor
            images.append(image_tensor_transposed)
        # # Compute Gram matrix
        gram_matrix_ = gram_matrix(label_cluster)

        return  torch.tensor(gram_matrix_)


In [184]:
# test
df = pd.read_csv("breast-level_annotations.csv")
data = SeveritySimilarityDataset(df, ".", "training", 6)
print(len(data))
print(data[25])

2666
['72f7758250782ee835586f7122acd9fd/a3b1d7b76c8777c7a668e9c346cdc9d1', '72f7758250782ee835586f7122acd9fd/0d5507137e993dff61c8fdff2136c14b', '34317eec7ecb8de8f9c71bb55abb5ce4/4285bb1fd50ed9f19bd507e66738189c', '34317eec7ecb8de8f9c71bb55abb5ce4/5e35ead87bb708f1fea95033ddb2e093', '34317eec7ecb8de8f9c71bb55abb5ce4/c524513301e2f530506fa32ab8642849', '34317eec7ecb8de8f9c71bb55abb5ce4/475030258d30d870b8a38aac40d4763d']
['BI-RADS 1', 'BI-RADS 1', 'BI-RADS 3', 'BI-RADS 3', 'BI-RADS 1', 'BI-RADS 1']
tensor([[1, 1, 0, 0, 1, 1],
        [1, 1, 0, 0, 1, 1],
        [0, 0, 1, 1, 0, 0],
        [0, 0, 1, 1, 0, 0],
        [1, 1, 0, 0, 1, 1],
        [1, 1, 0, 0, 1, 1]])


# 2. Model

In [185]:
from torch import nn
import torch
import torchvision.models as models
import timm

class Setting_2_model(nn.Module):
    def __init__(self, model_name: str, embed_dim: int):
        """
        A custom model for Setting 2, which uses different pre-trained models
        based on the specified `model_name`.

        Args:
        - model_name: Name of the pre-trained model to be used
        - embed_dim: Dimension of the output embeddings
        """
        super(Setting_2_model, self).__init__()

        # Load the specified pre-trained model
        if model_name.startswith('resnet'):
            if model_name == 'resnet50':
                self.model = models.resnet50(pretrained=True)
            elif model_name == 'resnet101':
                self.model = models.resnet101(pretrained=True)
            elif model_name == 'resnet152':
                self.model = models.resnet152(pretrained=True)
            else:
                raise ValueError(f"Unsupported ResNet model: {model_name}")
                
            num_features = self.model.fc.in_features
            self.model.fc = nn.Linear(num_features, embed_dim)
        
        elif model_name.startswith('densenet'):
            if model_name == 'densenet121':
                self.model = models.densenet121(pretrained=True)
            else:
                raise ValueError(f"Unsupported DenseNet model: {model_name}")
                
            num_features = self.model.classifier.in_features
            self.model.classifier = nn.Linear(num_features, embed_dim)
        
        elif model_name.startswith('vit'):
            self.model = timm.create_model(model_name, pretrained=True)

            num_features = self.model.head.in_features
            self.model.head = nn.Linear(num_features, embed_dim)
        
        else:
            raise ValueError(f"Unsupported model: {model_name}")
    
    def forward(self, images):
        """
        Forward pass of the model.

        Args:
        - images: A list of input images

        Returns:
        - gram_matrix: The Gram matrix computed from the embeddings
        """
        embeddings = []
        # Iterate over the list of input images
        for image in images:
            # Pass the image through the pre-trained model
            image_embedding = self.model(image)
            # Append the embedding to the list
            embeddings.append(image_embedding)
        # Concatenate the embeddings along the batch dimension
        embeddings_tensor = torch.cat(embeddings, dim=0)
        
        # Normalize the embeddings
        embeddings_normalized = torch.nn.functional.normalize(embeddings_tensor, p=2, dim=1)

        # Compute the Gram matrix
        gram_matrix = torch.matmul(embeddings_normalized, embeddings_normalized.transpose(0, 1))
        return gram_matrix


# 3. Loss

In [186]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        """
        Contrastive Loss function for computing the loss between predicted
        and ground truth Gram matrices.

        Args:
        - margin: Margin value for the loss calculation
        """
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, gram_matrix_predicted, gram_matrix_ground_truth):
        """
        Forward pass of the Contrastive Loss function.

        Args:
        - gram_matrix_predicted: Predicted Gram matrix
        - gram_matrix_ground_truth: Ground truth Gram matrix

        Returns:
        - loss: Contrastive Learning Loss
        """
        # Compute cosine similarity (distance)
        cosine_similarity = F.cosine_similarity(gram_matrix_predicted, gram_matrix_ground_truth, dim=-1)
        
        # Compute Contrastive Learning Loss
        loss = torch.mean(torch.clamp(self.margin - cosine_similarity, min=0.0))
        return loss


In [187]:
# test
import torch

gt = [1,2,3,2,3,4,2,1,4]
gt = torch.tensor(gram_matrix(gt),dtype=torch.float)

pred = [1,2,3,2,3,4,2,2,4]
pred = torch.tensor(gram_matrix(pred), dtype=torch.float)

criterion = ContrastiveLoss()
loss = criterion(pred, gt)

print(loss)

tensor(0.1490)


# 4. Training 

In [188]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.optim as optim

def train_model(model, train_dataset, val_dataset, num_epochs=10, batch_size=32, learning_rate=0.001):
    """
    Train the model using the provided datasets.

    Args:
    - model: The model to be trained
    - train_dataset: Dataset for training
    - val_dataset: Dataset for validation
    - num_epochs: Number of epochs for training
    - batch_size: Batch size for training
    - learning_rate: Learning rate for optimization

    Returns:
    - model: Trained model
    - train_losses: List of training losses
    - val_losses: List of validation losses
    """
    # Define data loaders for training and validation
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Lists to store training and validation losses
    train_losses = []
    val_losses = []

    print("Training started...")
    for epoch in range(num_epochs):
        model.train()
        running_train_loss = 0.0
        for i, (images, labels) in enumerate(train_loader, 1):
            optimizer.zero_grad()
            # Forward pass
            outputs = model(images)
            # Compute loss
            loss = criterion(outputs, labels)
            # Backward pass
            loss.backward()
            optimizer.step()
            running_train_loss += loss.item() * images.size(0)

            if i % 10 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{i}/{len(train_loader)}], Train Loss: {loss.item():.4f}")
        
        # Compute average training loss for the epoch
        epoch_train_loss = running_train_loss / len(train_dataset)
        train_losses.append(epoch_train_loss)

        # Validation loop
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for i, (images, labels) in enumerate(val_loader, 1):
                outputs = model(images)
                loss = criterion(outputs, labels)
                running_val_loss += loss.item() * images.size(0)

                if i % 10 == 0:
                    print(f"Epoch [{epoch+1}/{num_epochs}], Validation Batch [{i}/{len(val_loader)}], Val Loss: {loss.item():.4f}")
        
        # Compute average validation loss for the epoch
        epoch_val_loss = running_val_loss / len(val_dataset)
        val_losses.append(epoch_val_loss)

        # Print progress
        print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}")

    print("Training completed.")

    return model, train_losses, val_losses


def test_model(model, test_dataset, batch_size=32):
    """
    Evaluate the model on the test dataset.

    Args:
    - model: The trained model to be evaluated
    - test_dataset: Dataset for testing
    - batch_size: Batch size for testing

    Returns:
    - test_loss: Test loss
    """
    # Define data loader for testing
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Define loss function
    criterion = nn.MSELoss()

    # Set model to evaluation mode
    model.eval()

    # Initialize variables for computing test loss
    running_test_loss = 0.0
    num_samples = 0

    print("Testing started...")
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_test_loss += loss.item() * images.size(0)
            num_samples += images.size(0)

    # Compute test loss
    test_loss = running_test_loss / num_samples

    print(f"Test Loss: {test_loss:.4f}")
    print("Testing completed.")

    return test_loss

# Example usage:
# model = Setting_2_model(model_name='resnet50', embed_dim=512)
# train_dataset = SeveritySimilarityDataset(train_annotation_df, dataset_dir, phase='training')
# val_dataset = SeveritySimilarityDataset(val_annotation_df, dataset_dir, phase='validation')
# test_dataset = SeveritySimilarityDataset(test_annotation_df, dataset_dir, phase='testing')
# trained_model, train_losses, val_losses = train_model(model, train_dataset, val_dataset, num_epochs=10, batch_size=32, learning_rate=0.001)
# test_loss = test_model(trained_model, test_dataset, batch_size
