In [None]:
!nvidia-smi


Mon Dec 22 05:16:11 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   60C    P8             11W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
%pip install -q torch torchvision torchaudio
%pip install -q transformers datasets accelerate
%pip install -q faiss-cpu nltk pillow tqdm


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m109.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import torch
import faiss
import nltk
import numpy as np
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering
from tqdm import tqdm

nltk.download("punkt")
nltk.download("averaged_perceptron_tagger")
nltk.download("wordnet")

device = "cuda" if torch.cuda.is_available() else "cpu"

processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
model = BlipForQuestionAnswering.from_pretrained(
    "Salesforce/blip-vqa-base"
).to(device)
model.train()


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is rec

preprocessor_config.json:   0%|          | 0.00/445 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.54G [00:00<?, ?B/s]

BlipForQuestionAnswering(
  (vision_model): BlipVisionModel(
    (embeddings): BlipVisionEmbeddings(
      (patch_embedding): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
    )
    (encoder): BlipEncoder(
      (layers): ModuleList(
        (0-11): 12 x BlipEncoderLayer(
          (self_attn): BlipAttention(
            (dropout): Dropout(p=0.0, inplace=False)
            (qkv): Linear(in_features=768, out_features=2304, bias=True)
            (projection): Linear(in_features=768, out_features=768, bias=True)
          )
          (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): BlipMLP(
            (activation_fn): GELUActivation()
            (fc1): Linear(in_features=768, out_features=3072, bias=True)
            (fc2): Linear(in_features=3072, out_features=768, bias=True)
          )
          (layer_norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
      )
    )
    (post_layernorm): LayerNorm((768,), eps=1e-05, e

In [None]:
def get_question_embedding(question):
    inputs = processor(text=question, return_tensors="pt").to(device)
    with torch.no_grad():
        emb = model.text_encoder(
            inputs.input_ids,
            attention_mask=inputs.attention_mask
        ).last_hidden_state.mean(dim=1)
    return emb.cpu().numpy()


In [None]:
questions_db = [
    "Is the dog black?",
    "How many dogs are there?",
    "Is the cat white?",
    "What color is the car?"
]

q_embeddings = np.vstack([get_question_embedding(q) for q in questions_db])

# FAISS index
q_index = faiss.IndexFlatIP(q_embeddings.shape[1])
q_index.add(q_embeddings)


In [None]:
def get_image_embedding(image):
    inputs = processor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        emb = model.vision_model(
            inputs.pixel_values
        ).last_hidden_state.mean(dim=1)
    return emb.cpu().numpy()


In [None]:
image_db = [
    Image.open("/content/cat.jpeg"),
    Image.open("/content/dog.jpg")
]

v_embeddings = np.vstack([get_image_embedding(img) for img in image_db])

v_index = faiss.IndexFlatIP(v_embeddings.shape[1])
v_index.add(v_embeddings)


In [None]:
def aggregate_features(p, q_index, v_index, q_embs, v_embs, Kq=4, Kv=4, wq=0.6, wv=0.4):
    # linguistic retrieval
    _, q_ids = q_index.search(p, Kq)
    q_feats = q_embs[q_ids[0]].mean(axis=0)

    # visual retrieval
    _, v_ids = v_index.search(p, Kv)
    v_feats = v_embs[v_ids[0]].mean(axis=0)

    aggregated = p + wq * q_feats + wv * v_feats
    return aggregated


In [None]:
def rag_vqa_forward(image, question):
    # original embeddings
    q_emb = get_question_embedding(question)
    v_emb = get_image_embedding(image)

    # aggregate
    q_agg = aggregate_features(q_emb, q_index, v_index, q_embeddings, v_embeddings)
    v_agg = aggregate_features(v_emb, q_index, v_index, q_embeddings, v_embeddings)

    # normal BLIP forward
    inputs = processor(images=image, text=question, return_tensors="pt").to(device)
    outputs = model(**inputs)
    return outputs


In [None]:
from PIL import Image

train_data = [
    {
        "image": Image.open("/content/cat.jpeg"),
        "question": "Is the dog black?",
        "answer": "yes"
    },
    {
        "image": Image.open("/content/dog.jpg"),
        "question": "What color is the cat?",
        "answer": "white"
    }
]


In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

for epoch in range(2):
    total_loss = 0

    for sample in train_data:
        inputs = processor(
            images=sample["image"],
            text=sample["question"],
            return_tensors="pt"
        ).to(device)

        labels = processor(
            text=sample["answer"],
            return_tensors="pt"
        ).input_ids.to(device)

        outputs = model(**inputs, labels=labels)
        loss = outputs.loss

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += loss.item()

    print(f"Epoch {epoch+1} | Loss: {total_loss:.4f}")


Epoch 1 | Loss: 3.4180
Epoch 2 | Loss: 0.1477


In [None]:
def get_text_tokens(question):
    inputs = processor(text=question, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.text_encoder(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            output_hidden_states=True
        )
    # shape: (num_tokens, hidden_dim)
    return outputs.last_hidden_state.squeeze(0)


In [None]:
def get_image_patches(image):
    inputs = processor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.vision_model(
            inputs.pixel_values,
            output_hidden_states=True
        )
    # shape: (num_patches, hidden_dim)
    return outputs.last_hidden_state.squeeze(0)


In [None]:
def aggregate_feature(p, q_index, v_index, q_embs, v_embs, Kq=2, Kv=2, wq=0.6, wv=0.4):
    p_np = p.cpu().numpy().reshape(1, -1)

    _, q_ids = q_index.search(p_np, Kq)
    _, v_ids = v_index.search(p_np, Kv)

    q_feat = q_embs[q_ids[0]].mean(axis=0)
    v_feat = v_embs[v_ids[0]].mean(axis=0)

    p_agg = p + wq * torch.tensor(q_feat).to(device) + wv * torch.tensor(v_feat).to(device)
    return p_agg


In [None]:
def apply_rag(image, question):
    text_feats = get_text_tokens(question)
    img_feats = get_image_patches(image)

    rag_text = torch.stack([
        aggregate_feature(t, q_index, v_index, q_embeddings, v_embeddings)
        for t in text_feats
    ])

    rag_img = torch.stack([
        aggregate_feature(v, q_index, v_index, q_embeddings, v_embeddings)
        for v in img_feats
    ])

    return rag_text, rag_img


In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

for epoch in range(2):
    total_loss = 0

    for sample in train_data:
        rag_text, rag_img = apply_rag(sample["image"], sample["question"])

        inputs = processor(
            images=sample["image"],
            text=sample["question"],
            return_tensors="pt"
        ).to(device)

        labels = processor(
            text=sample["answer"],
            return_tensors="pt"
        ).input_ids.to(device)

        outputs = model(**inputs, labels=labels)
        loss = outputs.loss

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += loss.item()

    print(f"[RAG] Epoch {epoch+1} | Loss: {total_loss:.4f}")


[RAG] Epoch 1 | Loss: 0.0889
[RAG] Epoch 2 | Loss: 1.2858
