# Importing libraries

In [None]:
import pandas as pd
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
!pip install sentence-transformers

# Data Preparation

## reports file

In [None]:
df = pd.read_csv("/kaggle/input/chest-xrays-indiana-university/indiana_reports.csv")

In [None]:
df

 ### some insights

- **uid:** *identifier to match with image metadata*.

- **MeSH:** *Medical Subject Headings—terms or tags related to the study*.

- **Problems:** *Additional annotated issues (if any).*

- **image:** *image descriptive field*

- **indication:** *Reason or clinical indication for the X-ray.*

- **comparison:** *Reference to prior studies for comparison, not available for patient privacy.*

- **Findings:** *Observations by the radiologist based on image analysis.*

- **impression:** *The final diagnostic impression or conclusion.*

- **XXXX** marks confidential information that was removed from the reports.

In [None]:
df.isnull().sum()

In [None]:
df.dropna(subset={'findings','impression'},inplace=True)

In [None]:
df

In [None]:
# getting report from findings and impression
df['report']=df['findings']+' '+df['impression']

In [None]:
df['report']

## projection file

In [None]:
proj_df=pd.read_csv("/kaggle/input/chest-xrays-indiana-university/indiana_projections.csv")
proj_df

### Projection file insights
- **uid:** Unique identifier linking to a particular study or patient.

- **filename:** The image file’s name.

- **projection:** The view orientation, either "frontal" or "lateral".

 ## Visualization Image Captions

It appears that each patient has two chest X-ray images, one frontal and one lateral, so every two consecutive rows belong to the same patient.

# Data preprocessing

## Data cleansing

In [None]:
import re
def cleaning(s):
    """Normalise a report string before tokenisation.

    The text is lowercased, every character other than ``a-z``, ``.`` and
    ``,`` is turned into a space, and runs of whitespace are collapsed to a
    single space with the ends trimmed.

    Args:
        s: Raw report text.

    Returns:
        The cleaned string.
    """
    lowered = s.lower()
    # Digits, dashes, slashes, etc. all become spaces
    letters_only = re.sub(r"[^a-z.,]", " ", lowered)
    collapsed = re.sub(r"\s+", " ", letters_only)
    return collapsed.strip()

In [None]:
df.report.iloc[0]

In [None]:
df.report=df.report.apply(cleaning)

In [None]:
df.report.iloc[0]

## Merging the files into one dataframe

In [None]:
df=pd.merge(df,proj_df,how='inner',on='uid')

In [None]:
df

In [None]:
df['path']="/kaggle/input/chest-xrays-indiana-university/images/images_normalized/"+df['filename']
df.head(2)

In [None]:
image_folder = "/kaggle/input/chest-xrays-indiana-university/images/images_normalized"  # change to your dataset path

def show_image_with_caption(row):
    """Plot one X-ray with its report text as the title.

    Args:
        row: Record (e.g. a DataFrame row) exposing ``'filename'`` -- the
            image file name inside ``image_folder`` -- and ``'report'``.
    """
    picture = Image.open(os.path.join(image_folder, row['filename']))

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.imshow(picture, cmap='gray')
    ax.axis("off")
    ax.set_title(row['report'], fontsize=10)
    plt.show()

In [None]:
for _, row in df.head(5).iterrows():
    show_image_with_caption(row)

## Tokenizer

In [None]:
import nltk
from collections import Counter
text=" ".join(df['report'])
tokens=nltk.word_tokenize(text)
counterr=Counter(tokens)
print(tokens[0:5])

In [None]:
# Ids 0-3 are reserved for the special tokens; corpus tokens are numbered
# from 4 upwards in the iteration order of `counterr`.
word2idx = {"<PAD>": 0, "<UNK>": 1, "<SOS>": 2, "<EOS>": 3}
word2idx.update(
    {token: token_id for token_id, token in enumerate(counterr, start=4)}
)

In [None]:
import nltk
from typing import List, Dict

