In [1]:

import torch
import numpy as np
import pandas as pd

import torch
import torchvision
from torch.utils.data import Dataset

In [2]:
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights

In [3]:

import torch
from pathlib import Path
import numpy as np
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

### Reading Data and Loading Data 

In [4]:
import sys
print("Python version:", sys.version)
print("Python executable:", sys.executable)

Python version: 3.8.20 (default, Oct  3 2024, 15:24:27) 
[GCC 11.2.0]
Python executable: /sc/home/masoumeh.javanbakhat/conda3/envs/myenv/bin/python


In [8]:
import os
# The path that data has been saved is as follows:
# /sc/dhc-cold/groups/fglippert => but the permission to dhc-cold has been denied
IMG_DIR_OLD = "/sc/dhc-cold/groups/fglippert/adni_t1_mprage/T1_3T_coronal_slice/T1_3T_coronal_mni_linear_hippo_resolution256"
IMG_DIR = '/sc/projects/sci-lippert/chair/adni_t1_mprage/T1_3T_coronal_slice/T1_3T_coronal_mni_linear_hippo_resolution256'
IMG_DIR = os.path.join(IMG_DIR, "group_by_hippocampus")
ANNOTATION_FILE = "adni_T1_3T_linear_annotation.csv"
ANNOTATION_PATH = os.path.join(IMG_DIR, ANNOTATION_FILE)
COLS = ["archive_fname", "group"]

### Downloading Data 

In [9]:
class AdniMRIDataset2D(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(
            self.img_dir, self.img_labels["archive_fname"].iloc[idx])
        # print(img_path)
        image = self.read_image(img_path)
        label = self.img_labels["group"].iloc[idx]

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

    def read_image(self, path):
        img = nib.load(path).get_fdata().astype(np.uint8)
        if img.ndim == 2:
            img = img[:, :, np.newaxis]  # HW -> HWC
        img = img.transpose(2, 0, 1)  # HWC -> CHW
        return img

In [10]:
adni_dataset = AdniMRIDataset2D(
    annotations_file=ANNOTATION_PATH, img_dir=IMG_DIR)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

### Path not available!
As it's apparent from the previous cell, the location "/sc/dhc-cold/groups/fglippert/adni_t1_mprage/T1_3T_coronal_slice/T1_3T_coronal_mni_linear_hippo_resolution256/group_by_hippocampus/adni_T1_3T_linear_annotation.csv" doesn't exist!
Then we should find the new dataset

In [13]:
import nibabel as nib

In [14]:
def get_groups(dataset):
    group0 = []
    group1 = []

    for img, label in dataset:
        if label == 0:
            group0.append(img)
        else:
            group1.append(img)

    group0 = np.concatenate(group0, axis=0)
    group1 = np.concatenate(group1, axis=0)

    return (group0, group1)

In [15]:
group0, group1 = get_groups(adni_dataset)

In [16]:
print(group0.shape)
print(group1.shape)

(4591, 256, 256)
(4592, 256, 256)


## Loading pre-trianed model

- Loading the pretariend model along with weighst

- Extracting embeddings from the last layer on groups 

- Checking statitical test to see if they are statistically different

- Uisng Grad-Cam for back-probabgating test-statistic and visualise differences 

In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet50, ResNet50_Weights

HID_DIM = 2048
OUT_DIM = 128


# Renet backbone
class resnet50_fext(nn.Module):
    def __init__(self, pretarin=True):
        super(resnet50, self).__init__()
        if pretarin:
            backbone = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
        self.encoder = torch.nn.Sequential(*list(backbone.children())[:-1])

    def forward(self, x):
        embedding = self.encoder(x)
        embedding = embedding.view(embedding.size()[0], -1)
        return (embedding)


# Linear model
class MLP(nn.Module):
    def __init__(self, in_dim, mlp_hid_size, proj_size):
        super(MLP, self).__init__()
        self.head = nn.Sequential(nn.Linear(in_dim, mlp_hid_size),
                                  nn.BatchNorm1d(mlp_hid_size),
                                  nn.ReLU(),
                                  nn.Linear(mlp_hid_size, proj_size))

    def forward(self, x):
        x = self.head(x)
        return (x)

# Byol model


class BYOL(nn.Module):
    def __init__(self, net, backbone, hid_dim, out_dim):
        super(BYOL, self).__init__()
        self.net = net
        self.encoder = torch.nn.Sequential(*list(backbone.children())[:-1])
        self.projection = MLP(in_dim=backbone.fc.in_features,
                              mlp_hid_size=hid_dim, proj_size=out_dim)
        self.prediction = MLP(
            in_dim=out_dim, mlp_hid_size=hid_dim, proj_size=out_dim)

    def forward(self, x):
        embedding = self.encoder(x)
        embedding = embedding.view(embedding.size()[0], -1)
        project = self.projection(embedding)

        if self.net == 'target':
            return (project)
        predict = self.prediction(project)
        return (predict)

# SimCLR model


class SimCLR(nn.Module):
    def __init__(self, backbone, hid_dim, out_dim):
        super(SimCLR, self).__init__()
        # we get representations from avg_pooling layer
        self.encoder = torch.nn.Sequential(*list(backbone.children())[:-1])
        self.projection = MLP(in_dim=backbone.fc.in_features,
                              mlp_hid_size=hid_dim, proj_size=out_dim)

    def forward(self, x):
        embedding = self.encoder(x)
        embedding = embedding.view(embedding.size()[0], -1)
        project = self.projection(embedding)
        return (project)

In [18]:
base_path = "/dhc/home/masoumeh.javanbakhat/netstore-old/Baysian/3D/Explainability/Retina_Codes"

root_dir = Path(base_path)

checkpoint_dir = root_dir / 'self_supervised' / 'simclr' / 'simclr_ckpts'

pre_exp = 2

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

sam_dir_last = os.path.join(checkpoint_dir, f'{pre_exp}_last_sclr.pt')

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

state_dict = torch.load(sam_dir_last, map_location=device)

# Load model

backbone = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)

