In [None]:
from PIL import Image
import requests
import matplotlib.pyplot as plt
import numpy as np
import scipy
import pandas as pd
import os
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from tqdm import tqdm
from transformers import CLIPProcessor, CLIPModel
from transformers import logging
logging.set_verbosity_error()

In [None]:
# Both the model and its processor are loaded from the same checkpoint name.
# Alternative checkpoint: "openai/clip-vit-large-patch14".
clip_checkpoint = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(clip_checkpoint)
processor = CLIPProcessor.from_pretrained(clip_checkpoint)

In [None]:
# Reference parameter counts recorded for each checkpoint:
#   large-patch14: 427616513
#   base-patch32:  151277313
#   base-patch16:  149620737
print(model.num_parameters())

In [None]:
# # boilerplate code from huggingface docs
# url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# image = Image.open(requests.get(url, stream=True).raw)
# plt.imshow(image)
# inputs = processor(text=["a photo of a cat", "a photo of a dog"], images=image, return_tensors="pt", padding=True)

# outputs = model(**inputs)
# logits_per_image = outputs.logits_per_image # this is the image-text similarity score
# probs = logits_per_image.softmax(dim=1) # we can take the softmax to get the label probabilities
# print(outputs.text_embeds.shape)

In [None]:
# Loading data
# Caption summaries (csv) and cartoons (jpg) are matched on the first three
# characters of their file names.
# NOTE(review): these are absolute paths on one machine; adjust them to your checkout.
caption_path = "/Users/echen/Desktop/CSE_6242.nosync/CSE6242-Columnists/data_vis/summaries"
cartoon_path = "/Users/echen/Desktop/CSE_6242.nosync/CSE6242-Columnists/data_vis/cartoons"

# id -> caption csv path. The listing is sorted so that, when several files share
# an id, the one that is kept does not depend on the OS directory order.
cap_by_id = {}
for file in sorted(os.listdir(caption_path)):
    file_path = os.path.join(caption_path, file)
    if not os.path.isfile(file_path) or not file.endswith('csv'):
        continue
    cap_by_id.setdefault(file[:3], file_path)

# id -> cartoon path, restricted to ids that have a caption file.
img_by_id = {}
for file in sorted(os.listdir(cartoon_path)):
    file_path = os.path.join(cartoon_path, file)
    if not os.path.isfile(file_path) or not file.endswith('jpg'):
        continue
    img_id = file[:3]
    if img_id not in cap_by_id:
        print("extra_img: ", file)
        continue
    if img_id in img_by_id:
        # A second image for the same id would shift every later pairing.
        print("duplicate_img: ", file)
        continue
    img_by_id[img_id] = file_path

missing_imgs = sorted(set(cap_by_id) - set(img_by_id))
if missing_imgs:
    raise ValueError(f"no cartoon image found for caption ids: {missing_imgs}")

# Build the three lists from the same sorted id sequence so that position k in
# each list always refers to the same cartoon.
ids = set(cap_by_id)
id_list = sorted(ids)
cap_paths = [cap_by_id[i] for i in id_list]
img_paths = [img_by_id[i] for i in id_list]
assert len(img_paths) == len(cap_paths) == len(id_list)

In [None]:
# pca = PCA(n_components=50)
# condensed_data = pca.fit_transform(img1_cap)
# print(condensed_data.shape, np.max(condensed_data), np.min(condensed_data))
# tsne = TSNE()
# tsne_embeds = tsne.fit_transform(condensed_data)
# norm_tsne_embeds = np.zeros(tsne_embeds.shape)
# mins = np.min(tsne_embeds, axis=0, keepdims=True)
# maxes = np.max(tsne_embeds, axis=0, keepdims=True)
# norm_tsne_embeds = (tsne_embeds-mins)/(maxes-mins)
# print(norm_tsne_embeds.shape, np.max(norm_tsne_embeds), np.min(norm_tsne_embeds))
# print(norm_tsne_embeds[:5])

# TSNE Processing Functions

