In [None]:
# importing necessary libraries 

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision 
from torchvision import transforms, datasets, models 
from torch.nn import functional as F 
from PIL import Image
import pandas as pd 
import numpy as np
import tensorflow as tf
import os
import sys
import glob
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt 
import cv2
import json 
from tqdm import tqdm
from sklearn.decomposition import PCA 
from sklearn.preprocessing import StandardScaler
from skimage.transform import rotate, AffineTransform
import random
from scipy import ndimage
import openslide
import matplotlib.patches as patches
from matplotlib.patches import Polygon
import xml.etree.ElementTree as ET 
import seaborn as sns
from sklearn.metrics import confusion_matrix

**EXTRACTING PATCH EMBEDDINGS** 

In [None]:
# feature extractor for patches 

class ViT(nn.Module):
    """Frozen, ImageNet-pretrained ViT-B/32 used as a fixed patch feature extractor.

    The wrapped torchvision model is left untouched, so ``forward`` returns
    whatever ``vit_b_32`` returns for the batch (its classification-head
    output, not an intermediate token embedding).

    Args:
        num_classes: Stored on the instance for reference only; it does not
            size any layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes
        self.model = models.vit_b_32(weights='DEFAULT')

        # Freeze every backbone weight: the extractor is never trained.
        for param in self.model.parameters():
            param.requires_grad = False

    def forward(self, X):
        """Run the frozen backbone on a batch of images shaped (N, 3, H, W)."""
        return self.model(X)


model = ViT(1)
model = model.to('cuda')
# Inference-only use: switch off any train-time behaviour (e.g. dropout).
model.eval()

In [None]:
# Per-channel normalisation applied to every patch before the backbone.
# The constants are the usual ImageNet RGB mean / standard deviation.

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

transform = transforms.Compose([transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD)])

In [None]:
# function to obtain feature embedding for a given patch

def get_feature_vector(img):
    """Embed a single RGB patch with the global feature extractor ``model``.

    Args:
        img: ``numpy`` array laid out as (H, W, 3). Assumed to hold 8-bit
            pixel values in [0, 255] (what ``cv2.imread`` yields for 8-bit
            images) — TODO confirm if other bit depths are ever used.

    Returns:
        The output of ``model`` for a batch of one patch, on the same device
        as the model.
    """
    # Follow the model instead of hard-coding a device string.
    target_device = next(model.parameters()).device

    patch = torch.from_numpy(np.ascontiguousarray(img)).to(target_device)
    # HWC -> CHW, then rescale to [0, 1]: the mean/std used by `transform`
    # are expressed on that scale, not on raw 0-255 intensities.
    patch = patch.permute(2, 0, 1).float() / 255.0
    patch = transform(patch)

    # Pure inference: no autograd graph is needed for feature extraction.
    with torch.no_grad():
        return model(torch.unsqueeze(patch, dim=0))

In [None]:
# Get a list of all image ids, one list per class.
# os.listdir returns entries in arbitrary order; sorting makes the patch
# order, the bag order and therefore the sequential train/val/test split
# reproducible across runs and machines.

PHOTOS_DIR = '/kaggle/input/bach-breast-cancer-histology-images/ICIAR2018_BACH_Challenge/ICIAR2018_BACH_Challenge/Photos'

benign_ids = sorted(os.listdir(PHOTOS_DIR + '/Benign'))
insitu_ids = sorted(os.listdir(PHOTOS_DIR + '/InSitu'))
invasive_ids = sorted(os.listdir(PHOTOS_DIR + '/Invasive'))
normal_ids = sorted(os.listdir(PHOTOS_DIR + '/Normal'))

# Patch Embedding Extraction

The function `get_patch_embeddings` extracts feature vectors (embeddings) from patches of histology images. Overlapping patches are cut from each larger image, and a feature vector is computed for every patch. The function also returns one one-hot encoded label per patch, derived from the class of the image the patch came from (Benign, InSitu, Invasive, or Normal).

In [None]:
def get_patch_embeddings(img_ids, label, patch_size=224, stride=112):
    '''
    Function to return feature vectors along with labels for patches.
    Inputs:
    - img_ids: List of file names inside the class folder; entries that do
      not end in '.tif' are skipped (same filter as create_bags).
    - label: String, class folder name ('Benign', 'InSitu', 'Invasive' or
      'Normal'); also used to build the one-hot label.
    - patch_size: Int, side length of the square patches (default 224).
    - stride: Int, step between consecutive patch origins (default 112).
    Returns:
    - feature_vectors: List with one squeezed embedding per kept patch
    - labels: List with one one-hot list per kept patch, in the order
      [Benign, InSitu, Invasive, Normal]
    Raises:
    - IOError if an image file cannot be decoded. Failing loudly matters
      because bag creation assumes a fixed number of patches per image.
    '''
    photos_dir = '/kaggle/input/bach-breast-cancer-histology-images/ICIAR2018_BACH_Challenge/ICIAR2018_BACH_Challenge/Photos/'
    one_hot = [float(label=='Benign'), float(label=='InSitu'), float(label=='Invasive'), float(label=='Normal')]

    feature_vectors = []
    labels = []

    for img_id in tqdm(img_ids):
        if not img_id.endswith('.tif'):
            continue

        # Path to the TIF file
        img_path = photos_dir + label + '/' + img_id
        img = cv2.imread(img_path)

        # cv2.imread signals failure by returning None; check before any
        # further OpenCV call touches the image.
        if img is None:
            raise IOError("Error: Could not read the image: " + img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        n_rows, n_cols = img.shape[:2]

        # NOTE(review): the row offset runs up to the column count and the
        # column offset up to the row count, exactly as in the original
        # implementation (which unpacked img.shape as width, height). For
        # non-square images this leaves part of the image unsampled. It is
        # kept deliberately: the per-image bag size passed to create_bags
        # (168) presumably derives from this tiling — fix both together.
        for i in range(0, n_cols, stride):
            for j in range(0, n_rows, stride):
                curr_patch = img[i:i + patch_size, j:j + patch_size]

                # Windows that run past the image border come back smaller.
                if curr_patch.shape != (patch_size, patch_size, 3):
                    continue

                # storing feature vector
                feature_vectors.append(np.squeeze(get_feature_vector(curr_patch)))

                # Storing one hot encoding labels (fresh list per patch)
                labels.append(list(one_hot))

    return feature_vectors, labels

In [None]:
# Embed every patch of every image, one class folder at a time.

benign_patch_embeddings, benign_patch_labels = get_patch_embeddings(img_ids=benign_ids, label='Benign')
insitu_patch_embeddings, insitu_patch_labels = get_patch_embeddings(img_ids=insitu_ids, label='InSitu')
invasive_patch_embeddings, invasive_patch_labels = get_patch_embeddings(img_ids=invasive_ids, label='Invasive')
normal_patch_embeddings, normal_patch_labels = get_patch_embeddings(img_ids=normal_ids, label='Normal')

In [None]:
# function to apply PCA to reduce embeddings to n dimensions

def getPCA(feature_map, n):
    """Project feature vectors onto their first ``n`` principal components.

    Args:
        feature_map: Sequence of 1-D feature vectors, given either as torch
            tensors (any device) or as array-likes.
        n: Number of principal components to keep.

    Returns:
        ``numpy`` array with one row per input vector and ``n`` columns.
    """
    # Build one dense matrix up front instead of handing scikit-learn a
    # list of tensors to coerce element by element.
    rows = [
        f.detach().cpu().numpy() if torch.is_tensor(f) else np.asarray(f)
        for f in feature_map
    ]
    matrix = np.stack(rows)

    pca = PCA(n_components=n)
    return pca.fit_transform(matrix)

In [None]:
# applying PCA
#
# A single projection is fitted on the patches of all four classes and then
# split back per class. Fitting one PCA per class would give every class its
# own basis, so the same column would mean something different in each class
# and the reduced features would not be comparable.
# NOTE(review): the projection is still fitted before the train/val/test
# split, so held-out patches influence it.

class_embeddings = [
    benign_patch_embeddings,
    insitu_patch_embeddings,
    invasive_patch_embeddings,
    normal_patch_embeddings,
]
all_features = getPCA([f for embeddings in class_embeddings for f in embeddings], 224)

# Row offsets at which one class ends and the next begins.
class_boundaries = np.cumsum([len(embeddings) for embeddings in class_embeddings])[:-1]
benign_features, insitu_features, invasive_features, normal_features = np.split(all_features, class_boundaries)

**BAG FORMATION** 

In [None]:
def create_bags(img_ids, feature_maps, labels, no_of_patches):
    """Group consecutive patch features into one bag per '.tif' image.

    The k-th '.tif' entry of ``img_ids`` receives the slice
    ``feature_maps[k * no_of_patches : (k + 1) * no_of_patches]`` and the
    label found at the start of that slice.

    Returns:
        (bags, bag_labels, img_ids_against_bags), three parallel lists.
    """
    tif_ids = [name for name in img_ids if name.endswith('.tif')]

    bags = []
    bag_labels = []
    for bag_no in range(len(tif_ids)):
        start = bag_no * no_of_patches
        bags.append(feature_maps[start: start + no_of_patches])
        bag_labels.append(labels[start])

    return bags, bag_labels, tif_ids

In [None]:
# One bag per image: every image contributes the same number of patches.

PATCHES_PER_IMAGE = 168

benign_bags, benign_bag_labels, benign_ids_against_bags = create_bags(
    benign_ids, benign_features, benign_patch_labels, no_of_patches=PATCHES_PER_IMAGE)
insitu_bags, insitu_bag_labels, insitu_ids_against_bags = create_bags(
    insitu_ids, insitu_features, insitu_patch_labels, no_of_patches=PATCHES_PER_IMAGE)
invasive_bags, invasive_bag_labels, invasive_ids_against_bags = create_bags(
    invasive_ids, invasive_features, invasive_patch_labels, no_of_patches=PATCHES_PER_IMAGE)
normal_bags, normal_bag_labels, normal_ids_against_bags = create_bags(
    normal_ids, normal_features, normal_patch_labels, no_of_patches=PATCHES_PER_IMAGE)

# Train, Test, and Validation Split

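This code splits the dataset into train, validation, and test sets sequentially. The bags are first interleaved by class (benign, in situ, invasive, normal, then benign again, and so on), so that every contiguous slice contains the classes in equal proportion; the interleaved list is then cut 80% / 10% / 10% into train, validation, and test.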

In [None]:
# Creating train, validation and test split

bags = []
labels = []
ids_against_bags = []

# Interleave the four classes (benign, insitu, invasive, normal, benign, ...)
# so that every contiguous slice below stays class-balanced. zip() stops at
# the shortest class instead of failing with an IndexError when the classes
# do not all have the same number of bags.
per_class = (
    zip(benign_bags, benign_bag_labels, benign_ids_against_bags),
    zip(insitu_bags, insitu_bag_labels, insitu_ids_against_bags),
    zip(invasive_bags, invasive_bag_labels, invasive_ids_against_bags),
    zip(normal_bags, normal_bag_labels, normal_ids_against_bags),
)
for one_of_each_class in zip(*per_class):
    for bag, bag_label, bag_id in one_of_each_class:
        bags.append(bag)
        labels.append(bag_label)
        ids_against_bags.append(bag_id)

# Manual split: 80% train, 10% validation, remainder (about 10%) test
n_total = len(bags)
n_train = int(0.8 * n_total)
n_val = int(0.1 * n_total)
val_end = n_train + n_val

# Train set
train_bags = bags[:n_train]
train_labels = labels[:n_train]
train_img_ids_against_bag = ids_against_bags[:n_train]

# Validation set (the id slice must start at n_train, like the bags and
# labels, otherwise it also contains every training id)
val_bags = bags[n_train:val_end]
val_labels = labels[n_train:val_end]
val_img_ids_against_bag = ids_against_bags[n_train:val_end]

# Test set
test_bags = bags[val_end:]
test_labels = labels[val_end:]
test_img_ids_against_bag = ids_against_bags[val_end:]

In [None]:
import matplotlib.pyplot as plt

# class distribution of train, validation, and test datasets 

def plot_bag_labels(ax, title, labels):
    """Draw a bar chart of how many bags fall into each of the four classes.

    ``labels`` holds one one-hot list per bag; the position of the 1 is the
    class index (0 Benign, 1 In situ, 2 Invasive, 3 Normal).
    """
    class_names = ['Benign', 'Insitu Carcinoma', 'Invasive Carcinoma', 'Normal']

    # Tally the hot position of every label; positions past the four known
    # classes are not counted.
    class_counts = [0] * 4
    for one_hot in labels:
        hot_position = one_hot.index(1)
        if hot_position < 4:
            class_counts[hot_position] += 1

    ax.bar(class_names, class_counts)
    ax.set_title(title)
    ax.set_ylabel('Count')

# One panel per split, side by side.
fig, axs = plt.subplots(1, 3, figsize=(15, 5))

split_panels = (
    ('TRAIN BAG LABELS', train_labels),
    ('VALIDATION BAG LABELS', val_labels),
    ('TEST BAG LABELS', test_labels),
)
for panel_ax, (panel_title, split_labels) in zip(axs, split_panels):
    plot_bag_labels(panel_ax, panel_title, split_labels)

plt.tight_layout()
plt.show()

**CUSTOM DATASETS** 

In [None]:
# Custom dataset serving (bag of patch features, one-hot class label) pairs

class CustomDataset(Dataset):
    """Map-style dataset over pre-computed bags of patch features.

    Every item is brought to exactly 224 rows: longer bags are cut after the
    first 224 patches, shorter ones are surrounded by all-zero rows of width
    224 (roughly half above, the rest below).
    """

    def __init__(self, bags, labels):
        self.bags = bags
        self.labels = labels

    def __len__(self):
        return len(self.bags)

    def __getitem__(self, idx):
        patches = self.bags[idx]
        n_patches = len(patches)

        if n_patches > 224:
            rows = patches[:224]
        else:
            n_missing = 224 - n_patches
            zero_row = np.zeros(224)
            rows = [zero_row] * (n_missing // 2)          # zero padding above
            rows += list(patches)                         # the real patches
            rows += [zero_row] * (n_missing // 2 + 1)     # zero padding below
            rows = rows[:224]                             # drop the spare row, if any

        bag = torch.tensor(np.array(rows)).float()
        # L2-normalise every feature column across the rows of the bag.
        bag = nn.functional.normalize(bag, dim=0, p=2)

        label = torch.tensor(self.labels[idx])

        # Leading singleton axis: the bag is served as a one-channel map.
        return bag.unsqueeze(0), label

In [None]:
# Wrap each split in a dataset object.

train_dataset = CustomDataset(bags=train_bags, labels=train_labels)
valid_dataset = CustomDataset(bags=val_bags, labels=val_labels)
test_dataset = CustomDataset(bags=test_bags, labels=test_labels)

**DATA LOADERS** 

In [None]:
batch_size = 16

# Settings shared by all three loaders; only the training loader shuffles.
loader_options = dict(batch_size=batch_size, num_workers=0)

train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
valid_loader = DataLoader(valid_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

In [None]:
# Use the first GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

**FINAL MODEL** 

In [None]:
# defining final pipeline to process prepared bags 

class Pipeline(nn.Module):
    """Bag-level classifier: self-attention -> ViT-B/32 backbone -> MLP head.

    Args:
        num_classes: Stored on the instance for reference only. The head is
            fixed at 4 outputs regardless of this value.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes

        self.base = models.vit_b_32(weights='DEFAULT')
        # Fine-tune the whole backbone: make sure every weight is trainable.
        for param in self.base.parameters():
            param.requires_grad = True

        self.flatten = nn.Flatten()

        # Maps the 1000 backbone outputs to 4 class logits.
        self.head = nn.Sequential(
            nn.Linear(1000, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 4)
        )

    def attention(self, query, key, value, mask=None, dropout=None):
        """Compute scaled dot-product attention.

        Args:
            query, key, value: Tensors whose last two axes are
                (sequence, feature); leading axes are broadcast by matmul.
            mask: Optional tensor; positions where it equals 0 are excluded.
            dropout: Optional callable applied to the attention weights.

        Returns:
            (attended values, attention weights).
        """
        # Imported here so the class does not depend on a later notebook
        # cell having imported `math` first.
        import math

        d_k = query.size(-1)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        p_attn = scores.softmax(dim=-1)
        if dropout is not None:
            p_attn = dropout(p_attn)
        return torch.matmul(p_attn, value), p_attn

    def forward(self, X):
        """Return ``(logits, probabilities)`` for a batch of bags."""
        # Self-attention: the bag attends over its own rows.
        attended_values, _ = self.attention(X, X, X)
        X = self.base(attended_values)
        X = self.flatten(X)
        X = self.head(X)
        # Normalise over the class axis explicitly; leaving `dim` out relies
        # on a deprecated implicit choice.
        return X, F.softmax(X, dim=1)

# Per-class loss weights; tune them to the class distribution.
# Order of weights: benign, in-situ, invasive, normal.
pos_weight = torch.tensor([1.0, 1.5, 2.5, 0.15]).to(device)
loss_fn = nn.CrossEntropyLoss(weight=pos_weight)

model = Pipeline(1)

In [None]:
# modifying first layer for one color channel
#
# The backbone is a torchvision VisionTransformer, whose first layer is the
# patch-embedding convolution `conv_proj` (there is no `features` attribute
# on a ViT). Swap it for an equivalent convolution with one input channel.

old_proj = model.base.conv_proj
new_proj = nn.Conv2d(
    1,
    old_proj.out_channels,
    kernel_size=old_proj.kernel_size,
    stride=old_proj.stride,
    padding=old_proj.padding,
    bias=old_proj.bias is not None,
)

# Keep the pretrained filters: summing the kernels over the input channels
# gives the response the original layer would have to a grey image repeated
# on every channel. Re-running this cell is harmless.
with torch.no_grad():
    new_proj.weight.copy_(old_proj.weight.sum(dim=1, keepdim=True))
    if old_proj.bias is not None:
        new_proj.bias.copy_(old_proj.bias)

model.base.conv_proj = new_proj

In [None]:
# Move the model to the GPU when one is present.
cuda_ready = torch.cuda.is_available()
if cuda_ready:
    model = model.to('cuda')
    print('available')

# Adam over every model parameter with a small fine-tuning learning rate.
optimizer = optim.Adam(model.parameters(), lr=1e-5)

In [None]:
# NOTE(review): `!export ...` runs in a temporary subshell, so the variable
# is gone as soon as the command returns and the kernel process never sees
# it. If synchronous CUDA launches are really wanted for debugging, use
# `%env CUDA_LAUNCH_BLOCKING=1` (or os.environ) before any CUDA work starts.
!export CUDA_LAUNCH_BLOCKING=1

**TRAINING** 

In [None]:
# Per-epoch history, filled in by the training loop.

train_losses, valid_losses = [], []
train_accuracies, valid_accuracies = [], []
roc_values_train, roc_values_val = [], []

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, classification_report
import math 

# Training loop

def _run_epoch(model, loader, loss_fn, device, optimizer=None):
    """Run one full pass over ``loader``.

    With an ``optimizer`` the model is put in train mode and updated after
    every batch; without one it is put in eval mode and gradients are off.

    Returns:
        (mean batch loss, accuracy in percent, true class indices,
        predicted class probabilities) — the last two as numpy arrays.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    y_true = []
    y_scores = []

    with torch.set_grad_enabled(training):
        for batch_x, batch_y in tqdm(loader):
            # Move data to the selected device; labels arrive one-hot.
            batch_x = batch_x.to(device)
            targets = torch.argmax(batch_y.to(device), dim=1)

            if training:
                optimizer.zero_grad()
            logits, probs = model(batch_x)
            loss = loss_fn(logits, targets)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            n_correct += (torch.argmax(probs, dim=1) == targets).sum().item()
            n_seen += targets.size(0)
            y_true.extend(targets.cpu().numpy())
            y_scores.extend(probs.detach().cpu().numpy())

    return loss_sum / len(loader), 100 * n_correct / n_seen, np.array(y_true), np.array(y_scores)


num_epochs = 20  # Adjust as needed

for epoch in range(num_epochs):
    epoch_train_loss, epoch_train_acc, _, _ = _run_epoch(model, train_loader, loss_fn, device, optimizer)
    epoch_valid_loss, epoch_valid_acc, y_true_val, y_scores_val = _run_epoch(model, valid_loader, loss_fn, device)

    train_losses.append(epoch_train_loss)
    train_accuracies.append(epoch_train_acc)
    valid_losses.append(epoch_valid_loss)
    valid_accuracies.append(epoch_valid_acc)

    # TODO: ROC-AUC is not computed yet; roc_curve needs a one-vs-rest
    # formulation for this four-class problem.

    # Accuracies are percentages, losses are plain values.
    print(f'Epoch {epoch + 1}, Train Accuracy: {train_accuracies[-1]:.2f}%, Train Loss: {train_losses[-1]:.2f}, Val Accuracy: {valid_accuracies[-1]:.2f}%, Val Loss: {valid_losses[-1]:.2f}')

    # Classification report
    y_pred_val = np.argmax(y_scores_val, axis=1)

    print("Validation Classification Report:")
    print(classification_report(y_true_val, y_pred_val))

In [None]:
def _plot_train_valid(position, train_values, valid_values, title, ylabel):
    """Draw one train-vs-validation curve pair in the subplot at ``position``."""
    ax = plt.subplot(position)
    ax.set_facecolor('#F8F8F8')
    ax.plot(train_values)
    ax.plot(valid_values)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('Epoch')
    ax.legend(['Train', 'Valid'])


def display_training_curves(train_losses, valid_losses, train_accuracies, valid_accuracies, subplot):
    """Plot loss and accuracy curves in two neighbouring subplots.

    Args:
        train_losses, valid_losses: Per-epoch loss values.
        train_accuracies, valid_accuracies: Per-epoch accuracy values.
        subplot: Three-digit subplot code (e.g. 121) for the loss panel; the
            accuracy panel goes to ``subplot + 1``. A code ending in 1 opens
            a new figure first.
    """
    if subplot % 10 == 1:  # set up the figure on the first call
        # plt.figure, not plt.subplots: the latter also creates a
        # full-figure axes that the panels below would then overlap.
        plt.figure(figsize=(12, 5), facecolor='#F0F0F0')

    _plot_train_valid(subplot, train_losses, valid_losses, 'Model Loss', 'Loss')
    _plot_train_valid(subplot + 1, train_accuracies, valid_accuracies, 'Model Accuracy', 'Accuracy')

    # Lay out only after both panels exist, so their labels are accounted for.
    plt.tight_layout()

In [None]:
# Loss on the left panel, accuracy on the right.
display_training_curves(
    train_losses=train_losses,
    valid_losses=valid_losses,
    train_accuracies=train_accuracies,
    valid_accuracies=valid_accuracies,
    subplot=121,
)

**INFERENCE (BAG LEVEL)** 

In [None]:
from sklearn.metrics import confusion_matrix

# Evaluate the trained model once on the held-out test bags.
model.eval()
test_loss = 0
correct = 0
total = 0
y_true_test = []
y_pred_test = []
y_probs = []

with torch.no_grad():
    # Loop variables are named so they do not overwrite the notebook-level
    # `labels` list built by the split cell.
    for batch_x, batch_y in tqdm(test_loader):
        # Move data to the selected device; labels arrive one-hot.
        batch_x = batch_x.to(device)
        targets = torch.argmax(batch_y.to(device), dim=1)

        logits, probs = model(batch_x)
        test_loss += loss_fn(logits, targets).item()
        predicted = torch.argmax(probs, dim=1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

        y_true_test.extend(targets.cpu().numpy())
        y_pred_test.extend(predicted.cpu().numpy())
        y_probs.extend(probs.cpu().numpy())

test_loss /= len(test_loader)
test_accuracy = 100 * correct / total

# TODO: ROC-AUC is not computed yet; roc_curve needs a one-vs-rest
# formulation for this four-class problem.

# Accuracy is a percentage, the loss is a plain value.
print(f'Test Accuracy: {test_accuracy:.2f}%, Test Loss: {test_loss:.2f}')

# Classification report for test set
print("Test Classification Report:")
print(classification_report(y_true_test, y_pred_test))

In [None]:
# Bag-level confusion matrix on the test set, drawn as an annotated heatmap.
conf_matrix = confusion_matrix(y_true_test, y_pred_test)

fig, ax = plt.subplots(figsize=(5, 5))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap="Blues", cbar=False, ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Bag Level Confusion Matrix')
plt.show()