model = SimCLR(backbone, hid_dim=2048, out_dim=128).to(device)

model.load_state_dict(state_dict['model'])

encoder = torch.nn.Sequential(*list(model.children())[:-1])

  state_dict = torch.load(sam_dir_last, map_location=device)


FileNotFoundError: [Errno 2] No such file or directory: '/dhc/home/masoumeh.javanbakhat/netstore-old/Baysian/3D/Explainability/Retina_Codes/self_supervised/simclr/simclr_ckpts/2_last_sclr.pt'

- I can add a FC layer if I want to reduce dimension

In [None]:
# If I want to use a smaller dimension I can use a FC layer as follows


reduced_dim = 512  # You can adjust this as needed, e.g., 512, 256, 128
fc_layer = nn.Linear(2048, reduced_dim)

# it outputs hidden representations of dimension 512
reduced_encoder = nn.Sequential(encoder, fc_layer)

In [None]:
sample_input = torch.randn(1, 3, 224, 224).to(device)

encoder_output = encoder(sample_input)
# This removes the extra dimension, if any
encoder_output = encoder_output.squeeze()
latent_representation = fc_layer(encoder_output)
print(latent_representation.shape)

- To get embeddings I can use the following function

In [None]:
def get_embeddings_from_numpy(encoder, images_np, device, gr=1, batch_size=32):
    """Takes a numpy array of images, extract embedding vectors, return embeddings"""
    encoder.eval()
    embeddings_list = []

    # ImageNet mean and std values
    IMAGENET_MEAN = torch.tensor(
        [0.485, 0.456, 0.406]).view(1, 3, 1, 1).to(device)
    IMAGENET_STD = torch.tensor(
        [0.229, 0.224, 0.225]).view(1, 3, 1, 1).to(device)

    # Convert numpy array to torch tensor and move to device
    # Assuming images_np is of shape (n_samples, 1, 256, 256)
    images_tensor = torch.tensor(images_np).unsqueeze(1).to(device).float()
    images_tensor = images_tensor.repeat(
        1, 3, 1, 1)  # create image with 3 channels

    # Apply ImageNet normalization
    images_tensor = (images_tensor - IMAGENET_MEAN) / IMAGENET_STD

    reduced_dim = 512  # You can adjust this as needed, e.g., 512, 256, 128
    fc_layer = nn.Linear(2048, reduced_dim)

    print(f'shape of tensor:{images_tensor.shape}')
    # If you want to process the data in batches
    num_samples = images_tensor.size(0)
    for i in range(0, num_samples, batch_size):
        batch = images_tensor[i:i + batch_size]

        # Ensure the batch is on the right device
        batch = batch.to(device)

        # Get embeddings from the model
        with torch.no_grad():
            embeddings = encoder(batch)
            # Flatten embeddings if needed
            embeddings = embeddings.view(embeddings.size(0), -1)
            # embeddings = fc_layer(embeddings)
        # Append the embeddings
        embeddings_list.append(embeddings.cpu().numpy())

    n = embeddings_list[0].shape[1]

    path_embed = os.path.join('./adni_embed', f'embed_gr{gr}_{n}_nr.npy')

    gr_embed = np.vstack(embeddings_list)

    print(f'shape of embeddings:{gr_embed.shape}')

    np.save(path_embed, gr_embed)

    return gr_embed