class Tokenizer:
    """
    Word-level tokenizer translating between text and integer token IDs.

    Attributes:
        word2idx (dict): Word -> integer ID.
        idx2word (dict): Integer ID -> word (inverse of ``word2idx``).
        pad_token, unk_token, sos_token, eos_token (str): Special token strings.
        pad_id, unk_id, sos_id, eos_id (int): IDs of the special tokens.
        vocab_size (int): Number of entries in ``word2idx``.
    """

    def __init__(self, word2idx: Dict[str, int]):
        """
        Build the tokenizer from an existing vocabulary.

        Args:
            word2idx: Mapping from words to integer IDs. It must contain the
                four special tokens ``<PAD>``, ``<UNK>``, ``<SOS>``, ``<EOS>``.
        """
        self.word2idx = word2idx
        self.idx2word = dict((index, word) for word, index in word2idx.items())

        # Special token strings
        self.pad_token, self.unk_token = "<PAD>", "<UNK>"
        self.sos_token, self.eos_token = "<SOS>", "<EOS>"

        # Their IDs, looked up once (KeyError if the vocabulary lacks one)
        vocabulary = self.word2idx
        self.pad_id = vocabulary[self.pad_token]
        self.unk_id = vocabulary[self.unk_token]
        self.sos_id = vocabulary[self.sos_token]
        self.eos_id = vocabulary[self.eos_token]

        self.vocab_size = len(word2idx)

    def encode(self, sentence: str, add_special_tokens: bool = True) -> List[int]:
        """
        Turn a sentence into token IDs.

        Args:
            sentence: Text to encode; it is lowercased before tokenisation.
            add_special_tokens: When True, wrap the IDs in <SOS> ... <EOS>.

        Returns:
            The list of token IDs; out-of-vocabulary words map to <UNK>.
        """
        fallback = self.unk_id
        body = []
        for token in nltk.word_tokenize(sentence.lower()):
            body.append(self.word2idx.get(token, fallback))

        if not add_special_tokens:
            return body
        return [self.sos_id, *body, self.eos_id]

    def decode(self, token_ids: List[int], skip_special_tokens: bool = True) -> str:
        """
        Turn token IDs back into a space-joined string.

        Args:
            token_ids: Integer IDs to decode; unknown IDs become <UNK>.
            skip_special_tokens: When True, <PAD>, <SOS> and <EOS> are omitted.

        Returns:
            The decoded sentence.
        """
        if skip_special_tokens:
            hidden = {self.pad_token, self.sos_token, self.eos_token}
        else:
            hidden = set()

        decoded = (self.idx2word.get(idx, self.unk_token) for idx in token_ids)
        return " ".join(word for word in decoded if word not in hidden)

In [None]:
#Example usage:
tokenizer = Tokenizer(word2idx)
ids = tokenizer.encode("No pleural effusion.")
print(ids)  # Example: [2, 6, 300, 450, 3]
print(tokenizer.decode(ids))  # "no pleural effusion ."

### Adding the tokenized sequence to the dataframe

In [None]:
df["sequence"]=df["report"].apply(tokenizer.encode)
df.head(2)

In [None]:
row=df.iloc[0]
tokenizer.decode(row.sequence)

## Padding and trimming

In [None]:
lengths=[len(i) for i in df['sequence']]
sns.violinplot(lengths)

From the plot, the maximum sequence length is around 100.

In [None]:
seq_len=100
def pad_and_trim(seq):
    """Fit a token-id list to ``seq_len``.

    Sequences longer than ``seq_len`` are cut to ``seq_len`` ids with the
    last id forced to ``<EOS>``; all others are right-padded with ``<PAD>``
    up to ``seq_len``.
    """
    missing = seq_len - len(seq)
    if missing >= 0:
        return seq + [tokenizer.pad_id] * missing
    # Too long: keep the first seq_len-1 ids and close with <EOS>
    return seq[:seq_len-1] + [tokenizer.eos_id]
df["sequence"] = df["sequence"].apply(pad_and_trim)

# Modeling

## Data split

In [None]:
from sklearn.model_selection import train_test_split

# Split by study (`uid`) instead of by row. After the merge with the
# projections table, every image of a study carries the same report, so a
# row-level split would place the frontal view in train and the lateral view
# (with the identical target text) in test/val, leaking evaluation reports
# into training. A fixed seed keeps the split reproducible across runs.
SPLIT_SEED = 42
study_ids = df['uid'].unique()
train_ids, temp_ids = train_test_split(study_ids, test_size=0.2, random_state=SPLIT_SEED)
test_ids, val_ids = train_test_split(temp_ids, test_size=0.5, random_state=SPLIT_SEED)

train_df = df[df['uid'].isin(train_ids)]
temp = df[df['uid'].isin(temp_ids)]
test_df = temp[temp['uid'].isin(test_ids)]
val_df = temp[temp['uid'].isin(val_ids)]
print(len(train_df),len(test_df),len(val_df))

## Image preprocessing

In [None]:
from torchvision import transforms
from torch.utils.data import Dataset

img_size=(512,512)


