# Get Pretrained Models
Okay

In [15]:
!mkdir -p checkpoints/

!wget https://docs-assets.developer.apple.com/ml-research/datasets/mobileclip/mobileclip_s0.pt -P checkpoints

--2025-04-30 18:31:27--  https://docs-assets.developer.apple.com/ml-research/datasets/mobileclip/mobileclip_s0.pt
Resolving docs-assets.developer.apple.com (docs-assets.developer.apple.com)... 17.253.73.202, 17.253.73.201
Connecting to docs-assets.developer.apple.com (docs-assets.developer.apple.com)|17.253.73.202|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 215934653 (206M) [application/octet-stream]
Saving to: ‘checkpoints/mobileclip_s0.pt’


2025-04-30 18:32:07 (5,34 MB/s) - ‘checkpoints/mobileclip_s0.pt’ saved [215934653/215934653]



# Create the models

In [None]:
!pip install torch
!pip install torchvision
!pip install timm
!pip install open-clip-torch
!pip install datasets
!pip install clip-benchmark

# Model Testing

In [49]:
import torch
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
import os
import json
from PIL import Image
from tqdm import tqdm
import mobileclip
import random

# --- Configuration ---
# Using relative paths for better portability
BASE_DATA_DESTINATION = os.path.join(os.getcwd(), "data")
FLICKR8K_IMAGES_FOLDER_NAME = "Images"
CAPTIONS_JSON_FILENAME = "captions.json"
CHECKPOINT_DIR = os.path.join(os.getcwd(), "checkpoints")
BATCH_SIZE = 32
LEARNING_RATE = 1e-4
NUM_EPOCHS = 1
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the MobileCLIP model
model_path = os.path.join(CHECKPOINT_DIR, 'mobileclip_s0.pt')
model, _, preprocess = mobileclip.create_model_and_transforms(
    'mobileclip_s0', 
    pretrained=model_path
)
model.to(DEVICE)

# Load the tokenizer
tokenizer = mobileclip.get_tokenizer('mobileclip_s0')


image = preprocess(Image.open("/Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/pngwing.com.png").convert('RGB')).unsqueeze(0)
text = tokenizer(["a brown dog", "a white dog", "a black dog"])

with torch.no_grad(), torch.cuda.amp.autocast():
    image_features = model.encode_image(image)
    text_features = model.encode_text(text)
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)

    text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

# Set the print options for PyTorch to avoid scientific notation and limit decimal places
torch.set_printoptions(sci_mode=False, precision=4)

print("Label probs:", text_probs)

Label probs: tensor([[    0.9936,     0.0004,     0.0060]])


# Changing the Positional Embeddings

In [50]:
print(model.get_positional_embedding() )

def get_positional_embedding(self, lambda2: int = 4):
    """
    Get modified positional embedding for text encoder based on the given formula.
    """
    pos_embed = self.text_encoder.get_positional_embedding().pos_embed.pos_embed
    if pos_embed is None:
        raise ValueError("Positional embedding not found in text encoder.")

    max_pos, embed_dim = pos_embed.shape[2], pos_embed.shape[3]
    modified_pos_embed = torch.zeros((1, 1, max_pos, embed_dim), device=pos_embed.device)

    for pos in range(max_pos):
        if pos <= 20:
            modified_pos_embed[:, :, pos, :] = pos_embed[:, :, pos, :]
        else:
            lower_idx = pos // lambda2
            upper_idx = min(lower_idx + 1, max_pos - 1)  # Ensure upper_idx is within bounds
            alpha = (pos % lambda2) / lambda2
            modified_pos_embed[:, :, pos, :] = (1 - alpha) * pos_embed[:, :, lower_idx, :] + alpha * pos_embed[:, :, upper_idx, :]
    # turn the torch tensor into nn parameter
    modified_pos_embed = torch.nn.Parameter(modified_pos_embed, requires_grad=False)
    return modified_pos_embed

# Example usage
lambda2 = 4
new_pos_embed = get_positional_embedding(model, lambda2)
print("Modified Positional Embedding:", new_pos_embed)

# set the models pos embedding to the new one
model.text_encoder.get_positional_embedding().pos_embed.pos_embed = new_pos_embed