In [None]:
group0_embed_nr = get_embeddings_from_numpy(
    encoder=encoder, images_np=group0, device=device, gr=1, batch_size=64)

group1_embed_nr = get_embeddings_from_numpy(
    encoder=encoder, images_np=group1, device=device, gr=2, batch_size=64)

- Now I can define path and save embeddings as follows:

- Note that you should change path as I used this path befoe 

In [None]:
path_gr1_embed = os.path.join(path, 'embed_gr1.npy')

path_gr2_embed = os.path.join(path, 'embed_gr2.npy')

In [None]:
# Now, let's save embeddings of each group

np.save(path_gr1_embed, group0_embed)

np.save(path_gr2_embed, group1_embed)

### Statistical Test 

In [None]:
class MMDTest:
    def __init__(self, features_X, features_Y, n_perm=1000):

        self.n_perm = n_perm
        self.features_X = features_X
        self.features_Y = features_Y

    def _compute_mmd(self, features_X, features_Y):

        mean_fX = features_X.mean(0)
        mean_fY = features_Y.mean(0)
        D = mean_fX - mean_fY
        statistic = np.linalg.norm(D)**2
        return statistic

    def _compute_p_value(self):

        # compute real test statistic
        stat = self._compute_mmd(self.features_X, self.features_Y)
        n, m = len(self.features_X), len(self.features_Y)
        l = n + m
        features_Z = np.vstack((self.features_X, self.features_Y))

        # compute null samples
        resampled_vals = np.empty(self.n_perm)
        for i in range(self.n_perm):
            index = np.random.permutation(l)  # it permutes indices from 0 to l
            feats_X, feats_Y = features_Z[index[:n]], features_Z[index[n:]]
            resampled_vals[i] = self._compute_mmd(feats_X, feats_Y)

        resampled_vals.sort()
        # p_val = np.mean(stat < resampled_vals)
        p_val = (np.sum(stat <= resampled_vals)+1)/(self.n_perm+1)
        return p_val

In [None]:
mmd = MMDTest(group0_embed, group1_embed)

In [None]:
gr1_embed = np.load(path_gr1_embed)

gr2_embed = np.load(path_gr2_embed)

print(gr1_embed.shape)

print(gr2_embed.shape)

### Getting mean embeddings and backprobagating test-statistic 

In [None]:
from gradcam import GradCAM
import gradcam
import sys
import os
sys.path.append(os.path.abspath("src"))

In [None]:
# This is my first code (old code)

def get_mean_embeddings(gcam, dataloader, device, latent_dim=2048):
    """Takes dataloader of each group, extract embedding vectors, return mean embeddings"""
    # Initialize accumulators for healthy and unhealthy groups
    sum_f = torch.zeros_like(torch.zeros(latent_dim)).to(device)
    count_f = 0  # Count of samples in each group
    for images in dataloader:
        images = images.to(device)
        embeddings = gcam.forward(images)
        embeddings = embeddings.view(embeddings.size()[0], -1)
        sum_f += embeddings.sum(dim=0)  # Sum of embeddings for this batch
        count_f += embeddings.size(0)
    mean_embed = sum_f / count_f if count_f > 0 else torch.zeros_like(sum_f)
    del sum_f, embeddings, images
    torch.cuda.empty_cache()
    return mean_embed

In [None]:
def convert_to_tensor(group, device):
    # Assuming images_np is of shape (n_samples, 1, 256, 256)
    group_tensor = torch.tensor(group).unsqueeze(1).to(device).float()
    group_tensor = group_tensor.repeat(
        1, 3, 1, 1)  # create image with 3 channels

    IMAGENET_MEAN = torch.tensor(
        [0.485, 0.456, 0.406]).view(1, 3, 1, 1).to(device)
    IMAGENET_STD = torch.tensor(
        [0.229, 0.224, 0.225]).view(1, 3, 1, 1).to(device)

    # Apply ImageNet normalization
    group_tensor = (group_tensor - IMAGENET_MEAN) / IMAGENET_STD

    return (group_tensor)

