In [2]:
import pandas as pd

import numpy as np

import concurrent.futures as cf

from tqdm.notebook import tqdm

from PIL import Image

import torch

from sklearn.preprocessing import StandardScaler

from transformers import CLIPProcessor, CLIPModel

from torch.utils.data import DataLoader, Dataset

from torch.optim import Adam
from torch import nn

## Initialize CLIP model

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
net = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

cuda




## Read Metadata

In [4]:
instagram_df = pd.read_csv('instagram_data.csv')
instagram_df['image_path'] = instagram_df['image_path'].str.replace('../Data/', '')
instagram_df["class"] = instagram_df["likes"].apply(lambda x: min(len(str(x)), 6) - 4).reset_index(drop=True)
print(instagram_df['class'].value_counts())

class
2    2137
1    1521
0     127
Name: count, dtype: int64


In [5]:
likes_upper_bound = instagram_df['likes'].quantile(0.9)
comments_upper_bound = instagram_df['no_of_comments'].quantile(0.9)
t_lower_bound = instagram_df['t'].quantile(0.1)
instagram_df = instagram_df[(instagram_df['likes'] < likes_upper_bound) & (instagram_df['no_of_comments'] < comments_upper_bound) & (instagram_df['t'] > t_lower_bound)].reset_index(drop=True)

## Generate Label Embedding

In [6]:
clip_labels = [
    "a photo that received low number of likes on instagram", 
    "a photo that received medium number of likes on instagram", 
    "a photo that received high number of likes on instagram"
]
label_tokens = processor(clip_labels, return_tensors="pt", padding=True).to(device)
label_emb = net.get_text_features(**label_tokens)
label_emb = label_emb.detach().cpu().numpy()
label_emb = StandardScaler().fit_transform(label_emb)

  attn_output = torch.nn.functional.scaled_dot_product_attention(


## Dataset Class Implementation

In [7]:
class instagram_dataset(Dataset):
    
    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        text = f"""For this photo the number of comments is {row['no_of_comments']} and the photo uploader has {row['follower_count_at_t']} followers at time {row['t']} on instagram"""
        image = processor(text = text, images=Image.open(row['image_path']), return_tensors="pt", padding=True)['pixel_values']
        return image[0], row['class']

In [8]:
train_df = instagram_df.sample(frac=0.8, random_state=0)
test_df = instagram_df.drop(train_df.index).reset_index(drop=True)
train_df = train_df.reset_index(drop=True)
train_dataset = instagram_dataset(train_df)
test_dataset = instagram_dataset(test_df)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

## Zero-shot Classification

In [9]:
def classification(df):
    preds = []
    batch_size = 32
    for i in tqdm(range(0, len(df), batch_size)):
        i_end = min(i + batch_size, len(df))
        text = [f"""For this photo the number of comments is {df.loc[j, "no_of_comments"]} and the photo uploader has {df.loc[j, "follower_count_at_t"]} followers at time {df.loc[j, "t"]} on instagram""" for j in range(i, i_end)]
        images = [Image.open(df.loc[j, "image_path"]) for j in range(i, i_end)]
        image = processor(text = text, images=images, return_tensors="pt", padding=True)['pixel_values'].to(device)
        image_emb = net.get_image_features(image)
        image_emb = image_emb.detach().cpu().numpy()
        scores = np.dot(image_emb, label_emb.T)
        pred = np.argmax(scores, axis=1)
        preds.extend(pred)
    return np.array(preds)

preds = classification(test_df)

  0%|          | 0/19 [00:00<?, ?it/s]

In [10]:
true_preds = test_df['class'].values
accuracy = (preds == true_preds).mean()
accuracy

0.33671742808798644

## Finetuning

In [None]:
optimizer = Adam(net.parameters(), lr=5e-5, betas=(0.9, 0.98), eps=1e-6, weight_decay=0.2)
loss_img = nn.CrossEntropyLoss()

num_epochs = 100
for epoch in range(1, num_epochs + 1):
    net.train()
    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}"):
        optimizer.zero_grad()
        images = images.to(device)
        labels = labels.to(device)
        image_emb = net.get_image_features(images)
        scores = torch.matmul(image_emb, torch.tensor(label_emb.T).to(device))
        loss = loss_img(scores, labels)
        loss.backward()
        optimizer.step()
    
    print(f"Loss: {loss.item()}")
    if epoch % 1 == 0:
        torch.save({
            'epoch': epoch,
            'model_state_dict': net.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
            'clip_labels': clip_labels,
            'label_emb': label_emb
        }, f"finetuned_clip/clip_{epoch + 1}.pt")
        accuracy = (classification(test_df) == test_df['class'].values).mean()
        print(f"Accuracy: {accuracy}")

## Prediction with Best CLIP Model

In [13]:
trained_Net = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
checkpoint = torch.load("finetuned_clip/clip_50.pt")
trained_Net.load_state_dict(checkpoint['model_state_dict'])

  checkpoint = torch.load("finetuned_clip/clip_50.pt")


<All keys matched successfully>

In [16]:
df = test_df
preds = []
batch_size = 32
for i in tqdm(range(0, len(df), batch_size)):
    i_end = min(i + batch_size, len(df))
    text = [f"""For this photo the number of comments is {df.loc[j, "no_of_comments"]} and the photo uploader has {df.loc[j, "follower_count_at_t"]} followers at time {df.loc[j, "t"]} on instagram""" for j in range(i, i_end)]
    images = [Image.open(df.loc[j, "image_path"]) for j in range(i, i_end)]
    image = processor(text = text, images=images, return_tensors="pt", padding=True)['pixel_values'].to(device)
    image_emb = trained_Net.get_image_features(image)
    image_emb = image_emb.detach().cpu().numpy()
    scores = np.dot(image_emb, label_emb.T)
    pred = np.argmax(scores, axis=1)
    preds.extend(pred)

  0%|          | 0/19 [00:00<?, ?it/s]

In [17]:
true_preds = test_df['class'].values
accuracy = (np.array(preds) == true_preds).mean()
accuracy

0.5245346869712352