In [None]:
import torch
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import zipfile
import json
import os
from PIL import Image

model_id = "Qwen/Qwen2.5-VL-7B-Instruct"
device = 'cuda'

In [None]:
# load model
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    model_id, torch_dtype="auto", device_map="auto"
)

processor = AutoProcessor.from_pretrained(model_id)



In [None]:
extract_path = "restored_images"
os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile("dataset_images.zip", 'r') as zip_ref:
    zip_ref.extractall(extract_path)

with open("dataset.json", 'r') as f:
    loaded_metadata = json.load(f)

dataset = []
for entry in loaded_metadata:
    image_path = os.path.join(extract_path, f"{entry['index']}.jpg")
    image = Image.open(image_path)

    dst_item = {
        "text": entry["label"],
        "image": image,
        "label": entry["count"]
    }
    dataset.append(dst_item)

group_size = 56

dataset = dataset[:group_size*5]

In [None]:
# construct input/output
def generate_message(data):
    return [
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                },
                {
                    "type": "text",
                    "text": f"How many {data["text"]} are in the image? Only output an integer between 1 and 5."
                },

            ],
        }
    ]

In [None]:
print(len(dataset), dataset[0])

In [None]:

def probe(batch):
    # layers should be between 0 (input embedding) and 28 (final activations)
    inputs = processor(
        text = [processor.apply_chat_template(generate_message(item), tokenize=False, add_generation_prompt=True) for item in batch],
        images=[item["image"] for item in batch],
        padding=True,
        return_tensors="pt",
    ).to(device)

    with torch.no_grad():
        outputs = model(
            **inputs,
            output_hidden_states=True,
            return_dict=True
        )

    # print(len(outputs.hidden_states)) # 29
    # shape of outputs.hidden_states (batch_size, sequence_length, hidden_size)

    # get the predicted output
    last_layer = outputs.hidden_states[-1]
    final_vec_raw = last_layer[:, -1, :]
    final_vec_normed = model.model.language_model.norm(final_vec_raw)
    logits_manual = model.lm_head(final_vec_normed)
    token_ids = torch.argmax(logits_manual, dim=-1)
    predicted_tokens = processor.batch_decode(token_ids.unsqueeze(-1), skip_special_tokens=True)
    targets = [item["label"] for item in batch]

    return predicted_tokens, targets, [hs[:, -1, :] for hs in outputs.hidden_states]

In [None]:
processor.tokenizer.padding_side = "left"
layers = 29
batch_size = 2
activations = [[[] for _ in range(5)] for _ in range(layers)]
predictions = [[] for _ in range(5)]

for i in range(0, len(dataset), batch_size):
    batch = dataset[i:i+batch_size]
    predicted_tokens, targets, batch_acts = probe(batch)
    for j in range(batch_size):
        predictions[targets[j]-1].append(predicted_tokens[j])
    for j in range(layers):
        for k in range(batch_size):
            label = batch[k]["label"] - 1
            activations[j][label].append(batch_acts[j][k].float().cpu().numpy())
    print(f"batch {i//batch_size+1}/{len(dataset)//batch_size}")


In [None]:
# accuracies
for i in range(5):
    cnt = 0
    for j in range(len(predictions[i])):
        if predictions[i][j] == str(i+1):
            cnt += 1
    print(f"{i+1}: {cnt/len(predictions[i])}")

In [None]:
output_folder = "qwen_plots"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

for i in range(layers):
    all_acts = []
    for x in activations[i]:
        all_acts.extend(x)
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(all_acts)
    plt.figure(figsize=(10, 6))

    for j in range(5):
        curr = X_pca[j*group_size:(j+1)*group_size]
        plt.scatter(curr[:, 0], curr[:, 1], label=f"{j+1}")

    plt.title(f"Layer {i}")
    plt.legend()
    save_path = os.path.join(output_folder, f"qwen_{i}.png")
    plt.savefig(save_path)
    plt.show()
    plt.close()

In [None]:
import shutil
from google.colab import files

shutil.make_archive(output_folder, 'zip', output_folder)
files.download(f'{output_folder}.zip')

In [None]:
# what if we reduce dimensions for all layers simultaneously?

all_acts = []
for x in activations:
    for y in x:
        all_acts.extend(y)
pca = PCA(n_components=2)
X_pca = pca.fit_transform(all_acts)

for i in range(layers):
    for j in range(5):
        curr = X_pca[i*5*group_size+j*group_size:i*5*group_size+(j+1)*group_size]
        plt.scatter(curr[:, 0], curr[:, 1], label=f"{j+1}")
    plt.legend()
    plt.show()
