# References

Hugging Face - Documentation [online]. Huggingface.co. Available from: https://huggingface.co/docs.

PyTorch Foundation [online]. PyTorch. Available from: https://pytorch.org/.

# <i> Huggingface Login </i>

In [None]:
from huggingface_hub import login, notebook_login
from google.colab import userdata

# notebook_login() does not take a token: passing the secret positionally binds
# it to an unrelated argument and the interactive widget is shown anyway.
# login(token=...) authenticates non-interactively with the Colab secret; the
# widget is kept only as a fallback for an empty secret.
hf_token = userdata.get('HF_TOKEN')
if hf_token:
    login(token=hf_token)
else:
    notebook_login()



VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

# Import libraries

In [None]:
# Fine-tuning Swin Transformer on IU X-Ray dataset (image-text contrastive learning)
import os
import json
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.optim import AdamW
from torchvision.transforms import v2
from transformers import (
    AutoModel,
    AutoTokenizer,
    AutoImageProcessor,
    AutoFeatureExtractor,
    VisionTextDualEncoderModel,
    VisionTextDualEncoderProcessor,
    EarlyStoppingCallback,
    TrainingArguments,
    Trainer
)

# Dataset Handler

In [None]:
# Step 3: Create a PyTorch Dataset to yield image pixels and tokenized text
class IUXRayDataset(Dataset):
    """Yield one (image tensor, tokenized report) pair per annotation record.

    Args:
        image_paths: Root directory that each record's relative image path is
            joined onto.
        texts: Sequence of annotation records. Each record is indexed with
            ``record['image_path'][1]`` and read with ``record.get("report", "")``.
        tokenizer: Callable invoked as
            ``tokenizer(report, max_length=..., truncation=True, padding=False)``;
            its result must expose ``"input_ids"`` and ``"attention_mask"``.
        image_processor: Callable invoked as
            ``image_processor(image, return_tensors="pt")``; its result must
            expose a ``"pixel_values"`` tensor.
        transforms: Callable applied to the RGB PIL image before
            ``image_processor``.
        max_length: Maximum number of tokens kept per report.
    """

    def __init__(self, image_paths, texts, tokenizer, image_processor, transforms, max_length=256):
        self.image_paths = image_paths
        self.texts = texts
        self.tokenizer = tokenizer
        self.image_processor = image_processor
        self.max_length = max_length
        self.transforms = transforms

    def __len__(self):
        """Return the number of annotation records."""
        return len(self.texts)

    @staticmethod
    def _min_max_scale(pixel_tensor):
        """Rescale a floating-point tensor to the [0, 1] range.

        The denominator is clamped to the dtype's machine epsilon so that a
        constant tensor (max == min) maps to zeros instead of NaN from 0 / 0.
        """
        low = pixel_tensor.min()
        span = (pixel_tensor.max() - low).clamp_min(torch.finfo(pixel_tensor.dtype).eps)
        return (pixel_tensor - low) / span

    def __getitem__(self, idx):
        """Return ``pixel_values``, ``input_ids`` and ``attention_mask`` for record ``idx``.

        The token lists are returned unpadded; padding is left to the collator.
        """
        record = self.texts[idx]
        # Element 1 of the record's image list is the file that gets loaded
        # (presumably the second view of the study - TODO confirm).
        img_path = os.path.join(self.image_paths, record['image_path'][1])
        report = record.get("report", "")
        # Close the file handle as soon as the pixels have been copied into the
        # converted RGB image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        image = self.transforms(image)
        pixel_tensor = self.image_processor(image, return_tensors="pt")["pixel_values"].squeeze(0)
        # Normalize data between 0 and 1
        scaled_data = self._min_max_scale(pixel_tensor)
        # Tokenize the report text without padding (we will pad in the collator)
        encoding = self.tokenizer(report, max_length=self.max_length, truncation=True, padding=False)
        return {
            "pixel_values": scaled_data,
            "input_ids": encoding["input_ids"],
            "attention_mask": encoding["attention_mask"],
        }