def _tensor_and_normalize():
    """Return the tensor-conversion + normalisation tail shared by both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]


# Deterministic pipeline used for validation / test images
image_transforms = transforms.Compose(
    [transforms.Resize(img_size)] + _tensor_and_normalize()
)

# Training pipeline: same resize, plus light augmentation (small rotation,
# colour jitter, random crop resized back to img_size)
_augmentations = [
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.02),
    transforms.RandomResizedCrop(img_size, scale=(0.8, 1.0)),
]
train_transforms = transforms.Compose(
    [transforms.Resize(img_size)] + _augmentations + _tensor_and_normalize()
)

class IUXrayDataset(Dataset):
    """Dataset pairing X-ray image files with their token-id captions.

    Args:
        image_paths: Indexable collection of image file paths.
        captions_seq: Indexable collection of token-id lists, aligned with
            ``image_paths``.
        transform: Optional callable applied to the loaded RGB image.
    """

    def __init__(self, image_paths, captions_seq, transform=None):
        self.image_paths, self.captions_seq = image_paths, captions_seq
        self.transform = transform

    def __len__(self):
        """Number of (image, caption) pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, caption)`` where caption is a long tensor of ids."""
        rgb = Image.open(self.image_paths[idx]).convert("RGB")
        image = self.transform(rgb) if self.transform else rgb
        token_ids = torch.tensor(self.captions_seq[idx], dtype=torch.long)
        return image, token_ids

In [None]:
from torch.utils.data import DataLoader

def _dataset_from(frame, transform):
    """Wrap the ``path`` and ``sequence`` columns of ``frame`` in an IUXrayDataset."""
    return IUXrayDataset(
        image_paths=list(frame['path']),
        captions_seq=list(frame['sequence']),
        transform=transform,
    )


# Only the training split goes through the augmenting pipeline
train_dataset = _dataset_from(train_df, train_transforms)
test_dataset = _dataset_from(test_df, image_transforms)
val_dataset = _dataset_from(val_df, image_transforms)

_train_loader_options = dict(batch_size=32, shuffle=True, num_workers=4, drop_last=True)
train_loader = DataLoader(train_dataset, **_train_loader_options)
print(len(train_loader))

In [None]:
# Evaluation loaders: no shuffling (order is irrelevant for metrics) and
# drop_last=False so that every test/validation sample is scored -- with
# drop_last=True the final partial batch was silently excluded from the
# reported loss and text metrics.
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    drop_last=False
)
print(len(test_loader))


val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    drop_last=False
)
print(len(val_loader))

## Image Encoder

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class ImageEncoder(nn.Module):
    """EfficientNet-B4 feature extractor producing a sequence of patch embeddings.

    The backbone's feature map is pooled to a 7x7 grid, flattened into 49
    positions and linearly projected from 1792 channels to ``embed_dim``.

    Args:
        embed_dim: Size of each output patch embedding.
    """

    def __init__(self,embed_dim=512):
        super().__init__()
        effnet = models.efficientnet_b4(pretrained=True)

        # Keep only the convolutional trunk (classification head removed)
        self.backbone = effnet.features  # Output shape: [B, 1792, H/32, W/32]

        # Freeze backbone
        for param in self.backbone.parameters():
            param.requires_grad = False

        # [B, 1792, h, w] -> [B, 1792, 7, 7] -> [B, 1792, 49]
        self.pool = nn.AdaptiveAvgPool2d((7, 7))
        self.flatten = nn.Flatten(2)

        self.project = nn.Linear(1792, embed_dim)

    @staticmethod
    def transpose(x):
        """Swap the channel and position axes: (B, C, L) -> (B, L, C).

        Defined as a method rather than a lambda stored on the instance, so
        the module can still be pickled / deep-copied as a whole.
        """
        return x.permute(0, 2, 1)

    def forward(self, x):
        """Encode a batch of images.

        Args:
            x: Image batch accepted by the backbone.

        Returns:
            Tensor of shape (B, 49, embed_dim).
        """
        x = self.backbone(x)               # (B, 1792, H, W)
        x = self.pool(x)                   # (B, 1792, 7, 7)
        x = self.flatten(x)                # (B, 1792, 49)
        x = self.transpose(x)              # (B, 49, 1792)
        x = self.project(x)                # (B, 49, embed_dim)
        return x

## Getting Image and Caption

In [None]:
# Pull a single batch from the training loader to sanity-check shapes
img_batch, seq_batch = next(iter(train_loader))
print(img_batch.shape)

# Run the image encoder once on that batch
encoder = ImageEncoder(512)
out = encoder(img_batch)
print(out.shape)  # [batch_size, 49, 512]

In [None]:
print(seq_batch.shape)

In [None]:
class PositionalEmbedding(nn.Module):
    """Token embedding plus a learned absolute-position embedding.

    Args:
        vocab_size: Number of rows in the token embedding table.
        max_len: Number of positions the position table can represent.
        embed_dim: Embedding width.
    """

    def __init__(self, vocab_size, max_len, embed_dim):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, embed_dim)
        self.position_embedding = nn.Embedding(max_len, embed_dim)

    def forward(self, x):
        """Embed token ids ``x`` of shape [B, T] into [B, T, D]."""
        num_steps = x.size(1)
        position_ids = torch.arange(num_steps, device=x.device)[None, :]  # [1, T]
        # The [1, T, D] position term broadcasts over the batch dimension
        return self.token_embedding(x) + self.position_embedding(position_ids)

In [None]:
# Shape check: embed the first report twice, as a batch of two sequences
embed = PositionalEmbedding(vocab_size=tokenizer.vocab_size, max_len=102, embed_dim=512)
x = torch.tensor([row.sequence,row.sequence], dtype=torch.long) # batch of 2
output = embed(x)
print(output.shape)  # [B, sequence_len, 512]

In [None]:
import torch.nn.functional as F

class TransformerEncoderBlock(nn.Module):
    """Post-norm Transformer encoder layer: self-attention then feed-forward."""

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        """
        Args:
            embed_dim (int): Width of the input and output features.
            num_heads (int): Number of attention heads.
            ff_dim (int): Hidden width of the feed-forward sub-layer.
            dropout (float): Dropout probability, used both inside attention
                and on each residual branch.
        """
        super().__init__()
        self.self_attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout, batch_first=True)
        feed_forward = [
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        ]
        self.ff = nn.Sequential(*feed_forward)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Apply the block.

        Args:
            x (torch.Tensor): Input of shape [B, N, D].

        Returns:
            torch.Tensor: Output of shape [B, N, D].
        """
        # Residual self-attention followed by layer norm
        attended = self.self_attn(x, x, x)[0]
        hidden = self.norm1(x + self.dropout(attended))

        # Residual feed-forward followed by layer norm
        return self.norm2(hidden + self.dropout(self.ff(hidden)))

