In [None]:
import gdown
import zipfile
import os

# Google Drive file ID and the direct-download URL built from it
file_id = '1c0BXEuDy8Cmm2jfN0YYGkQxFZd2ZIoLg'
url = f'https://drive.google.com/uc?id={file_id}'

# Local path the archive is saved to (read again by the extraction cell)
output_file = '/content/dataset.zip'

# Only download when the archive is not already on disk, so that
# "Restart & Run All" does not transfer the ~1.1 GB file again.
if not os.path.exists(output_file):
    downloaded_path = gdown.download(url, output_file, quiet=False)
    # NOTE(review): gdown is assumed to return None when the download fails
    # instead of raising; confirm against the installed gdown version.
    if downloaded_path is None:
        raise RuntimeError(f'Failed to download dataset archive from {url}')

# Display the archive path as the cell result
output_file


Downloading...
From (original): https://drive.google.com/uc?id=1c0BXEuDy8Cmm2jfN0YYGkQxFZd2ZIoLg
From (redirected): https://drive.google.com/uc?id=1c0BXEuDy8Cmm2jfN0YYGkQxFZd2ZIoLg&confirm=t&uuid=02bf9499-cbce-4933-93d0-22c5e76cc995
To: /content/dataset.zip
100%|██████████| 1.11G/1.11G [00:17<00:00, 61.7MB/s]


'/content/dataset.zip'

In [None]:
# Directory to extract the contents
# (a later cell reads '/content/dataset/iu_xray/annotation.json', so the
# archive is presumably expected to contain a top-level 'iu_xray' folder)
extract_dir = '/content/dataset/'

# Create the extraction directory if it doesn't exist
os.makedirs(extract_dir, exist_ok=True)

# Extract the ZIP file; `output_file` is the archive path set by the
# download cell, so that cell must have been run first.
with zipfile.ZipFile(output_file, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)


In [None]:
!pip install rouge

Collecting rouge
  Downloading rouge-1.0.1-py3-none-any.whl.metadata (4.1 kB)
Downloading rouge-1.0.1-py3-none-any.whl (13 kB)
Installing collected packages: rouge
Successfully installed rouge-1.0.1


# Code Description

This code implements a deep learning model for generating reports from medical images, specifically X-ray images. It combines a ResNet101 image encoder with the T5 transformer model, and uses a BERT tokenizer for the report text. Below is a high-level overview of the key components and functionalities of the code:

## `R2GenModel` Class

The `R2GenModel` class extends `torch.nn.Module` and integrates both a transformer model (T5) and a convolutional neural network (ResNet101) to generate textual reports from images.

- **Initialization (`__init__` method)**:
  - Loads the BERT (`bert-base-uncased`) tokenizer and the T5 model for text generation.
  - Initializes a pre-trained ResNet101 model for visual feature extraction.
  - Replaces the final fully connected layer of ResNet101 so that its output size matches the T5 model's hidden size (`d_model`).
  - Adds a dropout layer for regularization.

- **Forward Pass (`forward` method)**:
  - Extracts visual features from input images using the ResNet101 model.
  - Applies dropout to the extracted features.
  - Generates reports by passing the visual features to the T5 model as its encoder outputs, together with the text inputs.
  - Supports both training (with labels) and inference (without labels).

- **Visual Feature Extraction (`extract_visual_features` method)**:
  - Processes input images to extract visual features using the modified ResNet101 model.
  - Adjusts the dimensions of the extracted features to match the expected input shape of the T5 model.

- **Caption Generation (`generate_caption` method)**:
  - Generates textual captions for given images and text inputs.
  - Uses the T5 model to generate predictions based on the visual features and text inputs.



In [None]:
import torch
from transformers import T5ForConditionalGeneration, BertTokenizer
import torchvision.models as models
import torch.nn as nn

