In [1]:
!pip install git+https://github.com/openai/CLIP.git

Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-r3ffzxhz
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-r3ffzxhz
  Resolved https://github.com/openai/CLIP.git to commit dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ftfy (from clip==1.0)
  Downloading ftfy-6.3.1-py3-none-any.whl.metadata (7.3 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->clip==1.0)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting 

In [None]:
# !unzip UTF-8sample_images.zip
!unzip counteranimal.zip

Archive:  UTF-8sample_images.zip
   creating: sample_images/
  inflating: sample_images/bee_eater.JPEG  
  inflating: sample_images/black_swan.JPEG  
  inflating: sample_images/hen.JPEG  
  inflating: sample_images/kite.JPEG  
  inflating: sample_images/tiger_shark.JPEG  


In [3]:
import os
import time
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import clip
import numpy as np
import matplotlib.pyplot as plt
from torchvision.datasets import ImageNet
import requests
import torchvision
from io import BytesIO

device = "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
#############################################################
# Task 1: Inference using CLIP and ImageNet pretrained ResNet-50
#############################################################

# Load ImageNet pretrained ResNet-50
imagenet_model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
imagenet_model = imagenet_model.to(device)
imagenet_model.eval()

# Load CLIP model
clip_model, clip_preprocess = clip.load("RN50", device=device)
clip_model.eval()

# Extract CLIP's visual encoder
clip_visual_encoder = clip_model.visual

print("Model comparison:")
print(f"ImageNet ResNet-50 architecture: {type(imagenet_model)}")
print(f"CLIP visual encoder architecture: {type(clip_visual_encoder)}")

# Compare architectures
def compare_architectures():
    print("\nArchitecture Comparison:")
    print("ImageNet ResNet-50 layers:")
    for name, module in imagenet_model.named_children():
        print(f"  {name}: {type(module)}")

    print("\nCLIP visual encoder layers:")
    for name, module in clip_visual_encoder.named_children():
        print(f"  {name}: {type(module)}")

compare_architectures()

Model comparison:
ImageNet ResNet-50 architecture: <class 'torchvision.models.resnet.ResNet'>
CLIP visual encoder architecture: <class 'clip.model.ModifiedResNet'>

Architecture Comparison:
ImageNet ResNet-50 layers:
  conv1: <class 'torch.nn.modules.conv.Conv2d'>
  bn1: <class 'torch.nn.modules.batchnorm.BatchNorm2d'>
  relu: <class 'torch.nn.modules.activation.ReLU'>
  maxpool: <class 'torch.nn.modules.pooling.MaxPool2d'>
  layer1: <class 'torch.nn.modules.container.Sequential'>
  layer2: <class 'torch.nn.modules.container.Sequential'>
  layer3: <class 'torch.nn.modules.container.Sequential'>
  layer4: <class 'torch.nn.modules.container.Sequential'>
  avgpool: <class 'torch.nn.modules.pooling.AdaptiveAvgPool2d'>
  fc: <class 'torch.nn.modules.linear.Linear'>

CLIP visual encoder layers:
  conv1: <class 'torch.nn.modules.conv.Conv2d'>
  bn1: <class 'torch.nn.modules.batchnorm.BatchNorm2d'>
  relu1: <class 'torch.nn.modules.activation.ReLU'>
  conv2: <class 'torch.nn.modules.conv.Conv2

In [3]:
#############################################################
# Task 2: Setup data and understand ImageNet dataset
#############################################################

In [5]:
#############################################################
# Task 3: Setup zero-shot CLIP
#############################################################


imagenet_classes = torchvision.models.ResNet50_Weights.IMAGENET1K_V1.meta["categories"]
text_inputs = torch.cat([clip.tokenize(f"a photo of a {c}") for c in imagenet_classes]).to(device)