In [None]:
def get_embeds(model, image, caption_list):
    """Embed captions and an image with the given model.

    The inputs are prepared by the module-level `processor` and passed to
    `model` as keyword arguments.

    Args:
        model: callable whose output exposes `text_embeds` and `image_embeds`
            tensors.
        image: image (or images) handed to the processor unchanged.
        caption_list: list of caption strings embedded together as one batch.

    Returns:
        Tuple `(text_embeds, img_embeds)` of NumPy arrays.
    """
    # Local import: torch is only imported in a later cell of this notebook.
    import torch

    inputs = processor(text=caption_list, images=image, return_tensors="pt", padding=True)
    # Pure inference: skip autograd bookkeeping so no graph is kept alive per batch.
    with torch.no_grad():
        outputs = model(**inputs)
    text_embeds = outputs.text_embeds.detach().cpu().numpy()
    img_embeds = outputs.image_embeds.detach().cpu().numpy()
    return text_embeds, img_embeds

In [None]:
# Shared reducers used by get_tsne_embeds. The fixed seeds make the 2-D layout
# reproducible across kernel restarts; t-SNE is stochastic, and PCA can be when
# it selects its randomized solver.
pca = PCA(n_components=50, random_state=42)
tsne = TSNE(random_state=42)
def get_tsne_embeds(img_df):
    """Project caption embeddings to two dimensions and min-max scale each axis.

    The rows of `img_df["cap_feat"]` are stacked into a matrix, reduced with the
    module-level `pca`, then embedded with the module-level `tsne`. Each output
    axis is rescaled to the range [0, 1].

    Args:
        img_df: DataFrame with a "cap_feat" column (one embedding per row) and a
            "caption" column.

    Returns:
        DataFrame with columns "X", "Y" and "caption", indexed like
        `img_df["caption"]`. An axis on which every point has the same value is
        mapped to 0 instead of producing NaN.
    """
    cap_data = np.vstack(img_df["cap_feat"].tolist())
    condensed_data = pca.fit_transform(cap_data)
    tsne_embeds = tsne.fit_transform(condensed_data)

    mins = np.min(tsne_embeds, axis=0, keepdims=True)
    spans = np.max(tsne_embeds, axis=0, keepdims=True) - mins
    # A zero span would turn the division below into 0/0; dividing by 1 instead
    # leaves that axis at 0 for every point.
    spans = np.where(spans == 0, 1, spans)
    norm_tsne_embeds = (tsne_embeds - mins) / spans

    return pd.DataFrame({
        'X': norm_tsne_embeds[:, 0],
        'Y': norm_tsne_embeds[:, 1],
        "caption": img_df["caption"],
    })

In [None]:
def filter_long_captions(cap_df, filter_len=50):
    """Keep only the rows whose caption has at most `filter_len` words.

    Words are counted by splitting the caption on whitespace. Rows whose caption
    is not a string (for example NaN from an empty CSV cell) are dropped rather
    than raising.

    Side effect: a "sentence_len" column is written onto `cap_df` itself
    (0 for non-string captions).

    Args:
        cap_df: DataFrame with a "caption" column.
        filter_len: maximum number of words a kept caption may have.

    Returns:
        The kept rows with a fresh 0..n-1 index; the previous index labels are
        preserved in an "index" column.
    """
    captions = cap_df["caption"]
    is_text = captions.map(lambda c: isinstance(c, str)).astype(bool)
    cap_df["sentence_len"] = captions.apply(
        lambda c: len(c.split()) if isinstance(c, str) else 0
    )
    keep = is_text & (cap_df["sentence_len"] <= filter_len)
    return cap_df[keep].reset_index()
    

In [None]:
# Module-level value; the function parameter of the same name shadows it inside
# get_embeds_from_path.
batch_size = 32


def _object_column(items):
    """Return a 1-D object array holding each item of `items` as a single cell."""
    column = np.empty(len(items), dtype=object)
    for pos, item in enumerate(items):
        column[pos] = item
    return column