class R2GenModel(torch.nn.Module):
    """Image-to-report generator: a ResNet101 image encoder feeding a T5 decoder.

    The image is encoded by ResNet101 whose final ``fc`` layer is replaced by a
    linear projection to the T5 hidden size. That single feature vector is
    repeated along a sequence axis and handed to T5 as ``encoder_outputs``, so
    the T5 encoder itself is bypassed. Text is tokenized with the
    ``bert-base-uncased`` tokenizer, not a T5 tokenizer.

    Args:
        model_name: Hugging Face checkpoint name for the T5 model.
        device: Device the sub-models are moved to and images are sent to.
        dropout_prob: Dropout probability applied to the visual features in
            ``forward``.
        visual_seq_len: Number of times the image feature vector is repeated
            along the sequence axis. The default (512) equals the padded token
            length used by ``ReportImageDataset``, so the text attention mask
            passed to ``forward`` lines up with the visual sequence.
    """

    def __init__(self, model_name='t5-small', device='cuda', dropout_prob=0.1, visual_seq_len=512):
        super(R2GenModel, self).__init__()
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)
        self.device = device
        self.visual_seq_len = visual_seq_len

        # Visual extractor: ResNet101 with its classifier head swapped for a
        # projection to the T5 hidden size (d_model).
        self.visual_extractor = models.resnet101(pretrained=True).to(device)
        self.visual_extractor.fc = torch.nn.Linear(self.visual_extractor.fc.in_features, self.model.config.d_model).to(device)

        # Dropout layer (regularizes the visual features during training)
        self.dropout = nn.Dropout(dropout_prob)

    def _stop_token_kwargs(self):
        """Return ``generate`` kwargs that make decoding stop on the tokenizer's tokens.

        The labels are produced by the BERT tokenizer, so the end of a report
        is BERT's ``[SEP]``. Without these overrides ``generate`` uses the T5
        checkpoint's own EOS/PAD ids and keeps decoding until ``max_length``.
        """
        return {
            'eos_token_id': self.tokenizer.sep_token_id,
            'pad_token_id': self.tokenizer.pad_token_id,
        }

    def forward(self, input_ids, attention_mask, images, labels=None):
        """Run a training step (with ``labels``) or generate token ids (without).

        Args:
            input_ids: Token ids; forwarded to T5 alongside the visual features.
            attention_mask: Mask forwarded to T5 together with ``input_ids``.
            images: Image batch passed to the visual extractor.
            labels: Target token ids. When given, the T5 model output
                (including ``.loss``) is returned; otherwise generated ids.

        Returns:
            The T5 forward output when ``labels`` is provided, else the tensor
            returned by ``T5ForConditionalGeneration.generate``.
        """
        visual_features = self.dropout(self.extract_visual_features(images))
        if labels is not None:
            return self.model(input_ids=input_ids, attention_mask=attention_mask, encoder_outputs=(visual_features,), labels=labels)
        return self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            encoder_outputs=(visual_features,),
            **self._stop_token_kwargs(),
        )

    def extract_visual_features(self, images):
        """Encode images and repeat the feature vector along a sequence axis.

        Args:
            images: Image batch; moved to ``self.device`` before encoding.

        Returns:
            A view of shape ``(batch, visual_seq_len, dim)`` in which every
            sequence position holds the same per-image feature vector.
        """
        images = images.to(self.device)
        # Output of the replaced fc layer: one feature vector per image.
        pooled = self.visual_extractor(images)
        # Add a sequence axis, then broadcast it to the configured length.
        return pooled.unsqueeze(1).expand(-1, self.visual_seq_len, -1)

    def generate_caption(self, input_ids, images, max_length=50):
        """Greedily decode one report per image and return the decoded strings.

        Args:
            input_ids: Token ids forwarded to ``generate``; the encoder outputs
                come from the images, not from these ids.
            images: Image batch to describe.
            max_length: Maximum number of generated tokens.

        Returns:
            A list of decoded strings, one per batch element, with special
            tokens removed.
        """
        visual_features = self.extract_visual_features(images)
        generated_ids = self.model.generate(
            input_ids=input_ids,
            encoder_outputs=(visual_features,),
            max_length=max_length,
            num_beams=1,
            **self._stop_token_kwargs(),
        )
        return [
            self.tokenizer.decode(ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)
            for ids in generated_ids
        ]


## `ReportImageDataset` Class

The `ReportImageDataset` class is a custom dataset class that prepares the data for training and evaluation.

