In [1]:
import torch
import os
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
!pip install open_clip_torch==2.23.0 transformers==4.35.2 matplotlib
from open_clip import create_model_and_transforms, get_tokenizer
import pandas as pd
import numpy as np

# Custom Dataset to handle nested subfolders
class CustomImageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.class_to_idx = self.get_class_to_idx()

        # Populate image paths and corresponding labels
        self.load_images()

    def get_class_to_idx(self):
        """Map class names to labels based on the top-level directories (class folders)."""
        classes = sorted([d.name for d in os.scandir(self.root_dir) if d.is_dir()])
        return {cls_name: idx for idx, cls_name in enumerate(classes)}

    def load_images(self):
        """Recursively find all images in the nested folders and associate them with labels."""
        for class_name, class_idx in self.class_to_idx.items():
            class_folder = os.path.join(self.root_dir, class_name)
            for root, _, files in os.walk(class_folder):
                for file in files:
                    if file.endswith(('.png', '.jpg', '.jpeg')):
                        file_path = os.path.join(root, file)
                        self.image_paths.append(file_path)
                        self.labels.append(class_idx)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        label = self.labels[idx]

        # Load image
        image = Image.open(image_path).convert('RGB')

        # Apply transformations if provided
        if self.transform:
            image = self.transform(image)

        return image, label

# Define the transform to resize and normalize images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Load the training and validation datasets using the custom dataset
train_dataset = CustomImageDataset(root_dir='/kaggle/input/dataset01/Dataset/training', transform=transform)
val_dataset = CustomImageDataset(root_dir='/kaggle/input/dataset01/Dataset/validation', transform=transform)

# Create DataLoader for training and validation
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

# Display class mapping (class names to index)
print(train_dataset.class_to_idx)

# Load the BiomedCLIP model and tokenizer from Hugging Face hub
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model, preprocess, _ = create_model_and_transforms('hf-hub:microsoft/BiomedCLIP-PubMedBERT_256-vit_base_patch16_224')
# Move the model to the device
model.to(device)

# Load the tokenizer for PubMedBERT
tokenizer = get_tokenizer('hf-hub:microsoft/BiomedCLIP-PubMedBERT_256-vit_base_patch16_224')

# Manually define the class names
class_names = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 
               'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']

# Function to extract text embeddings using PubMedBERT for class names
def get_text_embeddings(class_names, template='This is an endoscopic photo of '):
    # Tokenize the class names by concatenating them with the template
    tokenized_inputs = tokenizer([template + name for name in class_names], context_length=256).to(device)
    
    # Generate text embeddings using the model
    with torch.no_grad():
        text_embeddings = model.encode_text(tokenized_inputs)
    
    return text_embeddings

# Extract embeddings for the class names
class_name_embeddings = get_text_embeddings(class_names)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
def train_multimodal(model, train_loader, class_name_embeddings, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        # Forward pass for images
        image_features = model.encode_image(images)
        
        # Forward pass for text embeddings (class names)
        text_features = class_name_embeddings[labels]
        
        # Compute cosine similarity between image and text features
        logits = torch.matmul(image_features, text_features.T)
        
        # Loss computation
        loss = criterion(logits, labels)
        
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)



# Validation Loop with Prediction Saving
def validate_and_save_predictions(model, val_loader, class_name_embeddings, device, output_path='predictions.xlsx'):
    model.eval()
    predictions = []
    image_paths = []
    
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            
            # Get image embeddings
            image_features = model.encode_image(images)
            
            # Compute similarity with class name embeddings
            logits = torch.matmul(image_features, class_name_embeddings.T)
            probabilities = torch.softmax(logits, dim=1).cpu().numpy()
            
            # Store predicted probabilities
            predictions.extend(probabilities)
            
            # Access image paths directly from the dataset
            batch_image_paths = [val_loader.dataset.image_paths[idx] for idx in range(len(labels))]
            image_paths.extend(batch_image_paths)
    
    # Save predictions to Excel
    save_predictions_to_excel(image_paths, np.array(predictions), output_path)

# Function to save predictions to Excel
def save_predictions_to_excel(image_paths, y_pred, output_path):
    classes = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 
               'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
    
    data = {'image_path': image_paths}
    for i, class_name in enumerate(classes):
        data[class_name] = y_pred[:, i]
    
    predicted_classes = np.argmax(y_pred, axis=1)
    data['predicted_class'] = [classes[i] for i in predicted_classes]
    
    df = pd.DataFrame(data)
    df.to_excel(output_path, index=False)