In [None]:
# Image path
image_paths = "/content/drive/MyDrive/iu_xray/images"
# Report path
report_path = '/content/drive/MyDrive/iu_xray/annotation.json'
# Read the annotations inside a context manager so the file handle is closed
# as soon as the JSON has been parsed.
with open(report_path, 'r', encoding='utf-8') as report_file:
    texts = json.load(report_file)

# Preprocess images: convert to a tensor image, resize, then apply light
# random augmentation (horizontal flip, rotation of up to 10 degrees).
# NOTE(review): the trailing v2.ToTensor() was dropped. It is deprecated and,
# placed after v2.ToImage(), appears to pass the already-converted image through
# unchanged (it only converts PIL/ndarray inputs) - confirm against the
# installed torchvision version.
transforms = v2.Compose([
    v2.ToImage(),
    v2.Resize((224, 224)),
    v2.RandomHorizontalFlip(p=0.5),
    v2.RandomRotation(degrees=(-10, 10)),
])

# Step 2: Initialize the tokenizer and image processor for the encoders
text_model_name = "microsoft/BiomedVLP-CXR-BERT-general"   # BERT-style text encoder for radiology reports
# vision_model_name = "microsoft/swin-tiny-patch4-window7-224"  # Swin Transformer visual encoder (224x224 input)
vision_model_name = "facebook/maskformer-swin-base-coco"
cache_dir = '/content/huggingface'
tokenizer = AutoTokenizer.from_pretrained(text_model_name, cache_dir=cache_dir)
# Image processor that belongs to the selected vision checkpoint.
image_processor = AutoImageProcessor.from_pretrained(vision_model_name, cache_dir=cache_dir)
# Combine into a single processor (for convenience in saving/loading later)
processor = VisionTextDualEncoderProcessor(image_processor=image_processor, tokenizer=tokenizer)

  return func(*args, **kwargs)


# Load Dataset

In [None]:
# Instantiate the training and validation datasets
# Split the 'test' annotations in half: the first half extends the training
# set, the second half extends the validation set.
test_len_half = len(texts['test']) // 2
train_reports = texts['train'] + texts['test'][:test_len_half]
val_reports = texts['val'] + texts['test'][test_len_half:]

# Validation uses a deterministic pipeline (no random flip/rotation). eval_loss
# drives early stopping and best-checkpoint selection, so it must not change
# between evaluations because of random augmentation.
val_transforms = v2.Compose([
    v2.ToImage(),
    v2.Resize((224, 224)),
])

train_dataset = IUXRayDataset(image_paths, train_reports, tokenizer, image_processor, transforms)
val_dataset = IUXRayDataset(image_paths, val_reports, tokenizer, image_processor, val_transforms)

In [None]:
# Sanity check: number of samples in the training and validation datasets
len(train_dataset), len(val_dataset)

(2364, 591)

In [None]:
# Percentage of all samples (train + validation) that are in the training dataset
len(train_dataset) / (len(train_dataset) + len(val_dataset)) * 100

80.0

# Model

### Facebook Swin Transformer

In [None]:
class SwinWithPooling(nn.Module):
    """Vision backbone wrapper that returns one mean-pooled vector per image.

    Args:
        model_name: Checkpoint identifier handed to ``AutoModel.from_pretrained``.

    NOTE(review): ``forward`` accepts only ``pixel_values`` and returns a bare
    tensor. Confirm that this matches how the surrounding dual-encoder model
    calls its vision model and reads its output before training with it.
    """

    def __init__(self, model_name):
        super().__init__()
        # Load the backbone once and expose its config on the wrapper.
        backbone = AutoModel.from_pretrained(model_name)
        self.vision_model = backbone
        self.config = backbone.config

    def forward(self, pixel_values):
        """Run the backbone and average ``last_hidden_state`` over dimension 1."""
        # The backbone output must provide ``last_hidden_state``; averaging over
        # dim 1 collapses the token axis of a [batch, tokens, hidden] tensor.
        token_features = self.vision_model(pixel_values).last_hidden_state
        return torch.mean(token_features, dim=1)

