In [None]:
!pip install matplotlib==3.4
!pip install transformers
!pip install datasets
!pip install timm

In [None]:
import shutil

# Source directory (on Google Drive)
# NOTE(review): assumes Drive is already mounted at /content/drive; no mount call is visible in this notebook.
source_path = '/content/drive/MyDrive/meva'

# Destination directory within /content
destination_path = '/content/meva'

# Copy the directory tree from the source to the destination.
# dirs_exist_ok=True keeps the cell re-runnable: without it copytree raises
# FileExistsError as soon as the destination already exists.
shutil.copytree(source_path, destination_path, dirs_exist_ok=True)



In [None]:
import sys
# Make the copied MEVA builder importable. The membership check keeps sys.path free of
# duplicate entries when this cell is executed more than once.
if '/content/meva' not in sys.path:
    sys.path.append('/content/meva')

In [None]:
import build_meva

In [None]:
from build_meva import meva

In [None]:
# Create an instance of the meva class imported from build_meva
# (presumably a Hugging Face `datasets` builder, judging by the calls below; confirm in build_meva)
dataset_meva = meva()

# Download and prepare the dataset; must run before as_dataset() below
dataset_meva.download_and_prepare()

# Load the 'test' split; `meva_test` is what the prediction loop later in the notebook iterates over
meva_test = dataset_meva.as_dataset(split='test')

Generating test split: 0 examples [00:00, ? examples/s]

In [None]:
import timm
from timm.data import resolve_data_config, create_transform
import os
import os.path
import torch
from PIL import Image
from collections import defaultdict

# Names of the three analogy inputs; the candidates are scored against B + (A' - A).
INPUT_NAMES = ["A", "A'", "B"]
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Pretrained ConvNeXt-Large from timm.
# NOTE(review): created with its default head, so model(x) is presumably the classifier
# output rather than pooled backbone features; confirm this is the intended embedding.
model = timm.create_model('convnext_large', pretrained=True)
# Build the preprocessing transform from the model's own data configuration.
config = resolve_data_config({}, model=model)
transform = create_transform(**config)
# Modules are created in training mode; switch to eval mode so layers such as
# dropout / stochastic depth behave deterministically during inference.
model = model.to(device).eval()

In [None]:
class ZeroShot:
    """Zero-shot visual-analogy scorer on top of a pretrained image model.

    Images are embedded with ``model`` and candidates are ranked by cosine similarity,
    either against each input image (``get_scores``) or against the analogy vector
    ``B + (A' - A)`` (``get_analogies_scores``).
    """

    def __init__(self, model, core_model_preprocess_func):
        """
        Parameters
        ----------
        model : Pretrained model for feature extraction
        core_model_preprocess_func : A torchvision transform that converts a PIL image into a tensor that the
        model can take as its input
        """

        self.model = model
        # Only assigned when provided, so an attribute defined elsewhere (e.g. on a subclass) is not
        # shadowed by None. `is not None` is used so a falsy-but-valid callable is not dropped.
        if core_model_preprocess_func is not None:
            self.core_model_preprocess_func = core_model_preprocess_func

    def get_scores(self, all_image_features):
        """

        Parameters
        ----------
        all_image_features : (dict) maps every name in INPUT_NAMES to a preprocessed image Tensor and holds the
        list of preprocessed candidate Tensors under 'candidates' (or under 'candidate_features', the key
        produced by run_trained_scores)

        Returns
        -------
        (dict) where the key is an input name from INPUT_NAMES and the value is the list of cosine scores between
        that input and each candidate, in candidate order

        """
        # Prefer the original 'candidates' key; fall back to the key run_trained_scores builds.
        candidates_key = 'candidates' if 'candidates' in all_image_features else 'candidate_features'
        candidates = all_image_features[candidates_key]
        scores = {img_name: [] for img_name in INPUT_NAMES}

        with torch.no_grad():
            chosen_features_list = {img_name: self.forward_core_model(all_image_features[img_name])
                                    for img_name in INPUT_NAMES}

            for im in candidates:
                D_features = self.forward_core_model(im)

                for img_name, features in chosen_features_list.items():
                    score = self.get_img_cosine_similarity(D_features, features)
                    scores[img_name].append(self._to_float(score))

        return scores

    def get_analogies_scores(self, all_image_features, candidates):
        """
        Parameters
        ----------
        all_image_features : (dict) maps every name in INPUT_NAMES to a preprocessed image Tensor and holds the
        list of preprocessed candidate Tensors under 'candidate_features'
        candidates : (list) names of the candidates, in the same order as 'candidate_features'

        Returns
        -------
        (dict) candidate name -> cosine similarity between that candidate and B + (A' - A).
        Pairs are formed with zip, so extra entries on either side are ignored, and a repeated
        candidate name keeps only its last score.

        """
        scores = {}
        candidate_features = all_image_features['candidate_features']

        with torch.no_grad():
            all_inp_feats = {img_name: self.forward_core_model(all_image_features[img_name])
                             for img_name in INPUT_NAMES}
            # The goal is to find candidate B' ~ B + (A'-A)
            B_plus_A_prime_minus_A = all_inp_feats['B'] + all_inp_feats["A'"] - all_inp_feats['A']

            # For each candidate calculate cosine (B', B + (A'-A))
            for cand_name, im in zip(candidates, candidate_features):
                B_prime_features = self.forward_core_model(im)
                score = self.get_img_cosine_similarity(B_plus_A_prime_minus_A, B_prime_features)
                scores[cand_name] = self._to_float(score)

        return scores

    def forward_core_model(self, img):
        """
        Parameters
        ----------
        img : (Tensor) preprocessed image batch to run through the model

        Returns
        -------
        The model's output for the given input, used as the image's feature vector
        (NOTE(review): for a model that still has a classification head this is presumably the
        head output, not pooled backbone features; confirm)

        """

        x = self.model(img)
        return x

    def preprocess(self, img):
        """
        Parameters
        ----------
        img : image in the form accepted by the preprocess function (a PIL image, per the constructor docs)

        Returns
        -------
        (Tensor) preprocessed image with a leading batch dimension, placed on the model's device

        """
        img_preprocessed = self.core_model_preprocess_func(img).unsqueeze(0)
        # Follow the model's own device instead of a notebook-level global, so the input
        # always lands where the weights are.
        return img_preprocessed.to(self._model_device())

    def _model_device(self):
        """Return the device of the model's first parameter, or CPU when it has none."""
        parameters = getattr(self.model, "parameters", None)
        if callable(parameters):
            first_param = next(iter(parameters()), None)
            if first_param is not None:
                return first_param.device
        return torch.device("cpu")

    @staticmethod
    def _to_float(score):
        """Convert a single-element similarity result to a Python float."""
        # float() on an array with ndim > 0 is deprecated in recent NumPy; .item() extracts
        # the scalar explicitly. Plain numbers are passed through float() unchanged.
        return float(score.item()) if hasattr(score, "item") else float(score)

    @staticmethod
    def get_img_cosine_similarity(im1_feats, im2_feats):
        """
        Parameters
        ----------
        im1_feats : (Tensor) shape [1,embedding size]
        im2_feats : (Tensor) shape [1,embedding size]

        Returns
        -------
        Cosine similarity between the two given vectors, as a NumPy array
        (im2 @ im1.T, i.e. shape [1,1] for the documented input shapes).
        The inputs are left unmodified; an all-zero vector yields a similarity of 0.

        """

        # Normalize out of place: an in-place `/=` would silently rescale the caller's tensors.
        # The norm is clamped so a zero vector gives 0 instead of NaN.
        eps = 1e-12
        im1_unit = im1_feats / im1_feats.norm(dim=-1, keepdim=True).clamp_min(eps)
        im2_unit = im2_feats / im2_feats.norm(dim=-1, keepdim=True).clamp_min(eps)

        # Calculate cosine similarity
        similarity = im2_unit.detach().cpu().numpy() @ im1_unit.detach().cpu().numpy().T
        return similarity