LearnablePositionalEmbedding(num_embeddings=77, embedding_dim=512, padding_idx=None)
Modified Positional Embedding: Parameter containing:
tensor([[[[     0.0000,      0.0000,      0.0000,  ...,      0.0000,
                0.0000,      0.0000],
          [     0.0041,      0.0016,     -0.0007,  ...,      0.0007,
                0.0036,     -0.0076],
          [     0.0077,      0.0026,      0.0012,  ...,      0.0001,
                0.0013,     -0.0043],
          ...,
          [     0.0075,      0.0051,      0.0000,  ...,      0.0011,
                0.0001,     -0.0003],
          [     0.0031,      0.0068,     -0.0008,  ...,      0.0024,
               -0.0002,     -0.0025],
          [    -0.0013,      0.0086,     -0.0016,  ...,      0.0036,
               -0.0004,     -0.0046]]]])


# Testing the model after changing the positional embeddings

In [51]:
image = preprocess(Image.open("/Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/pngwing.com.png").convert('RGB')).unsqueeze(0)
text = tokenizer(["a brown dog", "a white dog", "a black dog"])

with torch.no_grad(), torch.cuda.amp.autocast():
    image_features = model.encode_image(image)
    text_features = model.encode_text(text)
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)

    text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

# Set the print options for PyTorch to avoid scientific notation and limit decimal places
torch.set_printoptions(sci_mode=False, precision=4)

print("Label probs:", text_probs)

Label probs: tensor([[0.4203, 0.3212, 0.2585]])


# Downloading the captioned images


In [7]:
# --- Configuration ---
import os

# Using relative paths for better portability
# This creates a 'data' directory in the project folder
BASE_DATA_DESTINATION = os.path.join(os.getcwd(), "data")
KAGGLE_FLICKR8K_URL = "https://www.kaggle.com/api/v1/datasets/download/adityajn105/flickr8k"
FLICKR8K_ZIP_FILENAME = "flickr8k.zip"
FLICKR8K_IMAGES_FOLDER_NAME = "Images"
CAPTIONS_CSV_FILENAME = "captions.csv"
OUTPUT_FOLDER_NAME = "output"

import zipfile
import requests
import io