In [None]:
# Initialize your text and vision encoders
# Vision side: the mean-pooling wrapper around the checkpoint named by vision_model_name
vision_encoder = SwinWithPooling(vision_model_name)
# Text side: the pretrained text encoder named by text_model_name
text_encoder = AutoModel.from_pretrained(text_model_name)

# Wrap them into a dual encoder
# NOTE(review): SwinWithPooling.forward takes only pixel_values and returns a
# plain tensor - confirm VisionTextDualEncoderModel can drive it as-is.
dual_model = VisionTextDualEncoderModel(vision_model=vision_encoder, text_model=text_encoder)

In [None]:
# Explicitly mark the dual encoder's logit_scale as requiring gradients
dual_model.logit_scale.requires_grad_(True)

Parameter containing:
tensor(2.6592, requires_grad=True)

In [None]:
# List all vision (Swin) encoder parameters and their requires_grad status
# Spot check: print the first parameter of each encoder (vision first, then
# text) together with its requires_grad flag.
for encoder in (dual_model.vision_model, dual_model.text_model):
    for name, param in encoder.named_parameters():
        print(name, param.requires_grad)
        break  # only the first parameter is needed

vision_model.pixel_level_module.encoder.model.embeddings.patch_embeddings.projection.weight True
embeddings.word_embeddings.weight True


### Microsoft Swin Transformer

In [None]:
# Step 4: Initialize the VisionTextDualEncoderModel (Swin visual encoder + BERT text encoder)
# This loads the two pretrained models and initializes projection layers for contrastive learning.
model = VisionTextDualEncoderModel.from_vision_text_pretrained(vision_model_name, text_model_name, cache_dir=cache_dir)

The projection layer and logit scale weights `['visual_projection.weight', 'text_projection.weight', 'logit_scale']` are newly initialized. You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Explicitly mark the model's logit_scale as requiring gradients.
# NOTE(review): logit_scale is presumably the learnable scalar that scales the
# similarity logits in the model's own contrastive loss - confirm it is still
# used when the loss is computed by a custom Trainer.
model.logit_scale.requires_grad_(True)

Parameter containing:
tensor(2.6592, requires_grad=True)

In [None]:
# List all vision (Swin) encoder parameters and their requires_grad status
# Spot check: print the first parameter of each encoder (vision first, then
# text) together with its requires_grad flag.
for encoder in (model.vision_model, model.text_model):
    for name, param in encoder.named_parameters():
        print(name, param.requires_grad)
        break  # only the first parameter is needed

embeddings.patch_embeddings.projection.weight True
embeddings.word_embeddings.weight True


### Contrastive Trainer

In [None]:
class ContrastiveTrainer(Trainer):
    """Trainer variant that uses the loss the model computes itself.

    Note: a later cell defines another class with the same name, which replaces
    this one when the notebook is run top to bottom.
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model with ``return_loss=True`` and return its loss.

        Returns ``(loss, outputs)`` when ``return_outputs`` is true, otherwise
        only the loss. ``num_items_in_batch`` is accepted but not used.
        """
        outputs = model(**inputs, return_loss=True)
        if return_outputs:
            return outputs.loss, outputs
        return outputs.loss

