# Multi-VLM Comprehensive Comparison Framework
## Testing 7 Vision-Language Models on COCO Dataset
Optimized for RTX 5080 (16GB VRAM) with all models fully implemented

In [None]:
# Only needed on Google Colab (mounts Google Drive at /content/drive)
# from google.colab import drive
# drive.mount('/content/drive')

# Install all required packages
!pip install transformers torch torchvision Pillow datasets pycocotools
!pip install -q accelerate bitsandbytes qwen-vl-utils

## Setup and Imports

In [None]:
from pycocotools.coco import COCO
import requests
import os
from PIL import Image
import json
import pandas as pd
from tqdm import tqdm
import time
import torch
import gc
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Run configuration: where the COCO annotations live, where images and
# result files are written, and how many images are sampled.
ANNOTATIONS_PATH = '/home/vortex/CSE 468 AFE/Project/annotations'
IMAGES_DIR = 'coco_images'
RESULTS_DIR = 'results'
NUM_IMAGES = 200

# Make sure both output directories exist before anything writes to them.
for output_dir in (IMAGES_DIR, RESULTS_DIR):
    os.makedirs(output_dir, exist_ok=True)

# Report the accelerator the notebook will run on.
print("Setup complete.")
cuda_ready = torch.cuda.is_available()
print(f"GPU Available: {cuda_ready}")
if cuda_ready:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_props = torch.cuda.get_device_properties(0)
    print(f"GPU: {gpu_name}")
    print(f"VRAM: {gpu_props.total_memory / 1e9:.1f} GB")

## Load COCO Dataset

In [None]:
# Load the caption annotations from the local copy.
print("Loading COCO annotations...")
coco = COCO(os.path.join(ANNOTATIONS_PATH, 'captions_val2017.json'))

# Pick a reproducible random subset of image ids (fixed seed).
import random
random.seed(42)
all_img_ids = coco.getImgIds()
# Clamp the sample size: random.sample raises ValueError when asked for more
# items than the population holds.
selected_img_ids = random.sample(all_img_ids, min(NUM_IMAGES, len(all_img_ids)))

print(f"Selected {len(selected_img_ids)} images")

# Make sure every selected image exists in IMAGES_DIR, downloading the ones
# that are missing. Images that cannot be fetched are skipped, not fatal.
image_files = []
for img_id in tqdm(selected_img_ids, desc="Checking images"):
    img_info = coco.loadImgs(img_id)[0]
    img_path = os.path.join(IMAGES_DIR, img_info['file_name'])

    if not os.path.exists(img_path):
        # Download to a temporary name and rename on success, so an
        # interrupted download never leaves a truncated file that a later run
        # would mistake for a cached image.
        tmp_path = img_path + '.part'
        try:
            response = requests.get(img_info['coco_url'], timeout=30)
            # Without this an HTTP error page would be saved as a ".jpg".
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                f.write(response.content)
            os.replace(tmp_path, img_path)
        except (requests.RequestException, OSError, KeyError) as e:
            print(f"Failed to download {img_info['file_name']}: {e}")
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            continue

    image_files.append(img_info['file_name'])

print(f"Ready with {len(image_files)} images")

## VLM Model 1: Qwen2-VL-2B-Instruct

In [None]:
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
from qwen_vl_utils import process_vision_info

