## Requirements

In [None]:
!pip install transformers
!pip install datasets
!pip install pytorch_metric_learning
!pip install opendatasets
!pip install pydub
!pip install gdown
!pip install tabulate

In [None]:
!pip install git+https://github.com/huggingface/transformers

In [182]:
import torch
from transformers import AutoProcessor, Wav2Vec2Model
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModel
import numpy as np
import torch.nn.functional as F
import torch.nn as nn
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from time import time
from matplotlib import pyplot as plt
from sklearn.metrics import accuracy_score
import os
from pydub import AudioSegment
import opendatasets as od
import pandas as pd
import gc
import random
import pickle as pkl
import gdown
from tabulate import tabulate

## Load Files

In [76]:
url = "https://drive.google.com/file/d/1-390QxYWgKkhxQLttpEPfQMk0xr8Lb2Y/view?usp=share_link"
output = "test_dataset.pkl"
gdown.download(url, output, quiet=False, fuzzy=True)

url = "https://drive.google.com/file/d/1MWAWrtDt2iS5OhKlsIIt-k1ROSP5M4iM/view?usp=share_link"
output = "final_model.pt"
gdown.download(url, output, quiet=False, fuzzy=True)

url = "https://drive.google.com/file/d/1-02DOpowTyAZULS38dUq3E4e4qYxzXU1/view?usp=share_link"
output = "multimodal_model.pt"
gdown.download(url, output, quiet=False, fuzzy=True)

Downloading...
From: https://drive.google.com/uc?id=1-390QxYWgKkhxQLttpEPfQMk0xr8Lb2Y
To: /kaggle/working/test_dataset.pkl
100%|██████████| 476M/476M [00:04<00:00, 114MB/s]  
Downloading...
From: https://drive.google.com/uc?id=1MWAWrtDt2iS5OhKlsIIt-k1ROSP5M4iM
To: /kaggle/working/final_model.pt
100%|██████████| 846M/846M [00:07<00:00, 115MB/s]  
Downloading...
From: https://drive.google.com/uc?id=1-02DOpowTyAZULS38dUq3E4e4qYxzXU1
To: /kaggle/working/multimodal_model.pt
100%|██████████| 10.7M/10.7M [00:00<00:00, 117MB/s]


'multimodal_model.pt'

## Preparing Data

In [145]:
class TestDataset(Dataset):
    def __init__(self, data):
        self.data = data
        self.id_to_idx = {}
        for i in range(len(self.data)):
            d_id = self.data[i]['id']
            self.id_to_idx[d_id] = i

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        sample = self.data[index]
        text = sample['keywords']
        candidate_idxs = sample['candidates']
        label = sample['label']

        # Get the audio embeddings of the candidate instances
        candidates_audio_embeddings = []
        for d_id in candidate_idxs:
            candidate = self.data[self.id_to_idx[d_id]]
            candidate_audio_embedding = torch.tensor(candidate['audio_embedding'])
            candidates_audio_embeddings.append(candidate_audio_embedding)
        candidates_audio_embeddings = torch.stack(candidates_audio_embeddings)

        return text, candidates_audio_embeddings, label

In [146]:
with open('test_dataset.pkl', 'rb') as f:
    test_data = pkl.load(f)

In [147]:
test_dataset = TestDataset(test_data)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

## Model

In [131]:
class JointNN(nn.Module):
    def __init__(self, in_features):
        super(JointNN, self).__init__()
        self.text_seq = nn.Sequential(
            nn.Linear(in_features, 576),
            nn.BatchNorm1d(576),
            nn.LeakyReLU(),
            nn.Dropout(p=0.15),
            nn.Linear(576, 384),
            nn.BatchNorm1d(384),
            nn.LeakyReLU(),
            nn.Dropout(p=0.1),
            nn.Linear(384, 576),
            nn.LeakyReLU(),
            nn.Linear(576, in_features),
        )
        self.audio_seq = nn.Sequential(
            nn.Linear(in_features, 576),
            nn.BatchNorm1d(576),
            nn.LeakyReLU(),
            nn.Dropout(p=0.15),
            nn.Linear(576, 384),
            nn.BatchNorm1d(384),
            nn.LeakyReLU(),
            nn.Dropout(p=0.1),
            nn.Linear(384, 576),
            nn.LeakyReLU(),
            nn.Linear(576, in_features),
        )

    def forward(self, x_text, x_audio):
        x1 = self.text_seq(x_text)
        x2 = self.audio_seq(x_audio)
        return x1, x2