In [None]:
class ContrastiveTrainer(Trainer):
    """Trainer that optimises a symmetric InfoNCE loss over image/text embeddings.

    Args:
        temperature: Positive divisor applied to the cosine-similarity logits.
            It is a fixed hyper-parameter: it is never handed to the optimizer.
        log_cosine_sim: When true, log the mean cosine similarity of matched
            image/text pairs whenever ``global_step`` is a multiple of 50.

    Raises:
        ValueError: If ``temperature`` is not strictly positive.
    """

    def __init__(self, *args, temperature=0.07, log_cosine_sim=False, **kwargs):
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature}")
        super().__init__(*args, **kwargs)
        # The temperature is not part of the model, so the optimizer never
        # updates it. Freeze it so autograd does not accumulate an unused
        # gradient on it at every step.
        self.temperature = nn.Parameter(torch.tensor(float(temperature)), requires_grad=False)
        self.log_cosine_sim = log_cosine_sim

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the image-to-text / text-to-image cross-entropy average.

        Args:
            model: Model whose output exposes ``image_embeds`` and ``text_embeds``.
            inputs: Keyword arguments forwarded to ``model``.
            return_outputs: When true, return ``(loss, outputs)``.
            num_items_in_batch: Accepted for interface compatibility; unused.

        Returns:
            The scalar loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        outputs = model(**inputs)

        # L2-normalise so the dot product below is a cosine similarity.
        image_embeds = F.normalize(outputs.image_embeds, p=2, dim=-1)  # [B, D]
        text_embeds = F.normalize(outputs.text_embeds, p=2, dim=-1)    # [B, D]

        logits_per_image = torch.matmul(image_embeds, text_embeds.T) / self.temperature
        logits_per_text = logits_per_image.T

        # Contrastive loss (InfoNCE): the i-th image matches the i-th text.
        # Labels are created on the logits' own device so they always match it.
        labels = torch.arange(logits_per_image.size(0), device=logits_per_image.device)
        loss_i2t = F.cross_entropy(logits_per_image, labels)
        loss_t2i = F.cross_entropy(logits_per_text, labels)
        loss = (loss_i2t + loss_t2i) / 2

        # Optional: log cosine similarity between matched pairs. Restricted to
        # training-mode passes so evaluation batches do not emit the metric.
        if self.log_cosine_sim and model.training and self.state.global_step % 50 == 0:
            avg_cos_sim = F.cosine_similarity(image_embeds, text_embeds, dim=-1).mean().item()
            self.log({'avg_cosine_similarity': avg_cos_sim})

        return (loss, outputs) if return_outputs else loss

# Data collator

In [None]:
# Step 5: Define a custom data collator to batch samples (pads text to the same length in a batch)
def collate_fn(batch):
    """Merge dataset samples into one batch dictionary.

    Image tensors are stacked along a new leading dimension; the variable-length
    token lists are padded to a common length with the module-level ``tokenizer``.

    Args:
        batch: List of samples, each with ``"pixel_values"``, ``"input_ids"``
            and ``"attention_mask"`` entries.

    Returns:
        Dict with batched ``"pixel_values"``, ``"input_ids"`` and
        ``"attention_mask"``.
    """
    # Images first: all samples must share one shape for stacking to succeed.
    images = torch.stack([sample["pixel_values"] for sample in batch], dim=0)

    # Then the text side, padded to the longest sequence in this batch.
    text_features = {
        "input_ids": [sample["input_ids"] for sample in batch],
        "attention_mask": [sample["attention_mask"] for sample in batch],
    }
    text_batch = tokenizer.pad(text_features, padding=True, return_tensors="pt")

    return {
        "pixel_values": images,
        "input_ids": text_batch["input_ids"],
        "attention_mask": text_batch["attention_mask"],
    }

# Trainer