- **Initialization (`__init__` method)**:
  - Takes lists of reports and corresponding image paths, along with the tokenizer and optional image transformations.
  
- **Length (`__len__` method)**:
  - Returns the total number of samples in the dataset.
  
- **Item Retrieval (`__getitem__` method)**:
  - Loads and preprocesses an image.
  - Tokenizes the corresponding report text.
  - Returns a dictionary containing the tokenized inputs, attention masks, and preprocessed images.



In [None]:
import torch
from PIL import Image
from torchvision import transforms

class ReportImageDataset(torch.utils.data.Dataset):
    """Dataset of (report, image) pairs addressed by a shared index.

    Each item is a dict holding the tokenized, prompt-prefixed report
    (``input_ids`` and ``attention_mask``, both flattened to 1-D) and the
    image, transformed when an ``image_transform`` was supplied.
    """

    def __init__(self, reports, image_paths, tokenizer, image_transform=None):
        self.reports, self.image_paths = reports, image_paths
        self.tokenizer = tokenizer
        self.image_transform = image_transform

    def __len__(self):
        # The number of reports defines the dataset size.
        return len(self.reports)

    def __getitem__(self, idx):
        report_text = self.reports[idx]
        path = self.image_paths[idx]

        # Read the image as RGB and run the optional preprocessing pipeline.
        picture = Image.open(path).convert('RGB')
        picture = self.image_transform(picture) if self.image_transform else picture

        # Prefix the report with the task prompt and pad/truncate to 512 tokens.
        encoded = self.tokenizer.encode_plus(
            "generate report: " + report_text,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding="max_length",
        )

        # Drop the leading batch axis added by return_tensors="pt".
        sample = {field: encoded[field].flatten() for field in ('input_ids', 'attention_mask')}
        sample['image'] = picture
        return sample

## Training Function

The `train` function trains the `R2GenModel` using the provided training and validation datasets.

- **Parameters**:
  - `model`: The `R2GenModel` instance to be trained.
  - `train_dataset` and `val_dataset`: The datasets for training and validation.
  - `batch_size`: Batch size for data loading.
  - `num_epochs`: Number of epochs to train.
  - `learning_rate`: Learning rate for the optimizer.

- **Training Loop**:
  - Loads the data in batches.
  - Performs forward and backward passes.
  - Optimizes the model parameters.
  - Computes and prints average training and validation losses.