def predict_with_clip(image_path):
    # Load and preprocess image
    image = clip_preprocess(Image.open(image_path)).unsqueeze(0).to(device)

    with torch.no_grad():
        # Get image features
        image_features = clip_model.encode_image(image)

        # Get text features for all ImageNet classes
        text_features = clip_model.encode_text(text_inputs)

        # Normalize features
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Calculate similarity (cosine similarity as logits)
        similarity = (100.0 * image_features @ text_features.T).softmax(dim=-1)

        # Get top predictions
        values, indices = similarity[0].topk(5)

    return [(imagenet_classes[idx], values[i].item()) for i, idx in enumerate(indices)]



imagenet_preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Function to make predictions with ImageNet pretrained model
def predict_with_imagenet(image_path):
    # Load and preprocess image
    image = imagenet_preprocess(Image.open(image_path)).unsqueeze(0).to(device)

    with torch.no_grad():
        output = imagenet_model(image)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)

        # Get top predictions
        values, indices = probabilities.topk(5)

    return [(imagenet_classes[idx], values[i].item()) for i, idx in enumerate(indices)]



print("\nTesting models on sample image:")

for path in os.listdir('sample_images'):

    img_path = os.path.join('sample_images', path)
    print(f"Image: {img_path}")
    clip_predictions = predict_with_clip(img_path)
    print("CLIP top-5 predictions:")
    for class_name, score in clip_predictions:
        print(f"  {class_name}: {score:.4f}")

    imagenet_predictions = predict_with_imagenet(img_path)
    print("\nImageNet top-5 predictions:")
    for class_name, score in imagenet_predictions:
        print(f"  {class_name}: {score:.4f}")
    print("-" * 50)



Testing models on sample image:


FileNotFoundError: [Errno 2] No such file or directory: 'sample_images'

In [8]:
test_images = [
    # ("ladybug.jpg", "ladybug"),
    # ("tennis_ball.jpg", "tennis_ball"),
    # ("golden_retreiver.jpg", "golden retriever"),
    # ("laptop.jpg", "laptop"),
    ("counteranimal/42 agama/counter-tree/medium - 2023-09-08T135742.071.jpeg", "agama"),
]

for img_path, label in test_images:
    print(f"\n--- Predictions for: {label} ---")

    print("CLIP Top-5:")
    for cls, score in predict_with_clip(img_path):
        print(f"  {cls} ({score:.4f})")

    print("ImageNet RN50 Top-5:")
    for cls, score in predict_with_imagenet(img_path):
        print(f"  {cls} ({score:.4f})")



--- Predictions for: agama ---
CLIP Top-5:


OutOfMemoryError: CUDA out of memory. Tried to allocate 152.00 MiB. GPU 0 has a total capacity of 5.76 GiB of which 2.81 MiB is free. Process 6995 has 2.07 GiB memory in use. Process 25710 has 2.78 GiB memory in use. Including non-PyTorch memory, this process has 912.00 MiB memory in use. Of the allocated memory 578.10 MiB is allocated by PyTorch, and 223.90 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [6]:
import os
import difflib
import cv2  # Only used to check number of channels

# from your_module import predict_with_clip, predict_with_imagenet

dataset_root = "counteranimal"
imagenet_classes_file = "imagenet_classes.txt"

with open(imagenet_classes_file, "r") as f:
    imagenet_classes = [line.strip().lower() for line in f.readlines()]

def match_imagenet_label(folder_name, classes, cutoff=0.6):
    parts = folder_name.split(' ', 1)
    if len(parts) < 2:
        raise ValueError(f"Unexpected folder name format: {folder_name}")
    synonyms = [syn.strip().lower() for syn in parts[1].split(',')]
    for syn in synonyms:
        if not syn:
            continue
        matches = difflib.get_close_matches(syn, classes, n=1, cutoff=cutoff)
        if matches:
            return matches[0]
    raise ValueError(f"No close Imagenet match for any of {synonyms} (folder {folder_name})")

clip_fails_imagenet_passes = []
imagenet_fails_clip_passes = []

clip_fail_rn50_pass_count = 0
rn50_fail_clip_pass_count = 0