def get_embeds_from_path(img_id, img_path, cap_path, batch_size=32):
    """Embed one cartoon and all of its (length-filtered) captions.

    The caption CSV is read, passed through `filter_long_captions`, and the
    remaining captions are embedded in consecutive batches with `get_embeds`.
    The image embedding is taken from a `get_embeds` call with the placeholder
    caption 'test'.

    Uses the module-level `model`. NOTE(review): a later cell rebinds `model` to
    the scoring network, so this function must run before that cell.

    Args:
        img_id: identifier stored in the "img_id" column of every row.
        img_path: path of the cartoon image.
        cap_path: path of the caption CSV (needs a "caption" column).
        batch_size: number of captions per `get_embeds` call; a value that is
            not positive embeds all captions in a single call.

    Returns:
        The filtered caption DataFrame with added "img_id", "cap_feat" (one
        caption embedding per row) and "img_feat" (the same image embedding in
        every row) columns.
    """
    cap_csv = pd.read_csv(cap_path)
    cap_csv = filter_long_captions(cap_csv)
    cap_csv["img_id"] = img_id

    captions = cap_csv["caption"].tolist()
    n_caps = len(captions)
    step = batch_size if (batch_size and batch_size > 0) else max(n_caps, 1)

    cap_feats = []
    # The context manager closes the image file once all batches are embedded.
    with Image.open(img_path) as img:
        _, img_feat = get_embeds(model, img, ['test'])
        for start in range(0, n_caps, step):
            text_embeds, _ = get_embeds(model, img, captions[start:start + step])
            cap_feats.extend(text_embeds)

    # Object arrays keep each embedding as one cell instead of being expanded.
    cap_csv["cap_feat"] = _object_column(cap_feats)
    cap_csv["img_feat"] = _object_column([img_feat] * n_caps)
    return cap_csv

In [None]:
# Columns kept in the per-caption dataset returned by process_dataset_tsne.
relevant_columns = ["caption", "img_id", "mean","votes", "img_feat", "cap_feat"]
def process_dataset_tsne(id_list, img_paths, cap_paths, max_images=1):
    """Build the t-SNE frame and the per-caption feature frame for the dataset.

    The three input lists are walked in parallel. For each cartoon the captions
    are embedded with `get_embeds_from_path`, projected with `get_tsne_embeds`,
    and the embeddings are attached to the full caption CSV.

    Args:
        id_list: cartoon ids.
        img_paths: image path for each id, in the same order.
        cap_paths: caption CSV path for each id, in the same order.
        max_images: how many cartoons to process. The default of 1 keeps the
            notebook's existing behaviour of stopping after the first cartoon;
            pass None to process every cartoon.

    Returns:
        Tuple `(tsne_df, cap_df)`: the concatenated t-SNE coordinates (with an
        "img_id" column) and the concatenated caption rows restricted to
        `relevant_columns`. Rows whose caption was filtered out before
        embedding get missing values in "img_feat" and "cap_feat".
    """
    tsne_list = []
    cap_list = []
    for n_done, (img_id, img_path, cap_path) in enumerate(zip(id_list, img_paths, cap_paths)):
        if max_images is not None and n_done >= max_images:
            break

        # process for TSNE data
        img_df = get_embeds_from_path(img_id, img_path, cap_path)
        tsne_feats = get_tsne_embeds(img_df)
        tsne_feats["img_id"] = img_id
        tsne_list.append(tsne_feats)

        # save complete dataset
        cap_csv = pd.read_csv(cap_path)
        cap_csv["img_id"] = img_id
        # img_df was filtered and re-indexed, so its positions no longer match
        # the rows of the unfiltered CSV. Its "index" column holds the original
        # row labels; align on those so each embedding lands on its own caption.
        feats = img_df.set_index("index") if "index" in img_df.columns else img_df
        cap_csv["img_feat"] = feats["img_feat"]
        cap_csv["cap_feat"] = feats["cap_feat"]
        cap_list.append(cap_csv[relevant_columns])
    return pd.concat(tsne_list), pd.concat(cap_list)

# Create complete TSNE DataFrame