In [189]:
class SimilarityModel:
    def __init__(self, path_to_multimodal_model, sampling_rate=16000, threshold=0.5):
      self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
      self.audio_processor = AutoProcessor.from_pretrained("facebook/wav2vec2-base-960h")
      self.audio_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h").to(self.device)
      self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
      self.text_model = AutoModel.from_pretrained("bert-base-uncased").to(self.device)
      self.threshold = threshold
      self.nn_model = torch.load(path_to_multimodal_model)
      self.nn_model.eval()
      self.nn_model = self.nn_model.to(self.device)
      self.sampling_rate = sampling_rate

    def cosine_similarity(self, embedding1, embedding2):
      dim = 1
      embedding1 = F.normalize(embedding1, p=2, dim=dim)
      embedding2 = F.normalize(embedding2, p=2, dim=dim)

      dot_product = torch.sum(embedding1 * embedding2, dim=dim)

      magnitude1 = torch.norm(embedding1, p=2, dim=dim)
      magnitude2 = torch.norm(embedding2, p=2, dim=dim)

      cosine_sim = dot_product / (magnitude1 * magnitude2)

      return cosine_sim

    def predict(self, audios, texts, threshold=None, is_query=False):
      audio_embs = []
      for audio in audios:
        inputs = self.audio_processor(audio, sampling_rate=self.sampling_rate, return_tensors="pt", padding=True).to(device)
        outputs = self.audio_model(**inputs)
        last_hidden_states = outputs.last_hidden_state.squeeze(0)
        audio_embedding = last_hidden_states.mean(dim=0)
        audio_embs.append(audio_embedding)
      

      text_embs = []
      for text in texts:
        inputs = self.tokenizer(text, return_tensors="pt").to(device)
        outputs = self.text_model(**inputs)
        embedding = outputs.last_hidden_state.squeeze(0)
        text_embedding = embedding.mean(dim=0)
        text_embs.append(text_embedding)
      if is_query:
        item = text_embs[0]
        for i in range(len(audios) - 1):
          text_embs.append(item.clone())


      audio_embedding = torch.stack(audio_embs)
      text_embedding = torch.stack(text_embs)
      text_embedding = text_embedding.to(self.device)
      audio_embedding = audio_embedding.to(self.device)

      text_final_emb, audio_final_emb = self.nn_model(text_embedding, audio_embedding)
      cosine_similarity = self.cosine_similarity(text_final_emb, audio_final_emb)

      final_threshold = threshold if threshold else self.threshold

      return cosine_similarity, (cosine_similarity >= final_threshold).int()

    def retrieve_relevant_audios(self, audios, query, threshold=None):
      return self.predict(audios, [query], is_query=True, threshold=threshold)

In [190]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [213]:
PATH = 'final_model.pt'
model = torch.load(PATH)

### CLAP

## Test and Evaluation