# Iterate through folders
for class_folder in os.listdir(dataset_root):
    class_path = os.path.join(dataset_root, class_folder)
    if not os.path.isdir(class_path):
        continue

    try:
        gt_label = match_imagenet_label(class_folder, imagenet_classes)
    except ValueError as e:
        print(e)
        continue

    # animals = ['agama', 'beaver', 'bighorn', "black grouse", "bulbul", "cicada", "flamingo", "loggerhead", "water ouzel"]
    # if not any(gt_label.endswith(animal) for animal in animals):
    #     continue

    print(f"\nProcessing folder: {class_folder} (GT: {gt_label})")
    for root, _, files in os.walk(class_path):
        clip_fail_rn50_pass_count = 0
        rn50_fail_clip_pass_count = 0
        print(f"  Processing subfolder: {root}")
        for fname in files:
            if clip_fail_rn50_pass_count >= 2 and rn50_fail_clip_pass_count >= 4:
                break

            if not fname.lower().endswith(('.jpg', '.jpeg')):
                continue

            img_path = os.path.join(root, fname)
            img = cv2.imread(img_path)
            if img is None or len(img.shape) != 3 or img.shape[2] != 3:
                continue  # Not a 3-channel image

            clip_preds = [cls.lower() for cls, _ in predict_with_clip(img_path)]
            rn50_preds = [cls.lower() for cls, _ in predict_with_imagenet(img_path)]

            clip_pass = gt_label in clip_preds
            rn50_pass = gt_label in rn50_preds

            # print(f"Image: {img_path}")
            # print(f"  CLIP predictions: {clip_preds} (Pass: {clip_pass})")
            # print(f"  RN-50 predictions: {rn50_preds} (Pass: {rn50_pass})")
            if (clip_fail_rn50_pass_count < 2) and (not clip_pass and rn50_pass):
                clip_fails_imagenet_passes.append(img_path)
                clip_fail_rn50_pass_count += 1
                print(f"CLIP fails but RN-50 passes: {img_path}")

            elif (rn50_fail_clip_pass_count < 4) and (not rn50_pass and clip_pass):
                imagenet_fails_clip_passes.append(img_path)
                rn50_fail_clip_pass_count += 1
                print(f"RN-50 fails but CLIP passes: {img_path}")

            if clip_fail_rn50_pass_count >= 2 and rn50_fail_clip_pass_count >= 4:
                break

def print_results(title, paths):
    print(f"\n--- {title} (Total: {len(paths)}) ---")
    for p in paths:
        print(p)

print_results("CLIP fails but RN-50 passes", clip_fails_imagenet_passes)
print_results("RN-50 fails but CLIP passes", imagenet_fails_clip_passes)


Processing folder: 293 cheetah, chetah, Acinonyx jubatus (GT: 293, cheetah)
  Processing subfolder: counteranimal/293 cheetah, chetah, Acinonyx jubatus
  Processing subfolder: counteranimal/293 cheetah, chetah, Acinonyx jubatus/counter-tree

Processing folder: 9 ostrich, Struthio camelus (GT: 9, ostrich)
  Processing subfolder: counteranimal/9 ostrich, Struthio camelus
  Processing subfolder: counteranimal/9 ostrich, Struthio camelus/counter-water

Processing folder: 102 echidna, spiny anteater, anteater (GT: 102, echidna)
  Processing subfolder: counteranimal/102 echidna, spiny anteater, anteater
  Processing subfolder: counteranimal/102 echidna, spiny anteater, anteater/counter-tree

Processing folder: 81 ptarmigan (GT: 81, ptarmigan)
  Processing subfolder: counteranimal/81 ptarmigan
  Processing subfolder: counteranimal/81 ptarmigan/counter-grass

Processing folder: 89 sulphur-crested cockatoo, Kakatoe galerita, Cacatua galerita (GT: 89, sulphur-crested_cockatoo)
  Processing subf