In [None]:
# Run the pipeline: the first result holds the t-SNE coordinates, the second the
# per-caption feature frame. The last line previews the first five t-SNE rows.
tsne_df, condensed_cap_df = process_dataset_tsne(
    id_list=id_list,
    img_paths=img_paths,
    cap_paths=cap_paths,
)
tsne_df.head()


In terms of preprocessing, we will likely want to save the original dataset dataframes without the precision or score breakdown columns. The TSNE dataframe above can be saved as is, since it should be possible to filter for the correct img_id when necessary.

In [None]:
# Preview the first five rows of the per-caption feature frame.
condensed_cap_df.head()

# Building FF Scoring Model

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

In [None]:

# `df` was not defined anywhere earlier in this notebook, so bind it to the
# per-caption feature frame built above (it carries the 'img_feat', 'cap_feat'
# and 'mean' columns used below). Rows without embeddings or a target cannot be
# stacked into a numeric matrix, so they are dropped first.
# NOTE(review): confirm condensed_cap_df is the intended source for the scoring model.
df = condensed_cap_df.dropna(subset=["img_feat", "cap_feat", "mean"])

# Combine the embeddings into a single feature array
# Note: This assumes 'cap_feat' and 'img_feat' are each a list or array of 512 floats.
X = np.hstack([np.vstack(df['img_feat'].values), np.vstack(df['cap_feat'].values)])

# Target variable: the 'mean' column halved.
# NOTE(review): presumably a rescaling of the rating range; confirm the intent.
y = df['mean'].values/2

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
class HumorRatingNN(nn.Module):
    """Feed-forward regressor mapping a feature vector to one scalar rating.

    Three linear layers (`fc1`, `fc2`, `fc3`) with ReLU after the first two; the
    final layer has no activation because the output is a continuous value.

    Args:
        input_dim: size of the input feature vector. The default of 1024 is the
            512 image features plus 512 caption features used in this notebook;
            change it when the embeddings have a different width.
        hidden_dim1: output width of the first layer (default 512).
        hidden_dim2: output width of the second layer (default 256).
    """

    def __init__(self, input_dim=1024, hidden_dim1=512, hidden_dim2=256):
        super().__init__()

        self.fc1 = nn.Linear(input_dim, hidden_dim1)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim1, hidden_dim2)
        self.fc3 = nn.Linear(hidden_dim2, 1)  # single output: the predicted rating

    def forward(self, x):
        """Return predictions with a trailing dimension of size 1."""
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)  # No activation here, as we're predicting a continuous value
        return x

In [None]:
# Seed torch so weight initialisation and DataLoader shuffling are reproducible.
torch.manual_seed(42)

# Convert data to float32 PyTorch tensors
train_data = TensorDataset(
    torch.as_tensor(X_train, dtype=torch.float32),
    torch.as_tensor(y_train, dtype=torch.float32),
)
test_data = TensorDataset(
    torch.as_tensor(X_test, dtype=torch.float32),
    torch.as_tensor(y_test, dtype=torch.float32),
)

# Create data loaders (only the training data is shuffled)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

# Initialize the model, loss function, and optimizer
# NOTE(review): this rebinds the global `model`, which until now held the CLIP
# model read by get_embeds_from_path; re-running the embedding cells after this
# cell requires reloading CLIP first.
model = HumorRatingNN()
criterion = nn.MSELoss()  # Mean Squared Error Loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    n_seen = 0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        # Squeeze only the trailing axis: a bare squeeze() would also collapse
        # the batch axis of a final batch of size 1 and mismatch the targets.
        loss = criterion(outputs.squeeze(-1), targets)
        loss.backward()
        optimizer.step()
        # Weight each batch by its size so the epoch figure is a per-sample mean.
        running_loss += loss.item() * targets.size(0)
        n_seen += targets.size(0)

    # Report the mean loss over the epoch rather than only the last batch.
    epoch_loss = running_loss / max(n_seen, 1)
    print(f'Epoch {epoch+1}, Loss: {epoch_loss}')