In [None]:
def train(model, train_dataset, val_dataset, batch_size=8, num_epochs=5, learning_rate=1e-4):
    """Train ``model`` with AdamW and print the mean train/validation loss per epoch.

    Args:
        model: Module called as ``model(input_ids, attention_mask, images,
            labels=...)`` and returning an object with a ``.loss`` tensor.
        train_dataset: Dataset yielding dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'image'`` entries; shuffled every epoch.
        val_dataset: Dataset with the same item layout; evaluated without
            gradient tracking after every epoch.
        batch_size: Batch size used for both dataloaders.
        num_epochs: Number of passes over ``train_dataset``.
        learning_rate: Learning rate passed to ``torch.optim.AdamW``.

    Returns:
        None. Progress is reported through ``print``.

    NOTE(review): the labels are the (padded) ``input_ids`` themselves, so
    padded positions are part of the targets and contribute to the reported
    loss; confirm this is intended.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    model.to(device)

    def batch_loss(batch):
        """Move one batch to the device and return the model's loss for it."""
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        images = batch['image'].to(device)
        # The report tokens serve as both the text input and the target, so the
        # tensor already on the device is reused instead of transferred again.
        return model(input_ids, attention_mask, images, labels=input_ids).loss

    def mean_loss(total, num_batches):
        """Average a summed loss; an empty dataloader yields NaN, not a crash."""
        return total / num_batches if num_batches else float('nan')

    for epoch in range(num_epochs):
        # Training pass
        model.train()
        total_train_loss = 0.0
        for batch in train_dataloader:
            loss = batch_loss(batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()

        avg_train_loss = mean_loss(total_train_loss, len(train_dataloader))
        print(f"Epoch {epoch + 1}, Average Training Loss: {avg_train_loss}")

        # Validation pass (no parameter updates)
        model.eval()
        total_val_loss = 0.0
        with torch.no_grad():
            for batch in val_dataloader:
                total_val_loss += batch_loss(batch).item()

        avg_val_loss = mean_loss(total_val_loss, len(val_dataloader))
        print(f"Epoch {epoch + 1}, Average Validation Loss: {avg_val_loss}")

## Evaluation Function

The `evaluate_model` function evaluates the trained model on a test dataset using various metrics.

- **Process**:
  - Generates reports for images in the test set.
  - Computes evaluation metrics (BLEU, METEOR, ROUGE, precision, recall, and F1 score).

## Metrics Calculation

Various functions and libraries (e.g., `Rouge`, `sentence_bleu`, `meteor_score`) are used to calculate evaluation metrics for the generated reports against reference reports.

- **calculate_scores**:
  - Calculates BLEU-1, BLEU-4, METEOR, and ROUGE scores.
  - Computes precision, recall, and F1 score for the generated reports.



In [None]:
from rouge import Rouge
from nltk.tokenize import word_tokenize
import nltk
from nltk.translate.meteor_score import meteor_score
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import torch

# Fetch the NLTK corpora used by this cell's metrics
# (presumably required by nltk's meteor_score; verify if the downloads are removed).
nltk.download('wordnet')
nltk.download('omw-1.4')

# Names of chest X-ray finding categories.
# NOTE(review): this list is not referenced by any function visible in this
# notebook; confirm whether it is still needed.
findings = ['Enlarged Cardiomediastinum', 'Cardiomegaly', 'Lung Opacity',
            'Lung Lesion', 'Edema', 'Consolidation', 'Pneumonia', 'Atelectasis',
            'Pneumothorax', 'Pleural Effusion', 'Pleural Other', 'Fracture',
            'Support Devices', 'No Finding']

def calculate_ce(reference, prediction):
    """Precision, recall and F1 of the unique whitespace-separated tokens.

    Both strings are reduced to their sets of tokens; the shared tokens count
    as true positives. Any ratio whose denominator is empty/zero is 0.

    Returns:
        Tuple ``(precision, recall, f1_score)``.
    """
    ref_vocab, pred_vocab = set(reference.split()), set(prediction.split())
    overlap = len(ref_vocab & pred_vocab)

    precision = overlap / len(pred_vocab) if pred_vocab else 0
    recall = overlap / len(ref_vocab) if ref_vocab else 0

    denominator = precision + recall
    if not denominator:
        return precision, recall, 0
    return precision, recall, (2 * precision * recall) / denominator

def calculate_scores(references, predictions):
    """Average BLEU-1, BLEU-4, METEOR, ROUGE-F and token-overlap P/R/F1 over text pairs.

    Args:
        references: Iterable of reference strings.
        predictions: Iterable of predicted strings, aligned with ``references``.

    Returns:
        Tuple ``(bleu_1, bleu_4, meteor, rouge, precision, recall, f1)`` where
        ``rouge`` is a dict with keys ``'rouge-1'``, ``'rouge-2'`` and
        ``'rouge-l'`` holding mean F-scores. Every average is 0 when no pair
        was scored.

    Pairs whose prediction or reference contains no tokens are skipped (they
    are not counted as zero scores). Tokenization is a plain whitespace split.
    """
    rouge_keys = ('rouge-1', 'rouge-2', 'rouge-l')
    rouge = Rouge()
    # The smoothing function is the same for every pair, so build it once.
    smoothing = SmoothingFunction().method1

    bleu_scores_1 = []
    bleu_scores_4 = []
    meteor_scores_list = []
    rouge_scores = {key: [] for key in rouge_keys}
    precisions = []
    recalls = []
    f1_scores = []

    for ref, pred in zip(references, predictions):
        ref_tokens = ref.split()
        pred_tokens = pred.split()

        # Skip pairs with nothing to compare (empty or whitespace-only text).
        if not pred_tokens or not ref_tokens:
            continue

        # BLEU-1
        bleu_scores_1.append(sentence_bleu([ref_tokens], pred_tokens, weights=(1, 0, 0, 0), smoothing_function=smoothing))

        # BLEU-4
        bleu_scores_4.append(sentence_bleu([ref_tokens], pred_tokens, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing))

        # METEOR (takes pre-tokenized inputs)
        meteor_scores_list.append(meteor_score([ref_tokens], pred_tokens))

        # ROUGE: the library raises ValueError when a text has no sentence
        # content (for example a lone "."); score such a pair as zero overlap
        # instead of aborting the whole evaluation.
        try:
            rouge_score = rouge.get_scores(pred, ref)[0]
        except ValueError:
            rouge_score = {key: {'f': 0.0} for key in rouge_keys}
        for key in rouge_keys:
            rouge_scores[key].append(rouge_score[key]['f'])

        # Precision, Recall, F1 over unique tokens
        precision, recall, f1_score = calculate_ce(ref, pred)
        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1_score)

    def average(values):
        """Mean of ``values``, or 0 when nothing was scored."""
        return np.mean(values) if values else 0

    avg_rouge_scores = {key: average(value) for key, value in rouge_scores.items()}

    return (
        average(bleu_scores_1),
        average(bleu_scores_4),
        average(meteor_scores_list),
        avg_rouge_scores,
        average(precisions),
        average(recalls),
        average(f1_scores),
    )