def download_file(url: str, destination_path: str):
    print(f"Downloading from {url} to {destination_path}...")
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(destination_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
        print("Download complete.")
    except Exception as e:
        print(f"Error: {e}")
        raise

def extract_zip_file(zip_path: str, destination_folder: str):
    print(f"Extracting {zip_path} to {destination_folder}...")
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(destination_folder)
        print("Extraction complete.")
    except Exception as e:
        print(f"Extraction error: {e}")
        raise

def setup_data_directory(base_data_path: str):
    images_path = os.path.join(base_data_path, FLICKR8K_IMAGES_FOLDER_NAME)
    output_path = os.path.join(base_data_path, OUTPUT_FOLDER_NAME)
    os.makedirs(base_data_path, exist_ok=True)
    os.makedirs(output_path, exist_ok=True)
    return base_data_path, images_path, output_path


print(f"Setting up data directories in: {BASE_DATA_DESTINATION}")
base_dir, images_dir, output_dir = setup_data_directory(BASE_DATA_DESTINATION)

zip_file_path = os.path.join(base_dir, FLICKR8K_ZIP_FILENAME)

if not os.path.exists(images_dir):
    print("Images not found. Attempting download...")
    try:
        download_file(KAGGLE_FLICKR8K_URL, zip_file_path)
        extract_zip_file(zip_file_path, base_dir)
        os.remove(zip_file_path)
    except Exception as e:
        print(f"Failed to set up dataset: {e}")
        raise FileNotFoundError(f"Please manually download and extract to: {base_dir}")
else:
    print(f"Images already exist at {images_dir}.")

Setting up data directories in: /Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/data
Images already exist at /Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/data/Images.


# Train the model using the downloaded images and custom captions

Importing the required libraries
setting parameters and lookups


## Load the dataset

In [None]:
# Custom Dataset for Flickr8k with the specific JSON caption format
class Flickr8kCaptionedDataset(Dataset):
    def __init__(self, image_dir, captions_file, preprocess_fn, pull_from_json=True):
        self.image_dir = image_dir
        self.preprocess_fn = preprocess_fn
        
        self.num_samples = 0
        # Create list of samples
        self.samples = []
        
        if pull_from_json:
            # Load captions from JSON file
            with open(captions_file, 'r') as f:
                self.captions_data = json.load(f)
            
            # Process JSON with format {"image.jpg": {"long_caption": "...", "short_caption": "..."}, ...}
            for image_name, captions in self.captions_data.items():
                if "long_caption" in captions and "short_caption" in captions:
                    #if image is not in the image directory, skip
                    image_path = os.path.join(self.image_dir, image_name)
                    if not os.path.exists(image_path):
                        print(f"Image {image_path} not found, skipping.")
                        continue
                    # Add both caption types for each image
                    self.samples.append((image_name, captions["long_caption"], "long"))
                    self.samples.append((image_name, captions["short_caption"], "short"))
        else:
            # Use the default Flickr8k captions file
            captions_file = "data/captions.txt"
            with open(captions_file, 'r') as f:
                lines = f.readlines()
                
            # Process the standard Flickr8k format 
            # Typically each line has format: "image_name#caption" or "image_name,caption"
            for line in lines:
                line = line.strip()
                if line:
                    # Try to split by common delimiters
                    if '#' in line:
                        parts = line.split('#', 1)
                    else:
                        parts = line.split(',', 1)
                        
                    if len(parts) == 2:
                        image_name, caption = parts
                        print(f"Image name: {image_name.strip()}, Caption: {caption.strip()}")
                        # Add this check before appending to self.samples
                        image_path = os.path.join(self.image_dir, image_name.strip())
                        if not os.path.exists(image_path):
                            continue  # Skip this caption if image doesn't exist
                        self.samples.append((image_name.strip(), caption.strip(), "standard"))
                        self.num_samples += 1
                        
        print(f"Loaded {len(self.samples)} samples from {captions_file}.")
        
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        image_name, caption, caption_type = self.samples[idx]
        image_path = os.path.join(self.image_dir, image_name)
        
        # Load and preprocess the image
        try:
            image = Image.open(image_path).convert('RGB')
            image = self.preprocess_fn(image)
        except Exception as e:
            print(f"Error loading image {image_path}: {e}")
            # Return a random valid sample instead
            return self.__getitem__(random.randint(0, len(self) - 1))
        
        return image, caption, caption_type
    
    def __reduce__(self):
        return (self.__class__, (self.image_dir, self.captions_file, self.preprocess_fn, True))


## Loss function

In [28]:
# Contrastive Loss Function
def contrastive_loss(image_embeds, text_embeds, temperature=0.07):
    # Normalize embeddings
    image_embeds = F.normalize(image_embeds, dim=1)
    text_embeds = F.normalize(text_embeds, dim=1)
    
    # Compute similarity matrix
    logits = torch.matmul(image_embeds, text_embeds.T) / temperature
    
    # Labels are the positions of the positive pairs
    labels = torch.arange(logits.size(0), device=logits.device)
    
    # Compute loss in both directions (image->text and text->image)
    loss_i2t = F.cross_entropy(logits, labels)
    loss_t2i = F.cross_entropy(logits.T, labels)
    
    return (loss_i2t + loss_t2i) / 2

## Training the model

In [None]:
# Set up dataset and dataloader
images_dir = os.path.join(BASE_DATA_DESTINATION, FLICKR8K_IMAGES_FOLDER_NAME)
captions_file = os.path.join(BASE_DATA_DESTINATION, CAPTIONS_JSON_FILENAME)

pull_from_json = True

# Check if files exist
if not os.path.exists(images_dir):
    raise FileNotFoundError(f"Images directory not found: {images_dir}")
if not os.path.exists(captions_file):
    print(f"Captions file not found: {captions_file}")
    pull_from_json = False

# Create dataset and dataloader
dataset = Flickr8kCaptionedDataset(images_dir, captions_file, preprocess, pull_from_json=pull_from_json)
dataloader = DataLoader(
    dataset, 
    batch_size=BATCH_SIZE, 
    shuffle=True, 
    num_workers=0,
    drop_last=True
)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)



# Training loop
def train():
    model.train()
    
    for epoch in range(NUM_EPOCHS):
        total_loss = 0.0
        progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}")
        
        for batch_idx, (images, captions, caption_types) in enumerate(progress_bar):
            images = images.to(DEVICE)
            
            # Tokenize the captions
            tokenized_captions = tokenizer(captions).to(DEVICE)
            
            # Forward pass with mixed precision
            with torch.cuda.amp.autocast():
                image_features = model.encode_image(images)
                text_features = model.encode_text(tokenized_captions)
                
                # Compute contrastive loss
                loss = contrastive_loss(image_features, text_features)
            
            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Update progress bar
            total_loss += loss.item()
            avg_loss = total_loss / (batch_idx + 1)
            progress_bar.set_postfix(loss=f"{avg_loss:.4f}")
            if batch_idx >= MAX_TRAINING_STEPS:
                break
        # Print average loss for the epoch
            
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {avg_loss:.4f}")
        
        # Save checkpoint
        checkpoint_path = os.path.join(CHECKPOINT_DIR, f"mobileclip_finetuned_epoch{epoch+1}.pt")
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': avg_loss,
        }, checkpoint_path)
        print(f"Checkpoint saved: {checkpoint_path}")