In [None]:
class TransformerDecoderBlock(nn.Module):
    """Post-norm Transformer decoder layer.

    Applies masked self-attention, cross-attention over the encoder output
    and a feed-forward network, each wrapped in residual + layer norm.

    Args:
        embed_dim: Feature width of both the target and encoder sequences.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        attention_kwargs = dict(dropout=dropout, batch_first=True)
        self.self_attn = nn.MultiheadAttention(embed_dim, num_heads, **attention_kwargs)
        self.cross_attn = nn.MultiheadAttention(embed_dim, num_heads, **attention_kwargs)
        self.ff = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.norm3 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None):
        """Decode ``x`` [B, T, D] against ``enc_out`` [B, N, D]; returns [B, T, D]."""
        # 1) masked self-attention over the target sequence
        self_ctx, _ = self.self_attn(x, x, x, attn_mask=tgt_mask)
        hidden = self.norm1(x + self.dropout(self_ctx))

        # 2) cross-attention: queries from the target, keys/values from the encoder
        cross_ctx, _ = self.cross_attn(hidden, enc_out, enc_out)
        hidden = self.norm2(hidden + self.dropout(cross_ctx))

        # 3) position-wise feed-forward
        return self.norm3(hidden + self.dropout(self.ff(hidden)))

In [None]:
class CaptionDecoder(nn.Module):
    """Stack of Transformer decoder blocks mapping caption tokens to vocabulary logits.

    Args:
        vocab_size: Size of the output vocabulary.
        embed_dim: Embedding / model width.
        ff_dim: Hidden width of each block's feed-forward sub-layer.
        num_heads: Attention heads per block.
        max_len: Number of positions supported by the positional embedding.
        num_layers: Number of decoder blocks.
    """

    def __init__(self, vocab_size, embed_dim, ff_dim, num_heads, max_len, num_layers):
        super().__init__()
        self.pos_embed = PositionalEmbedding(vocab_size, max_len, embed_dim)
        blocks = []
        for _ in range(num_layers):
            blocks.append(TransformerDecoderBlock(embed_dim, num_heads, ff_dim))
        self.dec_layers = nn.ModuleList(blocks)
        self.output_proj = nn.Linear(embed_dim, vocab_size)

    def make_causal_mask(self, size):
        """Return a [size, size] bool mask that is True strictly above the diagonal."""
        upper = torch.ones(size, size).triu(diagonal=1)
        return upper.bool()

    def forward(self, tgt, enc_out):
        """
        tgt: [B, T]        -- tokenized caption (with <SOS>)
        enc_out: [B, N, D] -- image features
        returns: [B, T, vocab_size] logits
        """
        hidden = self.pos_embed(tgt)  # [B, T, D]

        # True entries mark future positions a query must not attend to
        _, steps, _ = hidden.shape
        causal = self.make_causal_mask(steps).to(hidden.device)  # [T, T]

        for block in self.dec_layers:
            hidden = block(hidden, enc_out, tgt_mask=causal)

        return self.output_proj(hidden)

In [None]:
# Hyperparameters for a stand-alone shape check of the caption decoder
vocab_size = tokenizer.vocab_size
embed_dim = 512
ff_dim = 512
num_heads = 8
max_len = 102
num_layers = 4

decoder = CaptionDecoder(vocab_size, embed_dim, ff_dim, num_heads, max_len, num_layers)

# Dummy inputs
sample_tokens = x  # [B, T]
# NOTE: `output` is the token-embedding tensor computed in the embedding cell,
# not real image features; it only serves as a stand-in whose last dimension
# matches embed_dim.
img_features = output            # [B, N, D]

output_logits = decoder(sample_tokens, img_features)
print(output_logits.shape)  # [B, T, vocab_size]

In [None]:
import torch
import math

class ImageCaptioningModel(nn.Module):
    """Image-to-text model: CNN encoder -> Transformer encoder -> caption decoder.

    Args:
        cnn_encoder: Module mapping an image batch to a feature sequence.
        transformer_encoder: Module refining that feature sequence.
        decoder: Module mapping ``(token_ids, encoded_image)`` to logits of
            shape [B, T, vocab_size].
        tokenizer: Object exposing ``sos_id``, ``eos_id`` and ``decode``.
    """

    def __init__(self, cnn_encoder, transformer_encoder, decoder, tokenizer):
        super().__init__()
        self.cnn_encoder = cnn_encoder
        self.transformer_encoder = transformer_encoder
        self.decoder = decoder
        self.tokenizer = tokenizer

    def forward(self, images, captions):
        """Return next-token logits for ``captions`` conditioned on ``images``."""
        img_features = self.cnn_encoder(images)
        encoded_img = self.transformer_encoder(img_features)
        logits = self.decoder(captions, encoded_img)
        return logits

    def generate(self, image, max_length=100, beam_width=3, device=None, length_penalty=0.7):
        """
        Beam search decoding for image captioning.

        Args:
            image: A single image tensor without batch dimension ([C, H, W]).
            max_length: Maximum number of decoding steps.
            beam_width: Number of hypotheses kept after each step.
            device: Device to run on. When None, the device of the model's
                parameters is used (CPU if the model has no parameters), so
                the method also works on CPU-only machines.
            length_penalty: Exponent applied to the sequence length when
                ranking hypotheses; values <= 0 disable normalisation.

        Returns:
            The decoded best caption with special tokens removed.
        """
        if device is None:
            first_param = next(self.parameters(), None)
            device = first_param.device if first_param is not None else torch.device('cpu')

        def ranking_score(candidate):
            # Length-normalised log-probability used to order the beam
            tokens, log_prob = candidate
            if length_penalty > 0:
                return log_prob / (len(tokens) ** length_penalty)
            return log_prob

        sos_id, eos_id = self.tokenizer.sos_id, self.tokenizer.eos_id

        # Remember the current mode so that calling generate() in the middle
        # of training does not leave the model stuck in eval mode.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                # Encode the image once; every beam shares these features
                image = image.unsqueeze(0).to(device)  # [1, C, H, W]
                img_features = self.cnn_encoder(image)
                encoded_img = self.transformer_encoder(img_features)

                # Beam: list of (sequence, summed log-probability)
                beam = [([sos_id], 0.0)]

                for _ in range(max_length):
                    candidates = []
                    for seq, score in beam:
                        if seq[-1] == eos_id:
                            # Finished hypothesis: carry it over unchanged
                            candidates.append((seq, score))
                            continue

                        input_ids = torch.tensor([seq], dtype=torch.long, device=device)  # [1, len]
                        logits = self.decoder(input_ids, encoded_img)  # [1, len, vocab_size]
                        # log_softmax avoids the underflow of log(softmax(.))
                        log_probs = torch.log_softmax(logits[0, -1, :], dim=-1)

                        # Never request more candidates than the vocabulary holds
                        k = min(beam_width, log_probs.size(-1))
                        top_log_probs, top_ids = log_probs.topk(k)

                        for log_prob, token_id in zip(top_log_probs.tolist(), top_ids.tolist()):
                            candidates.append((seq + [token_id], score + log_prob))

                    # Keep the best beam_width hypotheses
                    beam = sorted(candidates, key=ranking_score, reverse=True)[:beam_width]

                    # Stop early once every surviving hypothesis has ended
                    if all(seq[-1] == eos_id for seq, _ in beam):
                        break

                best_seq = beam[0][0]
                return self.tokenizer.decode(best_seq, skip_special_tokens=True)
        finally:
            self.train(was_training)

In [None]:
# --- Model Hyperparameters ---
# Run on the GPU when one is available, otherwise fall back to the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
embed_dim = 512
ff_dim = 512
num_heads = 8
num_decoder_layers = 4 # Keeping the decoder 4 layers deep
vocab_size = tokenizer.vocab_size
max_len = 102

# 1. CNN image encoder (outputs embed_dim-wide patch features)
cnn_encoder = ImageEncoder(embed_dim=embed_dim)

# 2. Transformer Encoder (a single block over the image patch features)
transformer_encoder = TransformerEncoderBlock(
    embed_dim=embed_dim, 
    num_heads=num_heads, 
    ff_dim=ff_dim
)

# 3. Transformer Decoder 
decoder = CaptionDecoder(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    ff_dim=ff_dim,
    num_heads=num_heads,
    max_len=max_len,
    num_layers=num_decoder_layers
)

# 4. The full Image Captioning Model, moved to the selected device
model = ImageCaptioningModel(
    cnn_encoder=cnn_encoder,
    transformer_encoder=transformer_encoder,
    decoder=decoder,
    tokenizer=tokenizer
).to(device)

In [None]:
# Smoke test: caption one image of the sample batch with the untrained model.
# Detect the device instead of hard-coding 'cuda' so this cell also runs in
# CPU-only sessions, and pass it explicitly to generate().
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = ImageCaptioningModel(cnn_encoder,transformer_encoder, decoder, tokenizer).to(device)

# `img_batch[0]` is one preprocessed image tensor of shape [3, H, W]
caption = model.generate(img_batch[0], max_length=40, device=device)
print("Generated Caption:", caption)

In [None]:
def caption_loss_fn(logits, targets, pad_token_id):
    """
    Label-smoothed cross-entropy over all non-padding target positions.

    logits: [B, T, vocab_size]
    targets: [B, T] (next tokens)
    pad_token_id: target id excluded from the loss
    """
    vocab_dim = logits.size(-1)
    flat_logits = logits.view(-1, vocab_dim)   # [(B*T), vocab_size]
    flat_targets = targets.reshape(-1)         # [(B*T)]

    return F.cross_entropy(
        flat_logits,
        flat_targets,
        ignore_index=pad_token_id,
        label_smoothing=0.1,
    )

In [None]:
from tqdm import tqdm
def train_one_epoch(model, dataloader, optimizer, pad_token_id, device):
    """Run one teacher-forced training pass over ``dataloader``.

    Args:
        model: Captioning model called as ``model(images, input_tokens)``.
        dataloader: Iterable of ``(images, captions)`` batches.
        optimizer: Optimizer stepping the model parameters.
        pad_token_id: Target id ignored by the loss.
        device: Device the batches are moved to.

    Returns:
        Mean batch loss over the epoch.
    """
    model.train()
    running_loss = 0

    for images, captions in tqdm(dataloader):
        images = images.to(device)
        captions = captions.to(device)

        # Teacher forcing: feed tokens [0..T-2], predict tokens [1..T-1]
        decoder_in, next_tokens = captions[:, :-1], captions[:, 1:]

        optimizer.zero_grad()
        batch_loss = caption_loss_fn(model(images, decoder_in), next_tokens, pad_token_id)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()

    return running_loss / len(dataloader)

In [None]:
from tqdm import tqdm
def eval(model, dataloader, pad_token_id, device):
    """Compute the mean teacher-forced loss over ``dataloader`` without gradients.

    Args:
        model: Captioning model called as ``model(images, input_tokens)``.
        dataloader: Iterable of ``(images, captions)`` batches.
        pad_token_id: Target id ignored by the loss.
        device: Device the batches are moved to.

    Returns:
        Mean batch loss.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for images, captions in tqdm(dataloader):
            images = images.to(device)
            captions = captions.to(device)

            # Same input/target shift as in training
            decoder_in, next_tokens = captions[:, :-1], captions[:, 1:]

            batch_loss = caption_loss_fn(model(images, decoder_in), next_tokens, pad_token_id)
            running_loss += batch_loss.item()

    return running_loss / len(dataloader)