# Training and Validation
num_epochs = 1
for epoch in range(num_epochs):
    train_loss = train_multimodal(model, train_loader, class_name_embeddings, optimizer, criterion, device)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')
    
    # Validate and save predictions at the end of each epoch
    if epoch == num_epochs - 1:
        validate_and_save_predictions(model, val_loader, class_name_embeddings, device, output_path='validation_predictions.xlsx')

# Save the trained model
torch.save(model, 'best_model.pth')
# Testing on new images
def test_model_on_images(model, test_image_dir, class_name_embeddings, output_excel_path, preprocess, device):
    image_paths = []
    
    # Traverse test image directory and collect all image paths
    for root, _, files in os.walk(test_image_dir):
        for file in files:
            if file.endswith(('.png', '.jpg', '.jpeg')):
                image_paths.append(os.path.join(root, file))
    
    results = []
    
    # Class names for reference
    classes = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 
               'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
    
    # Looping over test images
    for img_path in image_paths:
        image = Image.open(img_path).convert('RGB')
        image_tensor = preprocess(image).unsqueeze(0).to(device)
        
        with torch.no_grad():
            # Encode the image
            image_features = model.encode_image(image_tensor)
            
            # Compute similarity between image features and class name embeddings
            logits = torch.matmul(image_features, class_name_embeddings.T)
            probabilities = torch.softmax(logits, dim=1).cpu().numpy()
            
            # Get the most probable class
            predicted_class_idx = probabilities.argmax(axis=1).item()
            predicted_class = classes[predicted_class_idx]
            
            # Store image path, all class probabilities, and predicted class
            result = {'image_path': os.path.basename(img_path)} 
            for i, class_name in enumerate(classes):
                result[class_name] = probabilities[0, i]  # Add the probability for each class
            result['predicted_class'] = predicted_class  # Add predicted class
            
            results.append(result)
    
    # Convert the results to a DataFrame and save to Excel
    results_df = pd.DataFrame(results)
    results_df.to_excel(output_excel_path, index=False)

# Example of testing the model on new images
test_model_on_images(model, '/kaggle/input/cvc-test-2024/Testing set/Images', class_name_embeddings, 'test_predictions.xlsx', preprocess, device)


Collecting open_clip_torch==2.23.0
  Downloading open_clip_torch-2.23.0-py3-none-any.whl.metadata (30 kB)
