In [1]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

Collecting ftfy
  Downloading ftfy-6.3.1-py3-none-any.whl.metadata (7.3 kB)
Downloading ftfy-6.3.1-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.8/44.8 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ftfy
Successfully installed ftfy-6.3.1
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-c272ksla
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-c272ksla
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-

In [2]:
import torch
from torch.utils.data import DataLoader, Dataset
import numpy as np
from datasets import load_dataset
import os
import random

The following code snippet prepares the RefCOCOg dataset
1. It loads the dataset from Hugging Face, then  filters it to include only samples with images physically present on the local system.
2. From these validated samples, it selects a random subset of 7000 entries,
3. Saves this prepared subset to disk for quick access.

In [3]:

# Step 2: Load RefCOCOg HF dataset
ds = load_dataset("jxu124/refcocog")

# Step 3: Define image root and compatibility filter
image_root = "/kaggle/input/coco-2014-dataset-for-yolov3/coco2014/images"

def image_exists(example):
    rel_path = example['image_path']
    if rel_path.startswith("coco/"):
        rel_path = rel_path[len("coco/"):]
    img_path = os.path.join(image_root, rel_path)
    return os.path.exists(img_path)

# Step 4: Filter for samples with locally available images
print("Filtering for present images—this may take a moment...")
valid_samples = ds['train'].filter(image_exists)

print(f"Found {len(valid_samples)} valid samples with matching images.")

# Step 5: Select a random subset (e.g., 1000 samples)
num_subset = 7000
random_indices = random.sample(range(len(valid_samples)), num_subset)
subset_ds = valid_samples.select(random_indices)

print(f"Subset ready: {len(subset_ds)} samples.")

# Step 6: (Optional) Save subset for later fast reloading
subset_ds.save_to_disk("refcocog_train_subset_3000")

# Your new train_dataset for the model
# from your RefCOCOgClipDataset class (as in your previous notebook):
# train_dataset = RefCOCOgClipDataset(subset_ds, image_root, augment=True)
# train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)


README.md: 0.00B [00:00, ?B/s]

data/train-00000-of-00001-4fe3e6340cfb69(…):   0%|          | 0.00/39.1M [00:00<?, ?B/s]

data/validation-00000-of-00001-15168dfe7(…):   0%|          | 0.00/2.31M [00:00<?, ?B/s]

data/test-00000-of-00001-2316f36b19cd7f7(…):   0%|          | 0.00/4.59M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/42226 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2573 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/5023 [00:00<?, ? examples/s]

Filtering for present images—this may take a moment...


Filter:   0%|          | 0/42226 [00:00<?, ? examples/s]

Found 42226 valid samples with matching images.
Subset ready: 7000 samples.


Saving the dataset (0/1 shards):   0%|          | 0/7000 [00:00<?, ? examples/s]

Following code snippet verifies the local presence of image files referenced in a dataset subset. It iterates through each sample, constructs the full image path, and checks if the file exists, reporting any missing files to ensure data integrity before further processing.

In [4]:
# Assumptions:
# - subset_ds: HuggingFace Dataset containing the 3,000 subset samples
# - image_root: Your COCO images root directory
def check_correspondence(subset_ds, image_root):
    all_present = True
    missing_files = []
    for i, example in enumerate(subset_ds):
        rel_path = example['image_path']
        if rel_path.startswith("coco/"):
            rel_path = rel_path[len("coco/"):]
        img_path = os.path.join(image_root, rel_path)
        if not os.path.exists(img_path):
            all_present = False
            missing_files.append(img_path)
            print(f"Missing file for index {i}: {img_path}")
    print(f"\nChecked {len(subset_ds)} samples.")
    if all_present:
        print("All image files for the subset are present locally.")
    else:
        print(f"{len(missing_files)} image files are missing in your local dataset.")

# Run the check on your current subset
check_correspondence(subset_ds, image_root)
print("done")


Checked 7000 samples.
All image files for the subset are present locally.
done


Following code snippet filters the dataset splits for existing images, then randomly samples and saves subsets for training, validation, and testing, ensuring a balanced distribution and local availability of data for model development.