In [None]:
import copy
import warnings

warnings.filterwarnings("ignore")

model = ImageCaptioningModel(cnn_encoder,transformer_encoder, decoder, tokenizer).to(device)

NUM_EPOCHS = 50

# --- 1. Decide which CNN layers are fine-tuned ---------------------------
backbone = model.cnn_encoder.backbone

# Freeze everything except the last two backbone stages
for param in backbone[:-2].parameters():
    param.requires_grad = False

finetune_params = list(backbone[-2:].parameters())
for param in finetune_params:
    param.requires_grad = True

# --- 2. Parameter groups --------------------------------------------------
# Everything outside the pretrained backbone is trained from scratch at the
# higher learning rate. Selecting by name (instead of listing sub-modules by
# hand) makes sure `cnn_encoder.project` is included: it was previously left
# out of the optimizer and therefore stayed at its random initialisation.
transformer_params = [
    param
    for name, param in model.named_parameters()
    if not name.startswith("cnn_encoder.backbone.")
]

optimizer = torch.optim.Adam([
    {'params': transformer_params, 'lr': 1e-4},
    {'params': finetune_params, 'lr': 1e-5}
])

# `verbose` is deprecated in recent PyTorch releases, so it is not passed here
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2)

# --- 3. Training with best-checkpoint tracking ----------------------------
best_val_loss = float('inf')
best_model = copy.deepcopy(model.state_dict())