def evaluate_model(model, dataloader, max_length=50):
    """Generate a report for every batch item and score it against its reference.

    Args:
        model: Object exposing ``eval()``, ``generate_caption(input_ids,
            images, max_length=...)`` and ``tokenizer.decode``.
        dataloader: Iterable of batches with ``'input_ids'`` and ``'image'``.
        max_length: Maximum generated length forwarded to
            ``generate_caption`` (previously fixed at 50).

    Returns:
        The 7-tuple produced by ``calculate_scores``:
        ``(bleu_1, bleu_4, meteor, rouge, precision, recall, f1)``.

    Items whose generated text is empty are left out of the scoring.

    NOTE(review): the reference is decoded from ``input_ids``, which include
    the "generate report:" prompt, and the sample outputs in this notebook
    show the generated text starting with the same prompt; those shared
    tokens are therefore counted by the overlap metrics. Confirm whether the
    prompt should be stripped before scoring.
    """
    model.eval()
    references = []
    predictions = []

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            images = batch['image'].to(device)

            generated_texts = model.generate_caption(input_ids, images, max_length=max_length)
            for reference_ids, generated_text in zip(input_ids, generated_texts):
                if not generated_text:  # nothing to score for this item
                    continue
                references.append(model.tokenizer.decode(reference_ids, skip_special_tokens=True))
                predictions.append(generated_text)

    return calculate_scores(references, predictions)