In [220]:
def evaluate(model, dataloader):
    total_hits_1 = 0
    total_mrr = 0
    total_instances = 0
    total_labels = []
    total_predictions = []

    with torch.no_grad():
        for text, candidates, label in tqdm(dataloader):
            batch_size = label.size(0)
            text = text[0]
            candidates = candidates.to(device)
            candidates = candidates.squeeze()
            label = label.to(device)

            # Compute text-to-candidates similarities
            text_candidate_cosine_similarities, res = model.retrieve_relevant_audios(candidates, text, threshold=0.6)

            # Compute Hits@1
            _, predicted_idx = torch.max(text_candidate_cosine_similarities, dim=0)
            hits_1 = torch.sum(predicted_idx == label)
            total_hits_1 += hits_1.item()

            # Compute MRR
            candidate_ranks = torch.argsort(text_candidate_cosine_similarities, descending=True)
            candidate_ranks = candidate_ranks.tolist()
            label_rank = candidate_ranks.index(label.item())
            reciprocal_rank = 1 / (label_rank + 1)
            total_mrr += reciprocal_rank

            # Record predictions and labels
            predictions = res[label[0]].cpu()
            total_labels.append(1)
            total_predictions.append(predictions)

            total_instances += batch_size

    # Compute average metrics over all instances
    avg_hits_1 = total_hits_1 / total_instances
    avg_mrr = total_mrr / total_instances
    precision = precision_score(total_labels, total_predictions, average='macro')
    recall = recall_score(total_labels, total_predictions, average='macro')
    f1 = f1_score(total_labels, total_predictions, average='macro')
    accuracy = accuracy_score(total_labels, total_predictions)

    return {
        'Hits@1': avg_hits_1,
        'MRR': avg_mrr,
        'Precision': precision,
        'Recall': recall,
        'F1': f1,
        'Accuracy': accuracy
    }

In [221]:
results = evaluate(model, test_loader)
table = []
for i in range(len(results)):
    table.append([list(results.keys())[i], list(results.values())[i]])
print(tabulate(table, ['Metrics', 'Values'], tablefmt="grid"))

100%|██████████| 300/300 [00:28<00:00, 10.53it/s]

+-----------+-----------+
| Metrics   |    Values |
| Hits@1    | 0.163333  |
+-----------+-----------+
| MRR       | 0.406278  |
+-----------+-----------+
| Precision | 0.5       |
+-----------+-----------+
| Recall    | 0.05      |
+-----------+-----------+
| F1        | 0.0909091 |
+-----------+-----------+
| Accuracy  | 0.1       |
+-----------+-----------+



  _warn_prf(average, modifier, msg_start, len(result))


## Finding best hyper parameteres

In [None]:
# find best hyperparameter

size = len(audios)
labels = []
text_series = []
audio_series = []

for i in range(size):
  for j in range(size):
    text_series.append(texts[i])
    audio_series.append(audios[j])
    labels.append(1 if i == j else 0)              

In [None]:
def accuracy_of_model(model, audios, texts, labels, threshold):
  result = model.predict(audios, texts, threshold=threshold)
  accuracy = accuracy_score(result.detach().cpu(), labels)
  return accuracy

In [None]:
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    gc.collect()

In [None]:
zipped_lists = list(zip(audio_series, text_series, labels))

n = 10
sampled_tuples = random.sample(zipped_lists, n)

# unzip the sampled tuples into separate lists
audios_test, texts_test, labels_test = zip(*sampled_tuples)

In [None]:
threshold_list = [0.05 * i for i in range(4,18)]
best_accuracy = 0
best_threshold = 0
for threshold in threshold_list:
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    accuracy = accuracy_of_model(final_model, audios_test, texts_test, labels_test, threshold)
    print(f"Accuracy with threshold {threshold:.2f} = {accuracy * 100:.2f}%")
    if accuracy > best_accuracy:
        best_threshold = threshold
        best_accuracy = accuracy
    gc.collect()

print(f"Best threshold is {best_threshold:.2f}")
print(f"Best accuracy is {best_accuracy:.2f}")

In [None]:
PATH = "multimodal_model.pt"
best_model = SimilarityModel(PATH, threshold=best_threshold)

In [None]:
PATH = "final_relevant_model_v2.pt"

# Save
torch.save(best_model, PATH)

In [None]:
# Load
PATH = "final_relevant_model.pt"
best_model = torch.load(PATH)
best_model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
best_model = best_model.to(device)

In [None]:
from IPython.display import FileLink
FileLink(r'final_relevant_model_v2.pt')

In [None]:
from IPython.display import FileLink
FileLink(r'multimodal_model.pt')