In [None]:
img_path = "counteranimal/42 agama/counter-tree/medium - 2023-09-08T135742.071.jpeg"
clip_preds = [cls.lower() for cls, _ in predict_with_clip(img_path)]
rn50_preds = [cls.lower() for cls, _ in predict_with_imagenet(img_path)]
print(clip_preds)
print(rn50_preds)
gt_label = "42, agama"

clip_pass = gt_label in clip_preds
rn50_pass = gt_label in rn50_preds

print(clip_pass)
print(rn50_pass)

# Case: CLIP fails but RN-50 passes
if not clip_pass and rn50_pass:
    print(f"CLIP fails but RN-50 passes: {img_path}")



['35, mud_turtle', '63, indian_cobra', '38, banded_gecko', '78, tick', '44, alligator_lizard']
['37, box_turtle', '36, terrapin', '38, banded_gecko', '42, agama', '35, mud_turtle']
False
True
CLIP fails but RN-50 passes: counteranimal/42 agama/counter-tree/medium - 2023-09-08T135742.071.jpeg


In [None]:
#############################################################
# Task 4: CLIP vs ImageNet pretraining
#############################################################

# Select 10 diverse ImageNet classes
selected_classes = [
    "goldfish",       # Aquatic animal
    "airliner",       # Vehicle
    "grand_piano",    # Musical instrument
    "coffee_mug",     # Household object
    "golden_retriever", # Dog breed
    "monarch",        # Butterfly/insect
    "ambulance",      # Emergency vehicle
    "volcano",        # Natural formation
    "strawberry",     # Fruit
    "desktop_computer" # Electronics
]

def download_test_images():
    """Download test images for our comparison"""
    # Create directories for images
    os.makedirs("test_images", exist_ok=True)
    os.makedirs("test_images/clip_better", exist_ok=True)
    os.makedirs("test_images/imagenet_better", exist_ok=True)

download_test_images()

In [None]:
import torch
import clip
import time
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import subprocess
import copy

# Load the original FP32 model
# device = "cuda" if torch.cuda.is_available() else "cpu"
# clip_visual_fp32, preprocess = clip.load("RN50", device=device)

# model_fp32 = model_fp32.visual.float()

# # Create FP16 version
# model_fp16 = copy.deepcopy(model_fp32).half()

# # model_fp16 = copy.deepcopy(model_fp32).to(device)
# # model_fp32 = model_fp32.to(device).float()
# # Prepare test images (5 images from different classes)
image_paths = [
    'sample_images/bee_eater.JPEG'
]

def load_clip_fp32():
    clip_model, preprocess = clip.load("RN50", device=device)
    return clip_model.visual.float()

def load_clip_fp16():
    clip_model, preprocess = clip.load("RN50", device=device)
    return clip_model.visual.half()

images = [clip_preprocess(Image.open(path)).unsqueeze(0).to(device) for path in image_paths]