Collecting transformers==4.35.2
  Downloading transformers-4.35.2-py3-none-any.whl.metadata (123 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m123.5/123.5 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
Collecting ftfy (from open_clip_torch==2.23.0)
  Downloading ftfy-6.3.0-py3-none-any.whl.metadata (7.1 kB)
Collecting tokenizers<0.19,>=0.14 (from transformers==4.35.2)
  Downloading tokenizers-0.15.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading open_clip_torch-2.23.0-py3-none-any.whl (1.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m44.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading transformers-4.35.2-py3-none-any.whl (7.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.9/7.9 MB[0m [31m86.2 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hDownloadin

  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


{'Angioectasia': 0, 'Bleeding': 1, 'Erosion': 2, 'Erythema': 3, 'Foreign Body': 4, 'Lymphangiectasia': 5, 'Normal': 6, 'Polyp': 7, 'Ulcer': 8, 'Worms': 9}


open_clip_pytorch_model.bin:   0%|          | 0.00/784M [00:00<?, ?B/s]

open_clip_config.json:   0%|          | 0.00/707 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

  _torch_pytree._register_pytree_node(
  checkpoint = torch.load(checkpoint_path, map_location=map_location)


tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/225k [00:00<?, ?B/s]

Epoch [1/1], Loss: 6.0864


# **Graphs**

In [5]:
import matplotlib.pyplot as plt

# Function to plot accuracy
def plot_accuracy(train_accuracies, val_accuracies, num_epochs):
    epochs = range(1, num_epochs + 1)
    
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, train_accuracies, 'b', label='Training Accuracy')
    plt.plot(epochs, val_accuracies, 'r', label='Validation Accuracy')
    plt.title('Training and Validation Accuracy over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

# Example usage: After training, pass in train and val accuracies collected during training
# plot_accuracy(train_accuracies, val_accuracies, num_epochs)


In [6]:
# Function to plot loss
def plot_loss(train_losses, val_losses, num_epochs):
    epochs = range(1, num_epochs + 1)
    
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, train_losses, 'b', label='Training Loss')
    plt.plot(epochs, val_losses, 'r', label='Validation Loss')
    plt.title('Training and Validation Loss over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Example usage: After training, pass in train and val losses collected during training
# plot_loss(train_losses, val_losses, num_epochs)


In [7]:
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Function to plot confusion matrix
def plot_confusion_matrix(true_labels, pred_labels, class_names):
    cm = confusion_matrix(true_labels, pred_labels)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()

# Example usage: After predictions, pass in true and predicted labels along with class names
# plot_confusion_matrix(true_labels, pred_labels, class_names)


In [8]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize

# Function to plot ROC curve for each class
def plot_roc_curve(true_labels, pred_probs, class_names):
    # Binarize the true labels for multi-class ROC
    true_labels_bin = label_binarize(true_labels, classes=range(len(class_names)))
    n_classes = len(class_names)
    
    plt.figure(figsize=(10, 8))
    
    # Plot ROC curve for each class
    for i in range(n_classes):
        fpr, tpr, _ = roc_curve(true_labels_bin[:, i], pred_probs[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'{class_names[i]} (AUC = {roc_auc:.2f})')
    
    plt.plot([0, 1], [0, 1], 'k--', label='Random Guess')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curves')
    plt.legend(loc="lower right")
    plt.show()

# Example usage: After predictions, pass in true labels and predicted probabilities
# plot_roc_curve(true_labels, pred_probs, class_names)


In [9]:
from sklearn.metrics import precision_recall_curve

# Function to plot Precision-Recall curve for each class
def plot_precision_recall_curve(true_labels, pred_probs, class_names):
    # Binarize the true labels for multi-class PR curve
    true_labels_bin = label_binarize(true_labels, classes=range(len(class_names)))
    n_classes = len(class_names)
    
    plt.figure(figsize=(10, 8))
    
    # Plot Precision-Recall curve for each class
    for i in range(n_classes):
        precision, recall, _ = precision_recall_curve(true_labels_bin[:, i], pred_probs[:, i])
        plt.plot(recall, precision, label=f'{class_names[i]}')
    
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curves')
    plt.legend(loc="lower left")
    plt.show()

# Example usage: After predictions, pass in true labels and predicted probabilities
# plot_precision_recall_curve(true_labels, pred_probs, class_names)


In [10]:
train_accuracies = []
val_accuracies = []
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    train_loss, train_acc = train_multimodal(model, train_loader, class_name_embeddings, optimizer, criterion, device)  # Your train function
    val_loss, val_acc = validate_multimodal(model, val_loader, class_name_embeddings, optimizer, criterion, device)  # Your validation function
    
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

# After training, generate the graphs
plot_accuracy(train_accuracies, val_accuracies, num_epochs)
plot_loss(train_losses, val_losses, num_epochs)


TypeError: cannot unpack non-iterable float object

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score, precision_recall_fscore_support, confusion_matrix, balanced_accuracy_score
from sklearn.preprocessing import label_binarize
import json

def generate_metrics_report(y_true, y_pred):
    # Class names for reference
    class_names = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 
                   'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
    
    # Initialize metrics dictionary
    metrics_report = {}
    
    # Ensure y_true and y_pred are numpy arrays
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    
    # Convert one-hot encoded y_true to class labels
    y_true_labels = np.argmax(y_true, axis=1)
    y_pred_labels = np.argmax(y_pred, axis=1)
    
    # Confusion matrix
    cm = confusion_matrix(y_true_labels, y_pred_labels)
    
    # Specificity (True Negative Rate)
    specificity = []
    for i in range(len(class_names)):
        tn = cm.sum() - (cm[i, :].sum() + cm[:, i].sum() - cm[i, i])
        fp = cm[:, i].sum() - cm[i, i]
        specificity.append(tn / (tn + fp) if (tn + fp) > 0 else 0)
    
    # Sensitivity (Recall), Precision, F1 Score, and Support
    precision, recall, f1, _ = precision_recall_fscore_support(y_true_labels, y_pred_labels, average=None, labels=range(len(class_names)))
    
    # ROC AUC score for each class (One-vs-Rest)
    roc_auc = []
    for i in range(len(class_names)):
        roc_auc.append(roc_auc_score(y_true[:, i], y_pred[:, i]))
    
    # Mean AUC across all classes
    mean_auc = np.mean(roc_auc)
    
    # Balanced Accuracy Score
    balanced_acc = balanced_accuracy_score(y_true_labels, y_pred_labels)
    
    # Aggregated Metrics
    metrics_report['mean_auc'] = mean_auc
    metrics_report['balanced_accuracy'] = balanced_acc
    
    # Class-wise Metrics
    class_metrics = {}
    for i, class_name in enumerate(class_names):
        class_metrics[class_name] = {
            'specificity': specificity[i],
            'roc_auc': roc_auc[i],
            'precision': precision[i],
            'recall': recall[i],
            'f1_score': f1[i]
        }
    
    metrics_report['class_metrics'] = class_metrics
    
    # Convert to JSON string
    return json.dumps(metrics_report, indent=4)