In [None]:
def get_loader(group0, group1, device, bs=64):
    group0_tensor = convert_to_tensor(group0, device)
    group1_tensor = convert_to_tensor(group1, device)

    print(group0_tensor.shape)
    print(group1_tensor.shape)

    group0_loader = DataLoader(
        group0_tensor, batch_size=bs, shuffle=False, drop_last=True)
    group1_loader = DataLoader(
        group1_tensor, batch_size=bs, shuffle=False, drop_last=True)

    return (group0_loader, group1_loader)

In [None]:
gr0_nr_loader, gr1_nr_loader = get_loader(gr0_100, gr1_100, device, bs=64)

## Backprobagating test statistic 

In [None]:
def backprobagate_statistics(model, group0, group1, target_layer, bs, device):
    """Calculate the test statistic for different groups of DR."""
    # convert numpy groups to tensors
    group0_loader, group1_loader = get_loader(group0, group1, device, bs=64)

    gcam = GradCAM(model, target_layer=target_layer, relu=True, device=device)
    # Calculate mean embeddings
    group0_mean = get_mean_embeddings(gcam, group0_loader, device)
    group1_mean = get_mean_embeddings(gcam, group1_loader, device)
    D = group0_mean - group1_mean
    print(f'group0_mean:{group0_mean.shape}')
    print(f'group1_mean:{group1_mean.shape}')
    test_statistic = torch.norm(D, p=2)
    return (test_statistic, D, gcam)

In [None]:
def process_attributions(dataloader, gcam, device, backprop_value):
    """Process and return GradCAM attributions in batches."""
    attributions_list = []
    embed_list = []  # save embeddings as numpy array
    # Compute attribution maps for each group
    for images, _ in dataloader:
        images = images.to(device)
        embeddings = gcam.forward(images)
        embeddings = embeddings.view(
            embeddings.size()[0], -1).cpu().data.numpy()
        embed_list.append(embeddings)
        del embeddings
        gcam.backward(backprop_value)
        attributions = gcam.generate()
        attributions = attributions.squeeze().cpu().data.numpy()
        attributions_list.append(attributions)
    return np.vstack(attributions_list), np.vstack(embed_list)

In [None]:
def run(model, group0, group1, device, target_layer, bs=64, backprop_type='test_statistic', latent_dim_idx=None):
    """Main experiment function."""
    test_statistic, D, gcam = backprobagate_statistics(
        model, group0, group1, target_layer, bs, device)

    backprop_value = test_statistic

    group0_loader, group1_loader = get_loader(group0, group1, device)

    group0_attr, group0_embed = process_attributions(
        group0_loader, gcam, backprop_value)
    group1_attr, group1_embed = process_attributions(
        group1_loader, gcam, backprop_value)

    # save_attributions(group0_attr, group1_attr,latent_dim_idx)

    print(f'gr1:{group0_attr.shape}')
    print(f'gr2:{group2_attr.shape}')

    return (group0_attr, group1_attr)

## Imorting heatmaps and looking at them 

In [None]:
import numpy as np

from PIL import Image

In [None]:
class ZeroGradientError(Exception):
    """Custom exception to handle cases where the gradient is zero."""
    pass


def save_cam_with_alpha(image, gcam, alpha=0.5):

    # Convert grayscale image to 3 channels if needed
    if len(image.shape) == 2:  # Grayscale image (H, W)
        image = np.stack([image] * 3, axis=-1)  # Convert to (H, W, 3)

    # Normalize the Grad-CAM values to [0, 1]
    # Normalize the Grad-CAM values to [0, 1], handling zero gradients
    gcam_min = np.min(gcam)
    gcam_max = np.max(gcam)

    try:
        if gcam_max == gcam_min:  # If all values are zero, raise an error
            raise ZeroGradientError(
                "Gradient map contains only zero values, cannot overlay.")
        # Normalize gradient map
        gcam = (gcam - gcam_min) / (gcam_max - gcam_min)
    except ZeroGradientError as e:
        print(f"Error: {e}")
        # Handle the error (for example, return the original image or skip processing)
        return image, image  # Return the original image if error occurs

    # Resize Grad-CAM to match the image dimensions (224x224)
    # Get height and width (height, width) from image shape
    h, w = image.shape[:2]
    gcam_resized = np.array(Image.fromarray(
        gcam).resize((w, h), Image.BILINEAR))

    # Apply a colormap (similar to cv2.applyColorMap)
    # Apply colormap and select RGB channels
    gcam_colored = plt.cm.jet(gcam_resized)[:, :, :3] * 255
    gcam_colored = gcam_colored.astype(np.uint8)

    # Add Grad-CAM on top of the original image using alpha blending
    heatmap = gcam_colored.astype(np.float64)

    # checking dimension of image and heatmaps
    print(f'heatmap:{heatmap.shape}')
    print(f'image:{image.shape}')

    overlaid_image = (alpha * heatmap + (1 - alpha) *
                      image.astype(np.float64)).astype(np.uint8)

    return image, overlaid_image