In [None]:
# Step 6: Configure training: AdamW, learning rate 1e-5, per-device batch size 8 with
# 4-step gradient accumulation (32 samples per optimizer step per device), up to 20 epochs.
training_args = TrainingArguments(
    output_dir="./iu_xray_swin_bert",         # output directory for model checkpoints
    overwrite_output_dir=True,
    num_train_epochs=20,                      # upper bound of 20 epochs; early stopping (below) can end training sooner
    per_device_train_batch_size=8,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=4,            # accumulate gradients over 4 steps
    learning_rate=1e-5,
    # learning_rate=1e-6,
    weight_decay=0.01,                        # AdamW weight decay
    # warmup_steps=500,                         # warmup for learning rate scheduler
    warmup_ratio=0.1,                         # 10% warmup; NOTE(review): confirm this has any effect with reduce_lr_on_plateau
    eval_strategy="epoch",                    # evaluate at end of each epoch
    save_strategy="epoch",                    # save checkpoint at end of each epoch
    save_total_limit=1,                       # only keep the latest checkpoint (or best, since we load_best_model_at_end)
    load_best_model_at_end=True,              # load best model (according to metric_for_best_model) at end of training
    metric_for_best_model="eval_loss",        # use validation loss to determine the best model
    logging_steps=50,                         # log training metrics every 50 steps
    remove_unused_columns=False,              # needed for multi-modal inputs (so Trainer doesn't drop image pixel_values)
    fp16=True,                                # mixed precision for speed (set False if not using GPU with FP16)
    greater_is_better=False,                  # lower eval_loss is better
    optim="adamw_torch",                      # use AdamW optimizer (Torch implementation)
    lr_scheduler_type="reduce_lr_on_plateau", # use ReduceLROnPlateau scheduler for dynamic LR
    # lr_scheduler_type="linear",
    logging_strategy="steps",
    logging_dir="./logs",                     # directory for TensorBoard logs
    report_to="tensorboard"                   # enable logging to TensorBoard (optional: use "wandb" for Weights & Biases)
)


# EarlyStoppingCallback with patience of 3 epochs (will stop after 3 consecutive epochs with no improvement in eval_loss)
early_stop_callback = EarlyStoppingCallback(early_stopping_patience=3, early_stopping_threshold=0.0)


# Train `model` with the custom contrastive loss, the padding collator and early stopping
trainer = ContrastiveTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=collate_fn,
    callbacks=[early_stop_callback],
    log_cosine_sim=True,
    temperature=0.07
    # (The optimizer is AdamW, as selected by optim="adamw_torch" above)
)

Workaround for the following CUDA out-of-memory error:



```
CUDA out of memory. Tried to allocate 254.00 MiB.
GPU 0 has a total capacity of 39.56 GiB of which 48.88 MiB is free.
Process 543181 has 39.50 GiB memory in use.
Of the allocated memory 38.18 GiB is allocated by PyTorch, and 843.57 MiB is reserved by PyTorch but unallocated.
If reserved but unallocated memory is large
try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.
```





In [None]:
# Ask the PyTorch CUDA allocator to use expandable segments (suggested by the error message above).
# NOTE(review): the allocator presumably reads this variable when CUDA is first initialised; this
# cell runs after the model and trainer were created, so confirm the setting actually takes effect
# (it may need to be set before torch is imported).
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

In [None]:
# Step 7: Fine-tune the model
# Step 7: Fine-tune the model
# NOTE(review): presumably tells the Trainer that the model output carries a loss so that
# eval_loss is reported - confirm against the Trainer implementation.
trainer.can_return_loss = True
trainer.train()

# Save the fine-tuned model and the processor (tokenizer + image processor).
# Note the three different target directories: the model goes to ./iu_xray_trainer, the
# combined processor to ./iu_xray_swin_bert (also the checkpoint output_dir), and the
# image processor alone to ./iu_xray_swin.
trainer.save_model("./iu_xray_trainer")           # Saves the model, config, and projection layers
processor.save_pretrained("./iu_xray_swin_bert")  # Saves the image processor and tokenizer for future inference
image_processor.save_pretrained("./iu_xray_swin") # Saves the image processor separately

Epoch,Training Loss,Validation Loss
1,2.0752,1.384834
2,2.0697,1.384835
3,2.0697,1.384835
4,2.0712,1.384835


['./iu_xray_swin/preprocessor_config.json']

# Test

In [None]:
# Export only the vision encoder of the dual model so it can be reloaded on its own
model.vision_model.save_pretrained("./swin_model")

In [None]:
# (Both names are already imported in the "Import libraries" cell.)
from transformers import AutoModel, AutoImageProcessor