[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


## Main Script

The main script orchestrates the entire process:

1. Loads the dataset annotations.
2. Prepares image paths and corresponding reports for training, validation, and testing.
3. Initializes and trains the `R2GenModel`.
4. Evaluates the trained model on the test dataset and prints the evaluation metrics.

In [None]:

import os
import json
from torchvision import transforms

def collect_image_report_pairs(examples, image_root):
    """Flatten annotation entries into aligned image-path and report lists.

    Every entry in ``examples`` provides a list under ``'image_path'`` and a
    string under ``'report'``; the report is repeated once per image so the
    two returned lists have equal length.

    Args:
        examples: Iterable of annotation dicts for one data split.
        image_root: Directory each relative image path is joined onto.

    Returns:
        Tuple ``(image_paths, reports)`` of equal-length lists.
    """
    image_paths = []
    reports = []
    for example in examples:
        for path in example['image_path']:
            image_paths.append(os.path.join(image_root, path))
            reports.append(example['report'])
    return image_paths, reports


if __name__ == '__main__':
    data_path = '/content/dataset/iu_xray/annotation.json'
    image_root = '/content/dataset/iu_xray/images'

    with open(data_path, 'r') as f:
        data = json.load(f)

    train_data = data['train']
    test_data = data['test']
    val_data = data['val']

    # One (image, report) pair per image of every study, for each split
    train_image_paths, train_reports = collect_image_report_pairs(train_data, image_root)
    test_image_paths, test_reports = collect_image_report_pairs(test_data, image_root)
    val_image_paths, val_reports = collect_image_report_pairs(val_data, image_root)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = R2GenModel(model_name='t5-small', device=device)
    model.to(device)

    # Resize to 224x224 and normalize with the per-channel mean/std below
    image_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # All splits share the model's tokenizer and the same image preprocessing
    train_dataset = ReportImageDataset(train_reports, train_image_paths, model.tokenizer, image_transform=image_transform)
    test_dataset = ReportImageDataset(test_reports, test_image_paths, model.tokenizer, image_transform=image_transform)
    val_dataset = ReportImageDataset(val_reports, val_image_paths, model.tokenizer, image_transform=image_transform)



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Downloading: "https://download.pytorch.org/models/resnet101-63fe2227.pth" to /root/.cache/torch/hub/checkpoints/resnet101-63fe2227.pth
100%|██████████| 171M/171M [00:01<00:00, 158MB/s]


In [None]:
# Fine-tune for 3 epochs; `model`, `train_dataset` and `val_dataset` come from
# the main-script cell, which must have been run first.
train(model, train_dataset, val_dataset, batch_size=8, num_epochs=3, learning_rate=1e-4)


Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch 1, Average Training Loss: 1.175930494852508
Epoch 1, Average Validation Loss: 0.6434864430008708
Epoch 2, Average Training Loss: 0.4886288440940923
Epoch 2, Average Validation Loss: 0.5362358692328673
Epoch 3, Average Training Loss: 0.4000027220279093
Epoch 3, Average Validation Loss: 0.3232153571538023


In [None]:
# Save only the weights (state_dict). The path is relative to the current
# working directory, while the final download cell reads
# '/content/r2gen_model_Bert.pth', so this relies on the working directory
# being /content.
torch.save(model.state_dict(), 'r2gen_model_Bert.pth')


In [None]:
# Score the trained model on the test split (kept in dataset order).
test_dataloader = torch.utils.data.DataLoader(test_dataset, shuffle=False, batch_size=8)
(
    bleu_scores_1,
    bleu_scores_4,
    meteor_scores,
    rouge_scores,
    avg_precision,
    avg_recall,
    avg_f1_score,
) = evaluate_model(model, test_dataloader)

# Report every metric on its own line.
for metric_label, metric_value in (
    ("BLEU 1 Scores:", bleu_scores_1),
    ("BLEU 4 Scores:", bleu_scores_4),
    ("METEOR Scores:", meteor_scores),
    ("ROUGE Scores:", rouge_scores),
    ("Precision:", avg_precision),
    ("Recall:", avg_recall),
    ("F1 Score:", avg_f1_score),
):
    print(metric_label, metric_value)


BLEU 1 Scores: 0.35899591973255307
BLEU 4 Scores: 0.12777378218586086
METEOR Scores: 0.36924185205264215
ROUGE Scores: {'rouge-1': 0.5263584051523261, 'rouge-2': 0.26399040462127565, 'rouge-l': 0.49396306392478934}
Precision: 0.5411229018748753
Recall: 0.42832934004579004
F1 Score: 0.46352299211306774


In [None]:
import random
import torch

def print_random_captions(model, train_dataset, test_dataset, num_samples=5):
    """Print ground-truth and generated reports for random train and test items.

    Args:
        model: Object exposing ``eval()``, ``tokenizer.decode`` and
            ``generate_caption(input_ids, image)``.
        train_dataset: Indexable dataset whose items hold ``'input_ids'`` and
            ``'image'`` tensors.
        test_dataset: Dataset with the same item layout.
        num_samples: Number of items drawn from each dataset; capped at the
            dataset size so small datasets do not raise.

    Returns:
        None. Everything is reported through ``print``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def get_random_samples(dataset, num_samples):
        """Draw up to ``num_samples`` distinct items using plain int indices."""
        count = max(0, min(num_samples, len(dataset)))
        return [dataset[i] for i in random.sample(range(len(dataset)), count)]

    def show_samples(title, samples):
        """Print the reference and the generated caption for each sample."""
        print(title)
        for sample in samples:
            # Add a batch axis of size 1 before handing the item to the model.
            input_ids = sample['input_ids'].unsqueeze(0).to(device)
            image = sample['image'].unsqueeze(0).to(device)
            # Check input tensor type and shape
            print(f"Input Tensor Type: {input_ids.dtype}, Shape: {input_ids.shape}")
            ground_truth = model.tokenizer.decode(input_ids[0], skip_special_tokens=True)
            generated_caption = model.generate_caption(input_ids, image)
            print(f"Ground Truth: {ground_truth}")
            print(f"Generated Caption: {generated_caption}\n")

    # Draw both sample sets up front (train first, then test).
    train_samples = get_random_samples(train_dataset, num_samples)
    test_samples = get_random_samples(test_dataset, num_samples)

    model.eval()
    with torch.no_grad():
        show_samples("Train Samples:\n", train_samples)
        show_samples("Test Samples:\n", test_samples)
# Show a few random train/test examples; sampling uses `random` without a
# fixed seed, so the selected items differ between runs.
print_random_captions(model, train_dataset, test_dataset)


Train Samples:

Input Tensor Type: torch.int64, Shape: torch.Size([1, 512])
Ground Truth: generate report : there is a calcified left upper lobe granuloma. no xxxx suspicious pulmonary mass or nodule is identified. there is no focal airspace consolidation. no pleural effusion or pneumothorax. the lungs remain hyperexpanded. stable cardiomediastinal silhouette. calcified mediastinal and hilar lymph xxxx are consistent with prior granulomatous disease. there are minimal degenerative changes of the spine.
Generated Caption: ['generate report : the heart size is normal. the lungs are clear. there is no pneumothorax or pleural effusion. there is no acute bony abnormality.']

Input Tensor Type: torch.int64, Shape: torch.Size([1, 512])
Ground Truth: generate report : cardiomediastinal silhouette and pulmonary vasculature are within normal limits. aortic calcifications and tortuosity. lungs are clear. no pneumothorax or pleural effusion. no acute osseous findings. degenerative changes of the t

In [None]:
from PIL import Image
import torchvision.transforms as transforms

def test_on_external_image(model, image_path):
    """Generate and print a report for a single image file.

    Args:
        model: Object exposing ``eval()``, ``generate_caption(input_ids,
            image_tensor)`` and a ``tokenizer``.
        image_path: Path of the image file to describe.

    Returns:
        None. The generated caption list is printed.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Fallback start token id, used only when the tokenizer exposes no CLS id.
    default_start_token_id = 101

    def preprocess_image(image_path):
        """Load the file as RGB and apply the training-time preprocessing."""
        transform = transforms.Compose([
            transforms.Resize((224, 224)),  # same size as the training pipeline
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        # Context manager releases the file handle once the pixels are read.
        with Image.open(image_path) as raw_image:
            rgb_image = raw_image.convert("RGB")
        return transform(rgb_image).unsqueeze(0)  # add batch dimension

    # Preprocess the image
    image_tensor = preprocess_image(image_path).to(device)

    model.eval()
    with torch.no_grad():
        # There is no report text for an external image, so the text input is
        # a single start token taken from the tokenizer instead of a hard-coded id.
        start_token_id = getattr(model.tokenizer, 'cls_token_id', None)
        if start_token_id is None:
            start_token_id = default_start_token_id
        input_ids = torch.tensor([[start_token_id]]).to(device)

        generated_caption = model.generate_caption(input_ids, image_tensor)

        print("Generated Caption for External Image:")
        print(generated_caption)

# Path to the image downloaded from Google
# NOTE(review): no cell visible in this notebook creates this file, so it
# presumably has to be uploaded to /content manually before running this cell.
image_path = "/content/iStock_22401848_MEDIUM-58262cb63df78c6f6adebb27.jpg"

# Test the model on the external image (prints the generated caption)
test_on_external_image(model, image_path)


Generated Caption for External Image:
['generate report : the heart size is normal. the lungs are clear. there is no focal airspace consolidation, pleural effusion, or pneumothorax. the heart size is normal.']


In [None]:
from google.colab import files

# Download the model weights written by the earlier torch.save cell to the
# local machine (google.colab is only available inside Colab).
files.download('/content/r2gen_model_Bert.pth')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>