def run_trained_scores(model, row):
    """
    Preprocess one analogy instance and score its candidates.

    Parameters
    ----------
    model : ZeroShot model
    row : mapping with one image per name in INPUT_NAMES, the candidate images under 'candidates_images'
    and the matching candidate names under 'candidates'

    Returns
    -------
    (dict) whatever model.get_analogies_scores returns for the preprocessed inputs and the candidate names

    """
    features = {}
    # Preprocess each of the analogy inputs.
    for input_name in INPUT_NAMES:
        features[input_name] = model.preprocess(row[input_name])

    # Preprocess every candidate image, preserving order.
    features['candidate_features'] = list(map(model.preprocess, row['candidates_images']))

    candidate_names = row['candidates']
    return model.get_analogies_scores(features, candidate_names)

# Wrap the notebook-level `model` and its `transform` in the ZeroShot scorer used by the prediction loop.
zero_shot_model = ZeroShot(model, core_model_preprocess_func=transform)

In [None]:
# Define an empty dictionary to store the predictions and scores for each instance
predictions = {}

# Loop over the instances in the MEVA test dataset.
# Each instance is fetched once as a row; indexing a column first (meva_test['A'][i]) is
# repeated for six columns on every iteration.
# NOTE(review): assumes meva_test supports len() and integer row indexing, as a Hugging Face
# `datasets.Dataset` does; confirm against what as_dataset() returns.
for i in range(len(meva_test)):
    example = meva_test[i]

    # Create a dictionary for the instance
    row = {
        'A': example['A'],
        "A'": example["A'"],
        'B': example['B'],
        "B'": example["B'"],
        'candidates_images': example['candidates_images'],
        'candidates': example['candidates']
    }

    # Predict scores for the current instance
    scores = run_trained_scores(zero_shot_model, row)

    # The prediction is the best-scoring candidate; None when the instance has no candidates
    # (max() on an empty dict would raise ValueError).
    best_candidate = max(scores, key=scores.get) if scores else None

    # Store the prediction and scores in the dictionary
    predictions[f"Instance {i+1}"] = {'prediction': best_candidate, 'scores': scores}

# Print the predictions and scores for all instances
print("Predictions and Scores:")
for instance, data in predictions.items():
    print(f"{instance}: Prediction - {data['prediction']}")
    print("Scores:")
    for candidate, score in data['scores'].items():
        print(f"    {candidate}: {score}")