Captions file not found: /Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/data/captions.json
Image name: 1000268201_693b08cb0e.jpg, Caption: A child in a pink dress is climbing up a set of stairs in an entry way .
Image name: 1000268201_693b08cb0e.jpg, Caption: A girl going into a wooden building .
Image name: 1000268201_693b08cb0e.jpg, Caption: A little girl climbing into a wooden playhouse .
Image name: 1000268201_693b08cb0e.jpg, Caption: A little girl climbing the stairs to her playhouse .
Image name: 1000268201_693b08cb0e.jpg, Caption: A little girl in a pink dress going into a wooden cabin .
Image name: 1001773457_577c3a7d70.jpg, Caption: A black dog and a spotted dog are fighting
Image name: 1001773457_577c3a7d70.jpg, Caption: A black dog and a tri-colored dog playing with each other on the road .
Image name: 1001773457_577c3a7d70.jpg, Caption: A black dog and a white dog with brown spots are staring at each other in the street .
Image name: 1001773457_577c

## Running the training loop

In [None]:
# Run training
print(f"Training on device: {DEVICE}")
print(f"Dataset size: {len(dataset)} samples")
print(f"Batch size: {BATCH_SIZE}")
print(f"Learning rate: {LEARNING_RATE}")
print(f"Number of epochs: {NUM_EPOCHS}")

MAX_TRAINING_STEPS = 1000


print(f"Max training steps: {MAX_TRAINING_STEPS}")

train()

# Evaluate model after training
model.eval()



Training on device: cpu
Dataset size: 40443 samples
Batch size: 1
Learning rate: 0.0001
Number of epochs: 1


Epoch 1/1:   2%|▏         | 1000/40443 [12:01<7:54:36,  1.39it/s, loss=0.0000]


Epoch 1/1, Loss: 0.0000
Checkpoint saved: /Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/checkpoints/mobileclip_finetuned_epoch1.pt


CLIP(
  (image_encoder): MCi(
    (model): FastViT(
      (patch_embed): Sequential(
        (0): MobileOneBlock(
          (se): Identity()
          (activation): GELU(approximate='none')
          (reparam_conv): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        )
        (1): MobileOneBlock(
          (se): Identity()
          (activation): GELU(approximate='none')
          (reparam_conv): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64)
        )
        (2): MobileOneBlock(
          (se): Identity()
          (activation): GELU(approximate='none')
          (reparam_conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
        )
      )
      (network): ModuleList(
        (0): Sequential(
          (0): RepMixerBlock(
            (token_mixer): RepMixer(
              (reparam_conv): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=64)
            )
            (convffn): ConvFFN(
              (con

Evaluate the model

In [53]:
# load the finetuned model from the checkpoint
checkpoint_path = os.path.join(CHECKPOINT_DIR, 'mobileclip_finetuned_epoch1.pt')
checkpoint = torch.load(checkpoint_path, map_location=DEVICE)

model.load_state_dict(checkpoint['model_state_dict'])


# Example evaluation
test_image_path = "/Users/erenyavuz/Desktop/KU/25 Spring/COMP447/Project/Repo/FlightVision/pngwing.com.png"
test_texts = ["a brown dog", "a white dog", "a black dog"]

test_image = preprocess(Image.open(test_image_path).convert('RGB')).unsqueeze(0).to(DEVICE)
test_text = tokenizer(test_texts).to(DEVICE)

with torch.no_grad(), torch.cuda.amp.autocast():
    image_features = model.encode_image(test_image)
    text_features = model.encode_text(test_text)
    image_features /= image_features.norm(dim=-1, keepdim=True)
    text_features /= text_features.norm(dim=-1, keepdim=True)
    
    text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

torch.set_printoptions(sci_mode=False, precision=4)
print("Label probabilities after training:", text_probs)



Label probabilities after training: tensor([[0.7822, 0.0102, 0.2076]])