for epoch in range(NUM_EPOCHS):
    loss = train_one_epoch(model, train_loader, optimizer, tokenizer.pad_id, device)
    val_loss = eval(model, val_loader, tokenizer.pad_id, device)
    scheduler.step(val_loss)
    if best_val_loss>val_loss:
        print("Saving best model...")
        best_val_loss = val_loss
        best_model = copy.deepcopy(model.state_dict())
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}, eval_loss: {val_loss:.4f}")

print("loading the best model")
model.load_state_dict(best_model)  
model.eval()

# Save Model

In [None]:
torch.save(model.state_dict(), "X-ray_transformer_model.pt")

In [None]:
# Rebuild the model and restore the saved weights. `map_location` remaps the
# stored tensors onto the current device, so a checkpoint written on a GPU
# can still be loaded in a CPU-only session.
model = ImageCaptioningModel(cnn_encoder,transformer_encoder, decoder, tokenizer).to(device)
model.load_state_dict(torch.load("X-ray_transformer_model.pt", map_location=device))
model.eval()

# Evaluation

In [None]:
!pip install rouge_score
!pip install bert_score

In [None]:
import nltk
nltk.download('wordnet')

In [None]:
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer
from bert_score import score as bert_scorer

def evaluate_model(model, dataloader, tokenizer, device):
    """
    Generates predictions and computes BLEU, ROUGE, METEOR, and BERTScore.

    Args:
        model: Captioning model exposing ``generate(image, max_length, device)``.
        dataloader: Iterable of ``(images, caption_token_ids)`` batches.
        tokenizer: Tokenizer used to decode the reference token ids.
        device: Device used for generation and BERTScore.

    Returns:
        dict with the averaged metrics: ``bleu_1`` .. ``bleu_4`` (cumulative
        BLEU-n), ``meteor``, ``rouge1``, ``rouge2``, ``rougeL`` (F-measure)
        and ``bert_score`` (mean F1).

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    model.eval()
    references = []
    hypotheses = []

    # 1. Generate Predictions
    print("Generating predictions on the test set...")
    with torch.no_grad():
        for images, captions_seq in tqdm(dataloader):
            # Move images to the correct device for model.generate
            images = images.to(device)

            # generate() handles one image at a time
            for image in images:
                hypotheses.append(model.generate(image, max_length=100, device=device))

            # Decode the ground truth captions (padding/special tokens removed)
            for seq in captions_seq:
                references.append(tokenizer.decode(seq.tolist(), skip_special_tokens=True))

    if not hypotheses:
        raise ValueError("evaluate_model received an empty dataloader; nothing to score.")
    num_samples = len(hypotheses)

    print(f"\nGenerated {len(hypotheses)} hypotheses.")
    print("Example Hypothesis:", hypotheses[0])
    print("Example Reference: ", references[0])

    # Tokenise once; BLEU and METEOR both consume token lists
    ref_token_lists = [nltk.word_tokenize(ref) for ref in references]
    hyp_token_lists = [nltk.word_tokenize(hyp) for hyp in hypotheses]

    # 2. Calculate Metrics

    # --- BLEU Score ---
    # BLEU-n is the cumulative score: the geometric mean of the 1..n-gram
    # precisions, i.e. uniform weights over the first n orders. Weights such
    # as (0, 1, 0, 0) would instead score the 2-gram precision in isolation.
    print("\nCalculating BLEU scores...")
    bleu_weights = {
        'bleu_1': (1, 0, 0, 0),
        'bleu_2': (1 / 2, 1 / 2, 0, 0),
        'bleu_3': (1 / 3, 1 / 3, 1 / 3, 0),
        'bleu_4': (1 / 4, 1 / 4, 1 / 4, 1 / 4),
    }
    bleu_scores = {name: 0 for name in bleu_weights}
    for ref_tokens, hyp_tokens in zip(ref_token_lists, hyp_token_lists):
        for name, weights in bleu_weights.items():
            # sentence_bleu expects a list of reference token lists
            bleu_scores[name] += sentence_bleu([ref_tokens], hyp_tokens, weights=weights)

    for k in bleu_scores:
        bleu_scores[k] /= num_samples

    # --- METEOR Score ---
    print("Calculating METEOR scores...")
    meteor_total = 0
    for ref_tokens, hyp_tokens in zip(ref_token_lists, hyp_token_lists):
        meteor_total += meteor_score([ref_tokens], hyp_tokens)
    meteor_avg = meteor_total / num_samples

    # --- ROUGE Score ---
    print("Calculating ROUGE scores...")
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = {'rouge1': 0, 'rouge2': 0, 'rougeL': 0}
    for ref, hyp in zip(references, hypotheses):
        scores = scorer.score(ref, hyp)
        for k in rouge_scores:
            rouge_scores[k] += scores[k].fmeasure

    for k in rouge_scores:
        rouge_scores[k] /= num_samples

    # --- BERTScore ---
    # This is computationally intensive, will use GPU if available.
    print("Calculating BERTScore")
    P, R, F1 = bert_scorer(hypotheses, references, lang="en", model_type='distilbert-base-uncased', device=device, verbose=True)
    bertscore_avg = F1.mean().item()

    all_metrics = {
        **bleu_scores,
        'meteor': meteor_avg,
        **rouge_scores,
        'bert_score': bertscore_avg
    }

    return all_metrics

In [None]:
# Make sure the model sits on the device used for evaluation
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Run the evaluation on the held-out test loader
test_metrics = evaluate_model(model, test_loader, tokenizer, device)

# Print the final results in a clean format (one "NAME: value" line per metric)
print("\n--- Test Set Evaluation Metrics ---")
for metric, value in test_metrics.items():
    print(f"{metric.upper():<12}: {value:.4f}")

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import textwrap

def visualize_result(img_path, true_report, generated_report, title="Model Prediction"):
    """Show an X-ray with the reference and generated reports printed below it.

    Args:
        img_path: Path of the image file to display.
        true_report: Ground-truth report text.
        generated_report: Report text produced by the model.
        title: Figure title.
    """
    picture = Image.open(img_path).convert("RGB")

    # Wrap both reports to 100 characters so they fit under the image
    caption_lines = [
        textwrap.fill(f"Ground Truth: {true_report}", width=100),
        textwrap.fill(f"Generated: {generated_report}", width=100),
    ]
    caption = "\n\n".join(caption_lines)
    box_style = {"facecolor":"white", "alpha":0.7, "pad":5}

    plt.figure(figsize=(12, 6))
    plt.imshow(picture, cmap="gray")
    plt.axis('off')
    plt.title(title, fontsize=14, pad=20)
    plt.figtext(0.5, 0.01, caption,
                ha="center", va="bottom", fontsize=12, wrap=True, bbox=box_style)

    plt.show()

In [None]:
import random
import torch

def show_random_test_examples(model, test_df, tokenizer, image_transforms, device, num_examples=3):
    """Caption randomly chosen test images and plot them next to the ground truth.

    Args:
        model: Captioning model exposing ``generate``.
        test_df: DataFrame with ``'path'`` and ``'report'`` columns.
        tokenizer: Unused here; kept for interface compatibility.
        image_transforms: Callable turning a PIL image into a [C, H, W] tensor.
        device: Device used for generation.
        num_examples: Number of examples to show. Values larger than the
            number of available rows are reduced to that number instead of
            raising inside ``random.sample``.
    """
    model.eval()

    sample_size = max(0, min(num_examples, len(test_df)))
    chosen_rows = random.sample(range(len(test_df)), sample_size)

    for position, idx in enumerate(chosen_rows, start=1):
        sample_row = test_df.iloc[idx]
        img_path = sample_row['path']
        true_report = sample_row['report']

        # generate() adds the batch dimension and moves the tensor to `device`
        # itself, so the plain [C, H, W] tensor is passed through.
        image_pil = Image.open(img_path).convert("RGB")
        image_tensor = image_transforms(image_pil)

        print(f"--- Example {position}/{sample_size} ---")

        with torch.no_grad():
            generated_report = model.generate(image_tensor, max_length=100, device=device)
        visualize_result(img_path, true_report, generated_report, title=f"Test Example {position}")

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Call the function to show 10 random examples
show_random_test_examples(model, test_df, tokenizer, image_transforms, device, num_examples=10)