In [32]:
import torch
import torch.nn as nn
import numpy as np
from torchsummaryX import summary
import sklearn
import gc
import zipfile
import pandas as pd
from tqdm.auto import tqdm
import os
import datetime
import wandb
from torch.nn.utils.rnn import pad_sequence
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device: ", device)
torch.manual_seed(0)

from sentence_transformers import SentenceTransformer
model_text = SentenceTransformer('paraphrase-MiniLM-L6-v2')


import json

class RASDataset(torch.utils.data.Dataset):

    def __init__(self, root, file_pth, partition = 'train', subset= None): 
        # Load the directory and all files in them
        f = open(file_pth)
        self.data_json = json.load(f)
        f.close()

        self.length = len(self.data_json)       
        self.base_path = os.path.join(root, partition)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        cur_dict = self.data_json[idx]
        name = str(cur_dict['id'])

        audio_feat = np.load(os.path.join(self.base_path, name+'_feat.npy'))
        print(os.path.join(self.base_path, name+'_feat.npy'))
        # audio_feat = np.load(os.path.join(self.base_path, name+'_hidden_state.npy'))
        # print('**************',audio_feat.shape)
        
        audio_feat = np.squeeze(audio_feat)

        text = [cur_dict['description']]
        text_feat = model_text.encode(text)
        text_feat = np.squeeze(text_feat)

        target = np.load(os.path.join(self.base_path, name+'_target.npy'))
        target = np.squeeze(target)
        
        audio_feat = torch.FloatTensor(audio_feat)
        text_feat = torch.FloatTensor(text_feat)
        target = torch.LongTensor(target)

        sample = {
                  "audio_feat": audio_feat,
                  "text_feat": text_feat,
                  "target": target
                }

        return sample


    def collate_fn(self,batch):

        batch_audio = [i["audio_feat"] for i in batch]
        batch_text = [i["text_feat"] for i in batch]
        batch_target = [i["target"] for i in batch]

        batch_audio_pad = pad_sequence(batch_audio, batch_first=True)
        lengths_audio = [i.shape[0] for i in batch_audio]

        batch_target_pad = pad_sequence(batch_target, batch_first=True)
        lengths_target = [i.shape[0] for i in batch_target]

        batch_audio_pad = torch.FloatTensor(batch_audio_pad)
        batch_text = torch.stack(batch_text)
        batch_target_pad = torch.LongTensor(batch_target_pad)

        return batch_audio_pad, batch_text, batch_target_pad, torch.tensor(lengths_audio), torch.tensor(lengths_target)


root = './'

test_data = RASDataset(root, 'test_combined.json', partition= "test")

test_loader = torch.utils.data.DataLoader(
    dataset     = test_data, 
    num_workers = 0,
    batch_size  = 1,
    collate_fn = test_data.collate_fn,
    pin_memory  = True,
    shuffle     = False
)

class Attention(nn.Module):
    def __init__(self, input_size: int, hidden_size: int):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.Tanh(),
        )
        self.linear = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        outputs = self.linear(self.fc(x))
        # print(outputs.size())
        alpha = torch.softmax(outputs, dim=2)
        x = (x * alpha)
        return x

class RASModel(torch.nn.Module):

    def __init__(self, embed_dim, num_heads, dropout):
        super().__init__()

        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.dropout = dropout
        self.audio_linear = nn.Linear(self.embed_dim, self.embed_dim)
        self.text_linear = nn.Linear(384, self.embed_dim)
        self.mha_a_t = nn.MultiheadAttention(embed_dim=self.embed_dim, num_heads=self.num_heads,
                                               dropout=self.dropout, batch_first=True)
        self.mha_t_a = nn.MultiheadAttention(embed_dim=self.embed_dim, num_heads=self.num_heads,
                                               dropout=self.dropout, batch_first=True)
        self.fc1 = nn.Linear(self.embed_dim*2, 2)
        self.fc2 = nn.Linear(self.embed_dim,124)
        self.fc3 = nn.Linear(124,2)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.rnn = nn.LSTM(self.embed_dim,self.embed_dim,2,batch_first=True)
        self.concat_linear = nn.Linear(in_features=2 * self.embed_dim, out_features= self.embed_dim)
        self.classifier = nn.Linear(in_features= self.embed_dim, out_features=2)
        
    
    def forward(self, audio_fea, text_fea):

        B, T, D = audio_fea.size()
        text_fea = text_fea[:,None,:]
        text_fea_rep = text_fea.repeat(1, T, 1) #B,1, 384 -> B, T, 384

        audio_fea = self.audio_linear(audio_fea)
        text_fea = self.text_linear(text_fea_rep)

        x_a2t, _ = self.mha_a_t(text_fea, audio_fea, audio_fea)
        # x_a2t = torch.mean(x_a2t, dim=2)

        x_t2a, _ = self.mha_t_a(audio_fea, text_fea, text_fea)
        # x_t2a = torch.mean(x_t2a, dim=2)

        x = torch.stack((x_a2t, x_t2a), dim=2)
        x_mean, x_std = torch.std_mean(x, dim=2)
        x = torch.cat((x_mean, x_std), dim=2)  
        x = self.concat_linear(x)
        # x,_ = self.rnn(x)
        x = self.classifier(x)
        return x

model = RASModel(embed_dim = 512, num_heads = 8, dropout=0.2).to(device)
model.load_state_dict(torch.load('best.pkl'))

class weighted_log_loss(nn.Module):    
    def __init__(self):
        super(weighted_log_loss,self).__init__()
        self.LOSS_BIAS = 0.2

    def forward(self, yt, yp):   
        pos_loss = -(0 + yt) * torch.log(0 + yp + 1e-7)
        neg_loss = -(1 - yt) * torch.log(1 - yp + 1e-7)

        return self.LOSS_BIAS * torch.mean(neg_loss) + (1. - self.LOSS_BIAS) * torch.mean(pos_loss)


import numpy as np

from sklearn.metrics import f1_score

def eval(model, dataloader):

    model.eval() # set model in evaluation mode
    vloss, vacc = 0, 0 # Monitoring loss and accuracy

    prob_all = []
    label_all = []

    for i, (audio_fea, text_fea, target, audio_len, target_len) in enumerate(dataloader):
        if i>1:
            return np.argmax(logits, axis=2).squeeze()
        ### Move Data to Device (Ideally GPU)
        np.save('input_audio.npy', audio_fea)
        audio_fea = audio_fea.to(device)
        text_fea  = text_fea.to(device)
        target = target.to(device)

        # makes sure that there are no gradients computed as we are not training the model now
        with torch.inference_mode(): 
            ### Forward Propagation
            logits  = model(audio_fea, text_fea).detach().cpu().numpy()


    return logits


output   = eval(model, test_loader)
    





Device:  cuda
./test/0_feat.npy
./test/1_feat.npy
./test/2_feat.npy


In [36]:
a = np.load('input_audio.npy')
a.shape

(1, 747, 512)