# Reload the vision encoder that was exported to ./swin_model and show its class
swin_model_after_training = AutoModel.from_pretrained("./swin_model")
type(swin_model_after_training)

In [None]:
image_processor_after_training = AutoImageProcessor.from_pretrained(vision_model_name)

# Prepare an input image (e.g., as pixel tensor of shape [1, 3, H, W]).
# The image is converted to RGB exactly as in IUXRayDataset, and the file
# handle is closed once the pixels have been copied.
image_sample_path = os.path.join(image_paths, train_dataset.texts[0]['image_path'][1])
with Image.open(image_sample_path) as raw_sample:
    image_sample = raw_sample.convert("RGB")
inputs_after_training = image_processor_after_training(image_sample, return_tensors="pt")
# Pure inference: skip building the autograd graph.
with torch.no_grad():
    outputs_after_training = swin_model_after_training(**inputs_after_training)
last_hidden_state_after_training = outputs_after_training.last_hidden_state

preprocessor_config.json:   0%|          | 0.00/255 [00:00<?, ?B/s]

In [None]:
# Display the last hidden state produced by the reloaded (fine-tuned) vision encoder
last_hidden_state_after_training

tensor([[[-2.6363,  0.6361, -0.2943,  ..., -3.5945, -2.5090,  2.2054],
         [-0.6450,  0.7285, -1.0084,  ..., -2.3700,  0.6378,  0.2032],
         [-1.3251,  0.6057, -0.3762,  ..., -1.6838,  0.9445, -0.1109],
         ...,
         [-0.5691,  0.1459, -0.2711,  ..., -0.1910, -0.0324, -0.5937],
         [-0.6128,  0.0077, -0.2135,  ..., -0.1963, -0.1023, -0.7385],
         [ 0.2391, -1.3420, -0.2580,  ..., -0.7011, -1.5491, -1.0209]]],
       grad_fn=<NativeLayerNormBackward0>)

In [None]:
swin_model_before_training = AutoModel.from_pretrained(vision_model_name)
image_processor_before_training = AutoImageProcessor.from_pretrained(vision_model_name)

# Use the SAME sample (index 0) as the after-training cell. Feeding a different
# image here would make the before/after comparison differ no matter whether
# fine-tuning changed any weights.
image_sample_path = os.path.join(image_paths, train_dataset.texts[0]['image_path'][1])
with Image.open(image_sample_path) as raw_sample:
    image_sample = raw_sample.convert("RGB")  # same RGB conversion as IUXRayDataset
inputs_before_training = image_processor_before_training(image_sample, return_tensors="pt")
# Pure inference: skip building the autograd graph.
with torch.no_grad():
    outputs_before_training = swin_model_before_training(**inputs_before_training)
last_hidden_state_before_training = outputs_before_training.last_hidden_state

In [None]:
# Display the last hidden state produced by the original pretrained vision encoder
last_hidden_state_before_training

tensor([[[ 0.6885, -0.1341,  0.0371,  ...,  0.5520, -0.4761, -0.1809],
         [ 1.6421,  0.4668, -0.7926,  ...,  0.6170, -0.9152,  0.0951],
         [-0.5252,  0.1861, -0.0294,  ...,  0.2185,  0.0766, -0.1176],
         ...,
         [ 0.5667, -0.2789, -0.2853,  ..., -1.1903, -0.7935, -1.4292],
         [-0.8639,  0.1013, -0.3427,  ...,  0.2189, -0.4068, -1.1835],
         [-0.3088, -0.1537,  0.8109,  ...,  0.2448, -0.7474,  0.1932]]],
       grad_fn=<NativeLayerNormBackward0>)

In [None]:
# True only if every element of the two hidden states is identical.
# NOTE(review): this is only meaningful when both cells above fed the same image to the
# two models - check the sample indices they use before reading anything into the result.
torch.all(last_hidden_state_before_training.eq(last_hidden_state_after_training))

tensor(False)