In [None]:
import utils
import sys
import os

# Get the absolute path of the 'src' directory
src_path = os.path.abspath("src")

# Add 'src' to system path
if src_path not in sys.path:
    sys.path.append(src_path)

## New way to solve OOM problem

- https://chatgpt.com/c/683eac2a-0d4c-8005-a5b7-71fd71ce4a9d

In [None]:
import torch


def compute_group_embeddings_and_grads(gcam, dataloader, device):
    """
    For a group, compute:
    - accumulated sum of embeddings (detached)
    - total number of samples
    - accumulated gradients w.r.t. input images (detached)
    """
    latent_dim = 2048  # or whatever your embedding size is
    group_sum = torch.zeros(latent_dim, device=device)
    group_count = 0
    grads_sum = None
    total_samples = 0

    for images in dataloader:
        # enable gradient on inputs
        images = images.to(device).requires_grad_()
        embeddings = gcam.forward(images)  # [batch_size, latent_dim]
        embeddings = embeddings.view(
            embeddings.size(0), -1)  # flatten if needed

        batch_size = embeddings.size(0)
        batch_sum = embeddings.sum(dim=0)  # sum embeddings in batch

        # Update running sums (detach to avoid graph growth)
        group_sum += batch_sum.detach()
        group_count += batch_size

        # Compute mean embedding so far
        mean_embed = group_sum / group_count

        # Define test statistic for *this batch* contribution
        # This is a placeholder — actual stat will be computed later after both groups processed
        # So here, just compute squared norm of mean_embed (as dummy scalar to get grads)
        # We will later combine gradients across groups.
        batch_test_stat = (mean_embed ** 2).sum()

        # Compute gradients of batch_test_stat w.r.t input images
        grads = torch.autograd.grad(
            batch_test_stat, images, retain_graph=False)[0]

        if grads_sum is None:
            grads_sum = grads.detach()
        else:
            grads_sum += grads.detach()

        total_samples += batch_size

        # Free up memory
        del embeddings, batch_sum, mean_embed, batch_test_stat, grads, images
        torch.cuda.empty_cache()

    # Average gradients over all samples processed
    grads_sum /= total_samples

    return group_sum, group_count, grads_sum

In [None]:

def main_gradcam_test_stat(gcam, dataloader1, dataloader2, device):
    # Compute sums and gradients for both groups
    sum1, count1, grads1 = compute_group_embeddings_and_grads(
        gcam, dataloader1, device)
    sum2, count2, grads2 = compute_group_embeddings_and_grads(
        gcam, dataloader2, device)

    # Compute final group means
    mean1 = sum1 / count1
    mean2 = sum2 / count2

    # Compute test statistic: squared L2 norm of difference of means
    test_stat = ((mean1 - mean2) ** 2).sum()

    # Now compute the *true* gradient of test_stat w.r.t group means
    # gradient of test_stat w.r.t mean1 and mean2
    grad_test_stat = 2 * (mean1 - mean2)

    # Combine batch-wise accumulated gradients from groups scaled by grad_test_stat
    # This rescales the earlier dummy grads to represent true gradient of T w.r.t inputs
    combined_grads = grads1 * \
        grad_test_stat.unsqueeze(0) - grads2 * grad_test_stat.unsqueeze(0)

    # combined_grads is the gradient of test_stat w.r.t input images across groups
    # You can now use combined_grads for GradCAM visualization or further processing

    return test_stat.item(), combined_grads


# Example usage:
# device = torch.device("cuda")
# test_stat_value, grads = main_gradcam_test_stat(gcam, dataloader_group1, dataloader_group2, device)

## Going for experiments 

- Data is required