def test_fp16_performance_improved():
    print("\nImproved FP16 vs FP32 Performance Test:")

    # Print device info
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name()}")
        print(f"CUDA Version: {torch.version.cuda}")

    print(f"PyTorch Version: {torch.__version__}")

    # Sample image for testing
    sample_image = clip_preprocess(Image.open('sample_images/bee_eater.JPEG')).unsqueeze(0).to(device)
    sample_image = sample_image.repeat(64, 1, 1, 1)
    # Create proper copies to avoid interference
    clip_visual_fp32 = load_clip_fp32()
    torch.cuda.empty_cache()  # Clear cache between model loads

    clip_visual_fp16 = load_clip_fp16()
    torch.cuda.empty_cache()  # Clear cache between model loads

    print("Model-F32 dtype:", next(clip_visual_fp32.parameters()).dtype)  # Should be torch.float32
    print("Model-F16 dtype:", next(clip_visual_fp16.parameters()).dtype)

    # Warm-up runs - more extensive
    print("Warming up models...")
    with torch.no_grad():
        for _ in range(20):
            _ = clip_visual_fp32(sample_image.to(torch.float32))

        for _ in range(20):
            _ = clip_visual_fp16(sample_image.to(torch.float16))

    # Memory test first (before timing tests)
    print("\nMemory analysis:")
    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()

    # Isolate FP32 memory usage with a clear pattern
    with torch.autocast(device_type='cuda', dtype=torch.float32):
        with torch.no_grad():
            _ = clip_visual_fp32(sample_image.to(torch.float32))
    fp32_memory = torch.cuda.max_memory_allocated() / (1024 ** 2)

    # Clear completely between tests
    del clip_visual_fp32
    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()

    # Load a fresh FP16 model
    clip_visual_fp16 = load_clip_fp16()

    # Isolate FP16 memory usage
    with torch.autocast(device_type='cuda', dtype=torch.float16):
        with torch.no_grad():
            _ = clip_visual_fp16(sample_image.to(torch.float16))
        fp16_memory = torch.cuda.max_memory_allocated() / (1024 ** 2)

    print(f"FP32 peak memory usage: {fp32_memory:.2f} MB")
    print(f"FP16 peak memory usage: {fp16_memory:.2f} MB")
    print(f"Memory change: {(fp16_memory/fp32_memory - 1) * 100:.2f}%")

    # Reload models for timing tests
    clip_visual_fp32 = load_clip_fp32()
    clip_visual_fp16 = load_clip_fp16()
    torch.cuda.empty_cache()

    # Timing with torch.cuda.Event for more accurate GPU timing
    print("\nTiming tests (using CUDA events for accuracy):")

    # Function for accurate timing
    def measure_time(model, input_tensor, precision, iterations=100):
        times = []
        for _ in range(iterations):
            if torch.cuda.is_available():
                start_event = torch.cuda.Event(enable_timing=True)
                end_event = torch.cuda.Event(enable_timing=True)

                # Synchronize before starting
                torch.cuda.synchronize()
                start_event.record()

                with torch.no_grad():
                    _ = model(input_tensor)

                end_event.record()
                torch.cuda.synchronize()

                # Convert to milliseconds, then to seconds
                times.append(start_event.elapsed_time(end_event) / 1000)
            else:
                # Fallback to time.time() for CPU
                start = time.time()
                with torch.no_grad():
                    _ = model(input_tensor)
                times.append(time.time() - start)

        return np.mean(times), np.std(times)

    # Measure FP32
    fp32_mean, fp32_std = measure_time(
        clip_visual_fp32,
        sample_image.to(torch.float32),
        "fp32"
    )

    # Measure FP16
    fp16_mean, fp16_std = measure_time(
        clip_visual_fp16,
        sample_image.to(torch.float16),
        "fp16"
    )

    print(f"FP32 inference time: {fp32_mean:.6f} ± {fp32_std:.6f} seconds")
    print(f"FP16 inference time: {fp16_mean:.6f} ± {fp16_std:.6f} seconds")
    print(f"Speed ratio (FP32/FP16): {fp32_mean/fp16_mean:.2f}x")

    # Output comparison
    print("\nOutput comparison:")
    with torch.no_grad():
        # Get outputs
        fp32_output = clip_visual_fp32(sample_image.to(torch.float32))
        fp16_output = clip_visual_fp16(sample_image.to(torch.float16))

        # Convert FP16 output to FP32 for comparison
        fp16_as_fp32 = fp16_output.to(torch.float32)

        # Calculate difference
        abs_diff = torch.abs(fp32_output - fp16_as_fp32)
        rel_diff = abs_diff / (torch.abs(fp32_output) + 1e-8)

        print(f"Mean absolute difference: {abs_diff.mean().item():.8f}")
        print(f"Max absolute difference: {abs_diff.max().item():.8f}")
        print(f"Mean relative difference: {rel_diff.mean().item():.8f}")
        print(f"Max relative difference: {rel_diff.max().item():.8f}")
        print(f"Are there significant differences? {'Yes' if abs_diff.max().item() > 0.01 else 'No'}")

test_fp16_performance_improved()