In [5]:
import random

# Step 1: Filter for present images in each split
train_valid_samples = ds['train'].filter(image_exists)
val_valid_samples = ds['validation'].filter(image_exists)
test_valid_samples = ds['test'].filter(image_exists)

num_total = 7000

# Step 2: Calculate subset sizes (adjust as needed)
num_train = int(num_total * 0.7)
num_val = int(num_total * 0.15)
num_test = num_total - num_train - num_val  # ensures sum = 3000

# Step 3: Sample subsets from each split (do not exceed available samples)
num_train = min(num_train, len(train_valid_samples))
num_val = min(num_val, len(val_valid_samples))
num_test = min(num_test, len(test_valid_samples))

train_indices = random.sample(range(len(train_valid_samples)), num_train)
val_indices = random.sample(range(len(val_valid_samples)), num_val)
test_indices = random.sample(range(len(test_valid_samples)), num_test)

train_subset_ds = train_valid_samples.select(train_indices)
val_subset_ds = val_valid_samples.select(val_indices)
test_subset_ds = test_valid_samples.select(test_indices)

# Step 4: Save subsets to disk
train_subset_ds.save_to_disk("refcocog_train_subset")
val_subset_ds.save_to_disk("refcocog_val_subset")
test_subset_ds.save_to_disk("refcocog_test_subset")

print(f"Train subset size: {len(train_subset_ds)}")
print(f"Val subset size: {len(val_subset_ds)}")
print(f"Test subset size: {len(test_subset_ds)}")
print("Saved all subsets.")


Filter:   0%|          | 0/2573 [00:00<?, ? examples/s]

Filter:   0%|          | 0/5023 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/4900 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/1050 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/1050 [00:00<?, ? examples/s]

Train subset size: 4900
Val subset size: 1050
Test subset size: 1050
Saved all subsets.


Following code defines RefCOCOgClipDataset, a custom PyTorch dataset class that prepares image-text pairs and bounding box annotations for a machine learning model. For each data example,
1. it loads and preprocesses images using CLIP's transforms,
2. tokenizes the associated text query,
3. normalizes the bounding box coordinates, making the data ready for model input.

In [6]:
from torch.utils.data import Dataset
from PIL import Image
import os
import torch
import clip

class RefCOCOgClipDataset(Dataset):
    def __init__(self, hf_dataset, image_root_dir, clip_model_name="ViT-B/32", device="cpu", augment=False):
        self.dataset = hf_dataset
        self.image_root_dir = image_root_dir
        self.augment = augment
        self.device = device
        self.clip_model, self.clip_preprocess = clip.load(clip_model_name, device=device)
        # No augmentation means only default preprocessing

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        example = self.dataset[idx]
        rel_path = example['image_path']
        if rel_path.startswith("coco/"):
            rel_path = rel_path[len("coco/"):]
        img_path = os.path.join(self.image_root_dir, rel_path)
        image = Image.open(img_path).convert('RGB')
        width, height = image.size

        # Get the query text from captions or sentences fields
        if 'captions' in example and example['captions']:
            query = example['captions'][0]
        elif 'sentences' in example and 'sent' in example['sentences']:
            query = example['sentences']['sent']
        else:
            query = ""

        clip_text = clip.tokenize([query], truncate=True).squeeze(0)

        # Normalize bbox coordinates
        x, y, w, h = example['bbox']
        x_min = x / width
        y_min = y / height
        x_max = (x + w) / width
        y_max = (y + h) / height
        norm_bbox = torch.tensor([x_min, y_min, x_max, y_max], dtype=torch.float)

        # Preprocess image using CLIP's preprocess function
        clip_image = self.clip_preprocess(image)

        return {
            'clip_image': clip_image,
            'clip_text': clip_text,
            'query_text': query,
            'bbox': norm_bbox,
            'orig_size': (width, height),
            'image_path': img_path
        }
print("done")

done


Following code snippet instantiates dataset objects for training, validation, and testing using the RefCOCOgClipDataset class, then creates corresponding data loaders to efficiently batch and shuffle data for model training and evaluation.