class QwenVLM:
    """Captioning wrapper around the Qwen/Qwen2-VL-2B-Instruct checkpoint.

    Loads the processor and the fp16 model on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load processor and model (placement is left to device_map="auto")."""
        self.model_name = "Qwen/Qwen2-VL-2B-Instruct"
        self.display_name = "Qwen2-VL-2B"
        self.vram_estimate = "5-6 GB"

        print(f"Loading {self.display_name}...")
        self.processor = AutoProcessor.from_pretrained(self.model_name, trust_remote_code=True)
        self.model = Qwen2VLForConditionalGeneration.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a detailed description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The generated caption, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails. Errors
            are returned rather than raised so one bad image does not abort
            a whole run.
        """
        try:
            messages = [{
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": "Describe this image in detail."}
                ]
            }]

            text = self.processor.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )

            image_inputs, video_inputs = process_vision_info(messages)

            # Follow the device the model was actually placed on instead of
            # guessing from torch.cuda.is_available().
            inputs = self.processor(
                text=[text], images=image_inputs, videos=video_inputs,
                padding=True, return_tensors="pt"
            ).to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            # generate() returns prompt + continuation. Drop the prompt tokens
            # rather than splitting the decoded text on the word "assistant",
            # which would truncate any caption that contains that word.
            prompt_length = inputs["input_ids"].shape[1]
            new_tokens = outputs[:, prompt_length:]
            caption = self.processor.batch_decode(new_tokens, skip_special_tokens=True)[0]

            return caption.strip()
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and processor and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "processor"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("Qwen2-VL-2B class ready (VRAM: 5-6 GB)")

## VLM Model 2: MobileVLM V2 (3B)

In [None]:
class MobileVLMV2:
    """Captioning wrapper around the mtgv/MobileVLM_V2-3B checkpoint.

    Loads a tokenizer and an fp16 causal-LM on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load tokenizer and model (placement is left to device_map="auto")."""
        self.model_name = "mtgv/MobileVLM_V2-3B"
        self.display_name = "MobileVLM-V2-3B"
        self.vram_estimate = "6-8 GB"

        print(f"Loading {self.display_name}...")
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, trust_remote_code=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The decoded model output, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

            prompt = "Describe this image in detail."

            # NOTE(review): a text tokenizer normally has no image pathway.
            # Confirm against the MobileVLM documentation that passing
            # images= here really delivers pixels to the model; if it does
            # not, every call either lands in the except branch or captions
            # the prompt without seeing the image.
            inputs = self.tokenizer(
                prompt, images=image, return_tensors='pt'
            ).to(device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            caption = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            return caption
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and tokenizer and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "tokenizer"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("MobileVLM V2 class ready (VRAM: 6-8 GB)")

## VLM Model 3: LLaVA-1.5 (7B)

In [None]:
class LLaVA15:
    """Captioning wrapper around the llava-hf/llava-1.5-7b-hf checkpoint.

    This is the largest model in the notebook (estimated 14-16 GB in fp16),
    so it is a tight fit on a 16 GB card. Loads the processor and model on
    construction, produces one caption per image via ``generate_caption``
    and releases the weights again with ``unload``.
    """

    def __init__(self):
        """Load processor and model (placement is left to device_map="auto")."""
        self.model_name = "llava-hf/llava-1.5-7b-hf"
        self.display_name = "LLaVA-1.5-7B"
        self.vram_estimate = "14-16 GB"

        print(f"Loading {self.display_name}...")
        from transformers import LlavaProcessor, LlavaForConditionalGeneration

        self.processor = LlavaProcessor.from_pretrained(self.model_name)
        self.model = LlavaForConditionalGeneration.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto"
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a detailed description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The generated caption, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            prompt = "USER: <image>\nDescribe this image in detail.\nASSISTANT:"
            # Keyword arguments keep the call independent of the processor's
            # positional (text, images) ordering, which differs between
            # transformers releases.
            inputs = self.processor(
                text=prompt, images=image, return_tensors="pt"
            ).to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            # generate() returns prompt + continuation; decode only the newly
            # generated tokens instead of splitting the text on "ASSISTANT:".
            prompt_length = inputs["input_ids"].shape[1]
            caption = self.processor.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            )

            return caption.strip()
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and processor and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "processor"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("LLaVA-1.5 class ready (VRAM: 14-16 GB - largest model, tight fit)")

## VLM Model 4: Phi-3-Vision (4.2B)

In [None]:
class Phi3Vision:
    """Captioning wrapper around microsoft/Phi-3-vision-128k-instruct.

    Loads the fp16 model and its processor on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load model and processor (placement is left to device_map="auto")."""
        self.model_name = "microsoft/Phi-3-vision-128k-instruct"
        self.display_name = "Phi-3-Vision-4.2B"
        self.vram_estimate = "8-10 GB"

        print(f"Loading {self.display_name}...")
        from transformers import AutoModelForCausalLM, AutoProcessor

        # NOTE(review): the model card suggests passing
        # _attn_implementation='eager' when flash-attn is not installed;
        # confirm whether that is needed in this environment.
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        # The processor bundles the tokenizer with the image preprocessor.
        # The tokenizer on its own cannot turn an image into model inputs.
        self.processor = AutoProcessor.from_pretrained(
            self.model_name, trust_remote_code=True
        )
        # Kept as an attribute for code that accesses ``.tokenizer`` directly.
        self.tokenizer = self.processor.tokenizer
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a detailed description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The generated caption, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            messages = [{
                "role": "user",
                "content": "<|image_1|>\nDescribe this image in detail.",
            }]

            text = self.tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )

            # Hand the image to the processor so the <|image_1|> placeholder
            # is backed by real pixel inputs. Tokenizing the text alone never
            # shows the model the image.
            inputs = self.processor(
                text, [image], return_tensors="pt"
            ).to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=256,
                    eos_token_id=self.tokenizer.eos_token_id,
                )

            # generate() returns prompt + continuation; keep only new tokens.
            prompt_length = inputs["input_ids"].shape[1]
            new_tokens = outputs[:, prompt_length:]
            caption = self.processor.batch_decode(
                new_tokens,
                skip_special_tokens=True,
                clean_up_tokenization_spaces=False,
            )[0]
            return caption.strip()
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model, processor and tokenizer and release GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "processor", "tokenizer"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("Phi-3-Vision class ready (VRAM: 8-10 GB)")

## VLM Model 5: InternVL2 (2B)

In [None]:
class InternVL2:
    """Captioning wrapper around the OpenGVLab/InternVL2-2B checkpoint.

    Loads the fp16 model and its tokenizer on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load model and tokenizer (placement is left to device_map="auto")."""
        self.model_name = "OpenGVLab/InternVL2-2B"
        self.display_name = "InternVL2-2B"
        self.vram_estimate = "4-6 GB"

        print(f"Loading {self.display_name}...")
        from transformers import AutoModel, AutoTokenizer

        self.model = AutoModel.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name, trust_remote_code=True
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The decoded model output, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

            prompt = "Describe this image in detail."
            text = f"<image>\n{prompt}"

            # NOTE(review): a text tokenizer normally has no image pathway.
            # Confirm against the InternVL2 model card (which, as far as I
            # know, documents inference through model.chat with preprocessed
            # pixel_values) that images= here really reaches the model.
            inputs = self.tokenizer(
                text, images=image, return_tensors="pt"
            ).to(device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            caption = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            return caption
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and tokenizer and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "tokenizer"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("InternVL2 class ready (VRAM: 4-6 GB - one of the smallest)")

## VLM Model 6: SmolVLM2 (2.2B)

In [None]:
class SmolVLM2:
    """Captioning wrapper around HuggingFaceTB/SmolVLM2-2.2B-Instruct.

    Loads the processor and the fp16 model on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load processor and model (placement is left to device_map="auto")."""
        self.model_name = "HuggingFaceTB/SmolVLM2-2.2B-Instruct"
        self.display_name = "SmolVLM2-2.2B"
        self.vram_estimate = "5-5.2 GB"

        print(f"Loading {self.display_name}...")
        from transformers import AutoModelForVision2Seq, AutoProcessor

        self.processor = AutoProcessor.from_pretrained(
            self.model_name, trust_remote_code=True
        )
        self.model = AutoModelForVision2Seq.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a detailed description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The generated caption, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            messages = [{
                "role": "user",
                "content": [
                    {"type": "image"},
                    {"type": "text", "text": "Describe this image in detail."}
                ]
            }]

            text = self.processor.apply_chat_template(
                messages, add_generation_prompt=True
            )

            # Follow the device the model was actually placed on instead of
            # guessing from torch.cuda.is_available().
            inputs = self.processor(
                text=text, images=[image], return_tensors="pt"
            ).to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            # generate() returns prompt + continuation; decode only the new
            # tokens so the caption does not carry the chat transcript.
            prompt_length = inputs["input_ids"].shape[1]
            caption = self.processor.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            )
            return caption.strip()
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and processor and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "processor"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("SmolVLM2 class ready (VRAM: 5.2 GB - efficient with video support)")

## VLM Model 7: DeepSeek-VL (1.3B)

In [None]:
class DeepSeekVL:
    """Captioning wrapper around the deepseek-ai/deepseek-vl-1.3b-chat checkpoint.

    Loads the fp16 model and its tokenizer on construction, produces one
    caption per image via ``generate_caption`` and releases the weights
    again with ``unload``.
    """

    def __init__(self):
        """Load model and tokenizer (placement is left to device_map="auto")."""
        self.model_name = "deepseek-ai/deepseek-vl-1.3b-chat"
        self.display_name = "DeepSeek-VL-1.3B"
        self.vram_estimate = "4-5 GB"

        print(f"Loading {self.display_name}...")
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name, trust_remote_code=True
        )
        print(f"Loaded {self.display_name}")

    def generate_caption(self, image):
        """Return a description of ``image``.

        Args:
            image: The image to describe (the processing loop passes an RGB
                PIL image).

        Returns:
            The decoded model output, or a string starting with ``"Error: "``
            (message truncated to 100 characters) if anything fails.
        """
        try:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

            # NOTE(review): these markers look like Qwen2-VL's vision tokens
            # rather than DeepSeek-VL's; confirm the expected image
            # placeholder against the DeepSeek-VL model card.
            image_tokens = "<|vision_start|><|image_pad|><|vision_end|>"
            prompt = f"{image_tokens}\nDescribe this image in detail."

            # NOTE(review): a text tokenizer normally has no image pathway.
            # Confirm that encode_plus(images=...) really delivers pixels to
            # the model; the upstream project appears to ship its own
            # processor for that - verify.
            inputs = self.tokenizer.encode_plus(
                prompt, images=[image], return_tensors="pt"
            ).to(device)

            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=256)

            caption = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            return caption
        except Exception as e:
            return f"Error: {str(e)[:100]}"

    def unload(self):
        """Drop the model and tokenizer and release cached GPU memory.

        Safe to call more than once and after a partially failed load.
        """
        for attr in ("model", "tokenizer"):
            if hasattr(self, attr):
                delattr(self, attr)
        # Collect first so tensors kept alive only by reference cycles are
        # freed before the CUDA caching allocator is asked to release memory.
        gc.collect()
        torch.cuda.empty_cache()

print("DeepSeek-VL class ready (VRAM: 4-5 GB - smallest, good for OCR)")

## Configuration: Select Which Models to Run

In [None]:
# Registry of every model wrapper, keyed by the display name used in result
# files and in MODELS_TO_RUN below.
AVAILABLE_MODELS = {
    "Qwen2-VL-2B": QwenVLM,
    "MobileVLM-V2-3B": MobileVLMV2,
    "LLaVA-1.5-7B": LLaVA15,
    "Phi-3-Vision-4.2B": Phi3Vision,
    "InternVL2-2B": InternVL2,
    "SmolVLM2-2.2B": SmolVLM2,
    "DeepSeek-VL-1.3B": DeepSeekVL,
}

# ========================================
# CUSTOMIZE THIS LIST TO SELECT MODELS
# ========================================
# Comment/uncomment models to enable/disable
MODELS_TO_RUN = [
    "Qwen2-VL-2B",          # Fast, good quality, 5-6 GB
    # "MobileVLM-V2-3B",     # Lightweight, 6-8 GB
    # "LLaVA-1.5-7B",        # Larger, better reasoning, 14-16 GB (tight!)
    # "Phi-3-Vision-4.2B",   # Sweet spot, 8-10 GB
    # "InternVL2-2B",        # Very compact, 4-6 GB
    # "SmolVLM2-2.2B",       # Efficient with video support, 5.2 GB
    # "DeepSeek-VL-1.3B",    # Smallest, good OCR, 4-5 GB
]

print(f"\n{'='*80}")
print("MODELS CONFIGURATION")
print(f"{'='*80}")
print(f"\nModels to run: {len(MODELS_TO_RUN)}")
for model_name in MODELS_TO_RUN:
    print(f"  ✓ {model_name}")

# Count the models that are actually listed below. Subtracting the two
# lengths gives the wrong number when MODELS_TO_RUN has a duplicate or a name
# that is not in AVAILABLE_MODELS.
unselected_models = [name for name in AVAILABLE_MODELS if name not in MODELS_TO_RUN]
print(f"\nAvailable models not selected: {len(unselected_models)}")
for model_name in unselected_models:
    print(f"  ○ {model_name}")
print("\nEdit MODELS_TO_RUN list above to change selection")

## Process All Selected Models

In [None]:
# Caption every prepared image with every selected model, one model at a
# time, so only a single model occupies VRAM at any moment.
all_results = []

for model_name in MODELS_TO_RUN:
    model_class = AVAILABLE_MODELS.get(model_name)
    if model_class is None:
        print(f"Warning: {model_name} not found in available models")
        continue

    print(f"\n{'='*80}")
    print(f"Processing with: {model_name}")
    print(f"{'='*80}")

    # File-system-safe variant of the model name for output file names.
    safe_name = model_name.replace('/', '_')
    model = None

    try:
        # Load model
        model = model_class()

        model_results = []
        start_time = time.time()

        # Process images
        for idx, img_file in enumerate(tqdm(image_files, desc=f"Processing with {model_name}")):
            try:
                img_path = os.path.join(IMAGES_DIR, img_file)
                # The context manager closes the file handle right away;
                # convert() returns an independent in-memory copy.
                with Image.open(img_path) as raw_image:
                    image = raw_image.convert('RGB')

                img_id = os.path.splitext(img_file)[0]
                img_width, img_height = image.size

                start_inference = time.time()
                caption = model.generate_caption(image)
                inference_time = time.time() - start_inference

                result = {
                    'image_id': img_id,
                    'model_name': model_name,
                    'caption': caption,
                    'processing_time_sec': round(inference_time, 2),
                    'image_width': img_width,
                    'image_height': img_height,
                    'timestamp': datetime.now().isoformat()
                }

                model_results.append(result)
                all_results.append(result)

                # Checkpoint every 50 images so a crash loses little work.
                if (idx + 1) % 50 == 0:
                    checkpoint_path = os.path.join(
                        RESULTS_DIR, f"checkpoint_{safe_name}_{idx+1}.csv"
                    )
                    pd.DataFrame(model_results).to_csv(checkpoint_path, index=False)
                    elapsed = (time.time() - start_time) / 60
                    tqdm.write(f"Checkpoint {idx+1}/{len(image_files)}: {elapsed:.1f}m elapsed")

            except Exception as e:
                # Best effort: one unreadable image must not abort the run.
                tqdm.write(f"Error processing {img_file}: {str(e)[:50]}")
                continue

        # Save model results
        result_path = os.path.join(RESULTS_DIR, f"results_{safe_name}.csv")
        pd.DataFrame(model_results).to_csv(result_path, index=False)

        elapsed_time = (time.time() - start_time) / 60
        # generate_caption reports failures as strings starting with "Error".
        successful = sum(
            1 for r in model_results if not r['caption'].startswith('Error')
        )

        print(f"\nCompleted {model_name}:")
        print(f"  Processed: {len(model_results)}/{len(image_files)} images")
        print(f"  Successful: {successful}/{len(model_results)}")
        print(f"  Time: {elapsed_time:.1f} minutes")
        if model_results:
            print(f"  Avg per image: {elapsed_time * 60 / len(model_results):.1f}s")
        print(f"  Saved to: {result_path}")

    except Exception as e:
        print(f"Failed to process {model_name}: {str(e)}")

    finally:
        # Always free VRAM, even when processing or saving failed; otherwise
        # the next model would have to load next to a leaked one.
        if model is not None:
            try:
                model.unload()
            except Exception as e:
                print(f"Failed to unload {model_name}: {e}")
            model = None
            time.sleep(2)

# Save combined results
if all_results:
    combined_df = pd.DataFrame(all_results)
    combined_path = os.path.join(RESULTS_DIR, 'all_models_comparison.csv')
    combined_df.to_csv(combined_path, index=False)

    print(f"\n{'='*80}")
    print("ALL PROCESSING COMPLETE")
    print(f"{'='*80}")
    print(f"Total results: {len(all_results)}")
    print(f"Saved to: {combined_path}")

## Results Analysis

In [None]:
# Summarize the combined results per model and export them as JSON.
combined_path = os.path.join(RESULTS_DIR, 'all_models_comparison.csv')
if os.path.exists(combined_path):
    # image_id is a zero-padded file stem; read it as text so the CSV
    # round-trip does not turn it into an integer and drop the leading zeros.
    results_df = pd.read_csv(combined_path, dtype={'image_id': str})

    # An empty caption comes back from CSV as NaN (a float). Normalize a
    # working copy to strings so the string operations below cannot fail.
    captions = results_df['caption'].fillna('').astype(str)

    print(f"\n{'='*80}")
    print("DETAILED RESULTS SUMMARY")
    print(f"{'='*80}")

    for model_name in results_df['model_name'].unique():
        model_mask = results_df['model_name'] == model_name
        model_data = results_df[model_mask]
        model_captions = captions[model_mask]
        caption_lengths = model_captions.str.len()
        times = model_data['processing_time_sec']

        # Failures are recorded as captions starting with "Error".
        successful = int((~model_captions.str.startswith('Error')).sum())

        print(f"\n{model_name}:")
        print(f"  Total images: {len(model_data)}")
        print(f"  Successful: {successful}/{len(model_data)}")
        print(f"  Success rate: {successful/len(model_data)*100:.1f}%")
        print(f"  Avg caption length: {caption_lengths.mean():.0f} characters")
        print(f"  Caption range: {caption_lengths.min()}-{caption_lengths.max()}")
        print(f"  Avg inference time: {times.mean():.2f}s")
        print(f"  Min/Max inference: {times.min():.2f}s / {times.max():.2f}s")

    # Export to JSON alongside the existing CSV
    json_path = os.path.join(RESULTS_DIR, 'all_models_comparison.json')
    results_df.to_json(json_path, orient='records', indent=2)

    print(f"\n{'='*80}")
    print("RESULTS EXPORTED")
    print(f"{'='*80}")
    print(f"CSV: {combined_path}")
    print(f"JSON: {json_path}")
    print(f"{'='*80}\n")
else:
    print("No results file found. Run processing section first.")

## Sample Results Comparison

In [None]:
# Print every model's caption side by side for the first two images.
combined_path = os.path.join(RESULTS_DIR, 'all_models_comparison.csv')
if os.path.exists(combined_path):
    # Read image_id as text so zero-padded ids keep their leading zeros.
    results_df = pd.read_csv(combined_path, dtype={'image_id': str})
    # An empty caption comes back from CSV as NaN (a float); normalize to
    # strings so len() and slicing below cannot fail.
    results_df['caption'] = results_df['caption'].fillna('').astype(str)

    # Show results for first 2 unique images
    unique_images = results_df['image_id'].unique()[:2]

    for img_id in unique_images:
        print(f"\n{'='*80}")
        print(f"Image ID: {img_id}")
        img_data = results_df[results_df['image_id'] == img_id]

        if img_data.empty:
            continue

        first_row = img_data.iloc[0]
        print(f"Size: {first_row['image_width']}x{first_row['image_height']}")
        print(f"{'='*80}")

        for _, row in img_data.iterrows():
            print(f"\n{row['model_name']} ({row['processing_time_sec']:.2f}s):")
            caption = row['caption']
            # Keep the side-by-side view readable by truncating long captions.
            if len(caption) > 400:
                caption = caption[:400] + "..."
            print(f"{caption}")
else:
    print("No results available. Run processing section first.")

## OLD CODE - Gemini Processing (Commented Out)

In [None]:
# ============================================
# Using Gemini API for image captioning
# ============================================
# Replaced by the open-source models above for better control.
# The code is kept, commented out, purely as a record of what was replaced.

# import google.generativeai as genai
#
# GEMINI_API_KEY = "your_api_key_here"
# genai.configure(api_key=GEMINI_API_KEY)
#
# def generate_gemini_caption(image):
#     """Generate caption using Gemini 2.5 Pro"""
#     try:
#         gemini_model = genai.GenerativeModel('models/gemini-2.5-pro')
#         response = gemini_model.generate_content([
#             "Describe this image in detail.",
#             image
#         ])
#         return response.text
#     except Exception as e:
#         return f"Error: {str(e)}"

# ============================================
# Old manual patch extraction approach
# ============================================
# No longer used: the VLM wrappers above hand whole images to the models.
#
# def extract_patches(image, patch_size=224, stride=112):
#     patches = []
#     width, height = image.size
#     for y in range(0, height - patch_size + 1, stride):
#         for x in range(0, width - patch_size + 1, stride):
#             patch = image.crop((x, y, x + patch_size, y + patch_size))
#             patches.append(patch)
#     return patches

# Closing notes shown when the cell runs.
for closing_note in (
    "Old code sections kept for reference but not used",
    "We're now using 7 different open-source VLM models instead",
    "Benefits: No API costs, no rate limiting, full local control, easy to compare",
):
    print(closing_note)