In [7]:
train_dataset = RefCOCOgClipDataset(train_subset_ds, image_root)
val_dataset = RefCOCOgClipDataset(val_subset_ds, image_root)
test_dataset = RefCOCOgClipDataset(test_subset_ds, image_root)

from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2)
print("done")

100%|███████████████████████████████████████| 338M/338M [00:13<00:00, 27.0MiB/s]


done


Following code defines the CrossModalClipFusionModel class, which is the core neural network for scene localization. 
1. It integrates a frozen CLIP backbone for extracting image and text features,
2. uses linear projections and a Transformer Encoder for cross-modal fusion
3. employs a feed-forward network to predict bounding box coordinates.
4. The snippet then instantiates this model, preparing it for training or inference.

In [8]:
import torch
import torch.nn as nn
import clip

class CrossModalClipFusionModel(nn.Module):
    def __init__(self, clip_model_name="ViT-B/32", fusion_hidden_dim=512, transformer_layers=2, nhead=8, device="cuda"):
        super().__init__()
        self.device = device
        self.clip_model, _ = clip.load(clip_model_name, device=device)
        for param in self.clip_model.parameters():
            param.requires_grad = False  # freeze CLIP backbone

        self.visual_proj = nn.Linear(512, fusion_hidden_dim)
        self.text_proj = nn.Linear(512, fusion_hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(d_model=fusion_hidden_dim, nhead=nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=transformer_layers)

        self.bbox_head = nn.Sequential(
            nn.Linear(fusion_hidden_dim, fusion_hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(fusion_hidden_dim // 2, 4)
        )

    def forward(self, clip_image, clip_text):
        image_feat = self.clip_model.encode_image(clip_image).float()
        text_feat = self.clip_model.encode_text(clip_text).float()
        v_proj = self.visual_proj(image_feat)
        t_proj = self.text_proj(text_feat)
        fuse = torch.stack([v_proj, t_proj], dim=0)
        fuse = self.transformer_encoder(fuse)
        joint_emb = fuse.mean(dim=0)
        bbox_pred = self.bbox_head(joint_emb)
        bbox_pred = torch.sigmoid(bbox_pred)  # normalized bbox 0~1
        return bbox_pred

# Instantiate model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CrossModalClipFusionModel(device=device).to(device)
print("done")

done




Following code defines a custom loss function for bounding box regression, combining two key components. 
1. It includes bbox_iou, a utility function to calculate the Intersection over Union (IoU) between predicted and ground truth bounding boxes.
2. The main bbox_mixed_loss then combines Smooth L1 Loss (for standard regression accuracy) with a negative log IoU loss (to directly optimize for better overlap), providing a robust metric for training object localization models.

In [10]:
import torch.nn.functional as F

def bbox_iou(box1, box2, eps=1e-6):
    x1 = torch.max(box1[..., 0], box2[..., 0])
    y1 = torch.max(box1[..., 1], box2[..., 1])
    x2 = torch.min(box1[..., 2], box2[..., 2])
    y2 = torch.min(box1[..., 3], box2[..., 3])
    inter_area = (x2 - x1).clamp(min=0) * (y2 - y1).clamp(min=0)
    box1_area = (box1[..., 2] - box1[..., 0]).clamp(min=0) * (box1[..., 3] - box1[..., 1]).clamp(min=0)
    box2_area = (box2[..., 2] - box2[..., 0]).clamp(min=0) * (box2[..., 3] - box2[..., 1]).clamp(min=0)
    union_area = box1_area + box2_area - inter_area + eps
    return inter_area / union_area

def bbox_mixed_loss(pred_bboxes, gt_bboxes, alpha=1.0, beta=2.0):
    reg_loss = F.smooth_l1_loss(pred_bboxes, gt_bboxes)
    ious = bbox_iou(pred_bboxes, gt_bboxes)
    iou_loss = -torch.log(ious + 1e-6).mean()
    return alpha * reg_loss + beta * iou_loss
criterion = bbox_mixed_loss
print("done")

done


Following code snippet implements the main training loop for the neural network. 
1. It initializes an AdamW optimizer and a learning rate scheduler, then
2. iteratively trains the model for a set number of epochs.
3. Within each epoch, it calculates training and validation losses, performs backpropagation and optimization, and incorporates early stopping to save the best-performing model based on validation loss, ensuring efficient and effective training.

In [11]:
import torch
import copy
from tqdm import tqdm

# --- ADVANCED OPTIMIZER ---
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-2)  # typical wd for AdamW

# --- LEARNING RATE SCHEDULER ---
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, verbose=True
)

num_epochs = 10  # max epochs
patience = 3     # for early stopping
best_val_loss = float('inf')
epochs_no_improve = 0
best_model_wts = copy.deepcopy(model.state_dict())

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for batch_idx, batch in enumerate(loop):
        clip_images = batch['clip_image'].to(device)
        clip_texts = batch['clip_text'].to(device)
        gt_bboxes  = batch['bbox'].to(device)

        optimizer.zero_grad()
        pred_bboxes = model(clip_images, clip_texts)
        loss = criterion(pred_bboxes, gt_bboxes)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        avg_loss_so_far = total_loss / (batch_idx + 1)
        loop.set_postfix(batch_loss=loss.item(), avg_loss=avg_loss_so_far)

    print(f"Epoch [{epoch+1}/{num_epochs}] Training Loss: {avg_loss_so_far:.4f}")

    # Validation loss computation
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch in val_loader:
            clip_images = batch['clip_image'].to(device)
            clip_texts = batch['clip_text'].to(device)
            gt_bboxes  = batch['bbox'].to(device)
            pred_bboxes = model(clip_images, clip_texts)
            val_loss = criterion(pred_bboxes, gt_bboxes)
            val_losses.append(val_loss.item())
    avg_val_loss = sum(val_losses) / len(val_losses)
    print(f"Epoch [{epoch+1}/{num_epochs}] Validation Loss: {avg_val_loss:.4f}")

    scheduler.step(avg_val_loss)  # --- STEP THE SCHEDULER ---

    # Early stopping check
    if avg_val_loss < best_val_loss - 1e-4:  # minimal improvement threshold
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
        best_model_wts = copy.deepcopy(model.state_dict())
        print(f"Validation loss improved, saving best model at epoch {epoch+1}.")
    else:
        epochs_no_improve += 1
        print(f"No improvement in validation loss for {epochs_no_improve} epochs.")
        if epochs_no_improve >= patience:
            print("Early stopping triggered.")
            break

# Load best model weights after training
model.load_state_dict(best_model_wts)
print("Training complete. Best model loaded.")


Epoch 1/10: 100%|██████████| 307/307 [00:48<00:00,  6.38it/s, avg_loss=3.21, batch_loss=2.03]

Epoch [1/10] Training Loss: 3.2111





Epoch [1/10] Validation Loss: 2.9788
Validation loss improved, saving best model at epoch 1.


Epoch 2/10: 100%|██████████| 307/307 [00:30<00:00, 10.18it/s, avg_loss=2.95, batch_loss=2.36]

Epoch [2/10] Training Loss: 2.9516





Epoch [2/10] Validation Loss: 2.9398
Validation loss improved, saving best model at epoch 2.


Epoch 3/10: 100%|██████████| 307/307 [00:30<00:00, 10.00it/s, avg_loss=2.91, batch_loss=3.82]

Epoch [3/10] Training Loss: 2.9060





Epoch [3/10] Validation Loss: 2.9628
No improvement in validation loss for 1 epochs.


Epoch 4/10: 100%|██████████| 307/307 [00:31<00:00,  9.77it/s, avg_loss=2.86, batch_loss=2.33]

Epoch [4/10] Training Loss: 2.8613





Epoch [4/10] Validation Loss: 2.9168
Validation loss improved, saving best model at epoch 4.


Epoch 5/10: 100%|██████████| 307/307 [00:30<00:00, 10.06it/s, avg_loss=2.86, batch_loss=2.83]

Epoch [5/10] Training Loss: 2.8602





Epoch [5/10] Validation Loss: 2.9328
No improvement in validation loss for 1 epochs.


Epoch 6/10: 100%|██████████| 307/307 [00:30<00:00, 10.01it/s, avg_loss=2.87, batch_loss=3.6] 

Epoch [6/10] Training Loss: 2.8727





Epoch [6/10] Validation Loss: 3.0420
No improvement in validation loss for 2 epochs.


Epoch 7/10: 100%|██████████| 307/307 [00:30<00:00, 10.18it/s, avg_loss=2.9, batch_loss=2.8]  

Epoch [7/10] Training Loss: 2.8953





Epoch [7/10] Validation Loss: 2.9410
No improvement in validation loss for 3 epochs.
Early stopping triggered.
Training complete. Best model loaded.


Following code snippet handles saving and loading the trained model's weights.

In [12]:
# Save the best model weights to disk
torch.save(model.state_dict(), "best_model.pt")
print("Saved best model weights to 'best_model.pt'")
# Load the best model weights from disk
model.load_state_dict(torch.load("best_model.pt"))
model.eval()
print("Best model weights loaded.")

Saved best model weights to 'best_model.pt'
Best model weights loaded.


This code snippet provides utility functions for evaluating object detection models. 
It includes 
1. safe_unpack_orig_size for safely extracting image dimensions,
2. denormalize_bbox to convert normalized bounding box predictions back to original image coordinates,
3. box_iou to calculate the Intersection over Union metric.
4. The main evaluate_model_iou function then uses these utilities to compute the mean IoU of a model's predictions against ground truth bounding boxes over a given dataset.

In [14]:
def safe_unpack_orig_size(orig_size):
    import numpy as np
    if isinstance(orig_size, torch.Tensor):
        # Convert tensor to numpy array and flatten
        orig_size = orig_size.detach().cpu().numpy().flatten()
    
    # Check if the array has at least two elements
    if len(orig_size) < 2:
        return 0, 0
    
    return int(orig_size[0]), int(orig_size[1])

def denormalize_bbox(bbox, width, height):
    import numpy as np
    if isinstance(bbox, torch.Tensor):
        bbox = bbox.detach().cpu().numpy()
    bbox = np.array(bbox).flatten()[:4]
    return [
        float(bbox[0]) * width,
        float(bbox[1]) * height,
        float(bbox[2]) * width,
        float(bbox[3]) * height
    ]

def box_iou(box1, box2):
    x1 = float(max(box1[0], box2[0]))
    y1 = float(max(box1[1], box2[1]))
    x2 = float(min(box1[2], box2[2]))
    y2 = float(min(box1[3], box2[3]))
    inter_w = max(0., x2 - x1)
    inter_h = max(0., y2 - y1)
    inter_area = inter_w * inter_h
    box1_area = max(0., box1[2] - box1[0]) * max(0., box1[3] - box1[1])
    box2_area = max(0., box2[2] - box2[0]) * max(0., box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area + 1e-6
    if union_area == 0.:
        return 0.
    return inter_area / union_area

def evaluate_model_iou(model, data_loader, device, max_batches=None):
    model.eval()
    all_ious = []
    import numpy as np
    with torch.no_grad():
        for batch_idx, batch in enumerate(data_loader):
            clip_images = batch['clip_image'].to(device)
            clip_texts = batch['clip_text'].to(device)
            pred_bboxes = model(clip_images, clip_texts)
            gt_bboxes = batch['bbox']
            orig_sizes = batch['orig_size']
            
            # Use zip for safe iteration over batch elements
            for pred_box_norm, gt_box_norm, orig_size_item in zip(pred_bboxes, gt_bboxes, orig_sizes):
                width, height = safe_unpack_orig_size(orig_size_item)
                
                # Denormalize bounding boxes
                pred_box_list = denormalize_bbox(pred_box_norm, width, height)
                gt_box_list = denormalize_bbox(gt_box_norm, width, height)
                
                # Calculate IoU and append
                iou = box_iou(pred_box_list, gt_box_list)
                all_ious.append(iou)

            if max_batches is not None and batch_idx + 1 >= max_batches:
                break
    
    mean_iou = np.mean(all_ious) if all_ious else 0.0
    print(f"Mean IoU: {mean_iou:.4f} on {len(all_ious)} samples")
    return mean_iou

# Example usage to demonstrate a successful run:
# # Load best model weights before evaluation
# model.load_state_dict(torch.load("best_model.pt"))
# model.eval()
#
# # Evaluate on validation set
# val_mean_iou = evaluate_model_iou(model, val_loader, device)
# print("Validation Mean IoU:", val_mean_iou)
print("done")

done


Following code snippet evaluates the trained model's performance on both validation and test datasets. It calculates and prints the Mean IoU (Intersection over Union) for both the validation and test sets, providing a quantitative assessment of the model's localization accuracy.

In [29]:
# Load best model weights before evaluation
model.load_state_dict(torch.load("best_model.pt"))
model.eval()

# Evaluate on validation set
val_mean_iou = evaluate_model_iou(model, val_loader, device)
print("Validation Mean IoU:", val_mean_iou)

# Evaluate on test set
test_mean_iou = evaluate_model_iou(model, test_loader, device)
print("Test Mean IoU:", test_mean_iou)

Mean IoU: 0.2896 on 132 samples
Validation Mean IoU: 0.289642835112249
Mean IoU: 0.2981 on 132 samples
Test Mean IoU: 0.2981048999688267


Following code defines a function visualize_prediction_and_crop that takes a trained model, an image path, and a text query to visualize the model's object localization. 
1. It loads and preprocesses the image and text,
2. uses the model to predict a bounding box,
3. denormalizes the coordinates,
4. draws the box on the original image,
5. crops the region.
6. The snippet then demonstrates its usage by applying the function to a specific image and query, saving the results as PNG files.

In [None]:
import torch
from PIL import Image, ImageDraw
import numpy as np
import clip

def visualize_prediction_and_crop(model, clip_preprocess, device, image_path, text_query):
    """
    Takes a model, image path, and text query, and returns the cropped image
    along with a visualization of the predicted bounding box.

    Args:
        model: The trained CrossModalClipFusionModel.
        clip_preprocess: The CLIP image preprocessing function.
        device: The device to run the model on ('cuda' or 'cpu').
        image_path (str): The path to the input image.
        text_query (str): The text description of the object to find.

    Returns:
        image_with_box (PIL.Image): The original image with the bounding box drawn.
        cropped_image (PIL.Image): The cropped region of the image.
    """
    # 1. Load and preprocess the image and text
    image = Image.open(image_path).convert('RGB')
    width, height = image.size
    
    clip_image = clip_preprocess(image).unsqueeze(0).to(device)
    clip_text = clip.tokenize([text_query], truncate=True).to(device)

    # 2. Get the model's bounding box prediction
    model.eval()
    with torch.no_grad():
        pred_bbox_norm = model(clip_image, clip_text)
    
    # 3. Denormalize the predicted bounding box
    x_min_norm, y_min_norm, x_max_norm, y_max_norm = pred_bbox_norm[0].cpu().numpy()
    
    x_min = int(x_min_norm * width)
    y_min = int(y_min_norm * height)
    x_max = int(x_max_norm * width)
    y_max = int(y_max_norm * height)
    
    # Ensure coordinates are within image bounds
    x_min = max(0, x_min)
    y_min = max(0, y_min)
    x_max = min(width, x_max)
    y_max = min(height, y_max)

    print("Predicted Bounding Box (x_min, y_min, x_max, y_max):", (x_min, y_min, x_max, y_max))

    # 4. Create a copy to draw on and the cropped image
    image_with_box = image.copy()
    draw = ImageDraw.Draw(image_with_box)
    draw.rectangle([x_min, y_min, x_max, y_max], outline='red', width=3)
    
    # 5. Crop the image based on the predicted bounding box
    cropped_image = image.crop((x_min, y_min, x_max, y_max))

    return image_with_box, cropped_image

# Assume 'model', 'clip_preprocess', and 'device' are already defined from previous steps
# The following code block will run the test on the specified image.

# Image path and text query
image_path = "/kaggle/input/coco-2014-dataset-for-yolov3/coco2014/images/test2014/COCO_test2014_000000000016.jpg"
text_query = "a baseball player swinging a bat"

# Run the function
try:
    image_with_box, cropped_region = visualize_prediction_and_crop(
        model=model,
        clip_preprocess=test_dataset.clip_preprocess,
        device=device,
        image_path=image_path,
        text_query=text_query
    )
    
    # Display the results
    image_with_box.save("prediction_visualization.png")
    cropped_region.save("cropped_region.png")
    print("Saved visualization to 'prediction_visualization.png' and cropped region to 'cropped_region.png'")

except Exception as e:
    print(f"An error occurred: {e}")

In [16]:
import torch
from PIL import Image, ImageDraw
import numpy as np
import clip

def visualize_prediction_and_crop(model, clip_preprocess, device, image_path, text_query):
    """
    Takes a model, image path, and text query, and returns the cropped image
    along with a visualization of the predicted bounding box.

    Args:
        model: The trained CrossModalClipFusionModel.
        clip_preprocess: The CLIP image preprocessing function.
        device: The device to run the model on ('cuda' or 'cpu').
        image_path (str): The path to the input image.
        text_query (str): The text description of the object to find.

    Returns:
        image_with_box (PIL.Image): The original image with the bounding box drawn.
        cropped_image (PIL.Image): The cropped region of the image.
    """
    # 1. Load and preprocess the image and text
    image = Image.open(image_path).convert('RGB')
    width, height = image.size
    
    clip_image = clip_preprocess(image).unsqueeze(0).to(device)
    clip_text = clip.tokenize([text_query], truncate=True).to(device)

    # 2. Get the model's bounding box prediction
    model.eval()
    with torch.no_grad():
        pred_bbox_norm = model(clip_image, clip_text)
    
    # 3. Denormalize the predicted bounding box
    x_min_norm, y_min_norm, x_max_norm, y_max_norm = pred_bbox_norm[0].cpu().numpy()
    
    x_min = int(x_min_norm * width)
    y_min = int(y_min_norm * height)
    x_max = int(x_max_norm * width)
    y_max = int(y_max_norm * height)
    
    # Ensure coordinates are within image bounds
    x_min = max(0, x_min)
    y_min = max(0, y_min)
    x_max = min(width, x_max)
    y_max = min(height, y_max)

    print("Predicted Bounding Box (x_min, y_min, x_max, y_max):", (x_min, y_min, x_max, y_max))

    # 4. Create a copy to draw on and the cropped image
    image_with_box = image.copy()
    draw = ImageDraw.Draw(image_with_box)
    draw.rectangle([x_min, y_min, x_max, y_max], outline='red', width=3)
    
    # 5. Crop the image based on the predicted bounding box
    cropped_image = image.crop((x_min, y_min, x_max, y_max))

    return image_with_box, cropped_image

# Assume 'model', 'clip_preprocess', and 'device' are already defined from previous steps
# The following code block will run the test on the specified image.

# Image path and text query
image_path = "/kaggle/input/coco-2014-dataset-for-yolov3/coco2014/images/test2014/COCO_test2014_000000000069.jpg"
text_query = "clown dancing"

# Run the function
try:
    image_with_box, cropped_region = visualize_prediction_and_crop(
        model=model,
        clip_preprocess=test_dataset.clip_preprocess,
        device=device,
        image_path=image_path,
        text_query=text_query
    )
    
    # Display the results
    image_with_box.save("prediction_visualization.png")
    cropped_region.save("cropped_region.png")
    print("Saved visualization to 'prediction_visualization.png' and cropped region to 'cropped_region.png'")

except Exception as e:
    print(f"An error occurred: {e}")

Predicted Bounding Box (x_min, y_min, x_max, y_max): (24, 56, 639, 428)
Saved visualization to 'prediction_visualization.png' and cropped region to 'cropped_region.png'
