In [None]:
import json

# Load the test episodes. The encoding is pinned to UTF-8 so decoding does not
# depend on the platform's default locale encoding.
with open("test_file.json", 'r', encoding="utf-8") as file:
    val = json.load(file)

In [None]:
import torch
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
import string
import nltk
import re



class PreProcessor():
    """Text normaliser: lower-cases, spells out digits, strips punctuation
    and non-ASCII characters, and collapses whitespace."""

    def __init__(self) -> None:
        # Digit character -> English word, keyed "0" through "9" in order.
        digit_words = (
            "zero", "one", "two", "three", "four",
            "five", "six", "seven", "eight", "nine",
        )
        self.numbers = {str(value): word for value, word in enumerate(digit_words)}

    def remove_puntuations(self, txt):
        """Return ``txt`` with sentence delimiters turned into spaces and all
        other ``string.punctuation`` characters deleted."""
        # '.', '!', '?', ':' and ';' become a space so neighbouring words stay apart.
        spaced = re.sub(r"[.!?:;]", " ", txt)
        # Every remaining punctuation character is removed outright.
        return spaced.translate(str.maketrans("", "", string.punctuation))

    def number_to_words(self, txt):
        """Replace each digit in ``txt`` with its word followed by one space."""
        for digit, word in self.numbers.items():
            txt = txt.replace(digit, f"{word} ")
        return txt

    def preprocess_text(self, text):
        """Run the full normalisation pipeline on ``text`` and return the result."""
        lowered = text.lower().replace('_', ' ')
        spelled_out = self.number_to_words(lowered)
        unpunctuated = self.remove_puntuations(spelled_out)
        # Drop every character outside the 7-bit ASCII range.
        ascii_only = unpunctuated.encode('ascii', 'ignore').decode('ascii')
        # Collapse whitespace runs and trim both ends.
        return ' '.join(ascii_only.split())


In [None]:
from transformers import BertTokenizer, BertModel
from tqdm import tqdm
import numpy as np

# Tokenizer and encoder are loaded from the same pretrained checkpoint.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# Place the encoder on the selected device and switch it to evaluation mode.
model = BertModel.from_pretrained('bert-base-uncased').to(device).eval()

# Text normaliser applied to each sentence before it is tokenized.
preprocessor = PreProcessor()

def get_embeddings(model, preprocessor, tokenizer, sentences):
    """Embed each sentence with ``model`` and return the stacked vectors.

    Each sentence is normalised with ``preprocessor.preprocess_text``,
    tokenized on its own (truncated to 64 tokens) and passed through
    ``model``; the squeezed ``pooler_output`` is collected as a NumPy array.

    Args:
        model: encoder whose output exposes ``pooler_output``.
        preprocessor: object providing ``preprocess_text(str) -> str``.
        tokenizer: callable turning a string into model inputs.
        sentences: iterable of raw sentence strings.

    Returns:
        ``np.ndarray`` built from the per-sentence vectors, in input order.
    """
    model.to(device)  # keep the model on the same device the inputs are sent to
    pooled_vectors = []
    # Inference only: no gradients are needed for any sentence.
    with torch.no_grad():
        for raw_sentence in tqdm(sentences, desc="Computing BERT embeddings"):
            cleaned = preprocessor.preprocess_text(raw_sentence)
            encoded = tokenizer(
                cleaned,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=64,
            ).to(device)
            pooled = model(**encoded).pooler_output
            pooled_vectors.append(pooled.squeeze().cpu().numpy())
    return np.array(pooled_vectors)

In [None]:
def prepare_data(data, seq_len=25):
    """Turn dialogue episodes into fixed-length emotion-flip sequences.

    Every utterance of every episode is embedded once through
    ``get_embeddings`` (using the module-level ``model``, ``preprocessor`` and
    ``tokenizer``). Each episode then becomes ``seq_len`` embedding vectors
    with a parallel list of binary labels: a label is 1 when the utterance's
    emotion differs from the previous utterance's emotion in the same
    episode, otherwise 0 (the first utterance is always 0). Emotions missing
    from the label map are treated as ``'neutral'``.

    Episodes shorter than ``seq_len`` are right-padded with zero vectors and
    0 labels; longer episodes keep only their last ``seq_len`` utterances.

    Args:
        data: re-iterable collection of episode dicts, each holding an
            ``'utterances'`` list and an ``'emotions'`` list.
        seq_len: number of steps every episode is padded or truncated to.

    Returns:
        ``(X, Y)`` float tensors on ``device``: ``X`` is
        ``(num_episodes, seq_len, embedding_dim)`` and ``Y`` is
        ``(num_episodes, seq_len)``.

    Raises:
        ValueError: if ``seq_len`` is not positive.
    """
    if seq_len <= 0:
        raise ValueError("seq_len must be a positive integer")

    emo2idx = {
        'anger': 0,
        'joy': 1,
        'fear': 2,
        'disgust': 3,
        'neutral': 4,
        'surprise': 5,
        'sadness': 6
    }
    fallback_idx = emo2idx['neutral']

    # One flat list of utterances, in episode order, embedded in a single pass.
    all_utterances = [u for episode in tqdm(data, desc="Processing Episodes Data") for u in episode['utterances']]
    embeddings_utt = get_embeddings(model, preprocessor, tokenizer, all_utterances)

    # Zero vector with the embedding's shape; built up front so that padding
    # also works for an episode that has no utterances at all.
    pad_vector = np.zeros(embeddings_utt.shape[1:], dtype=embeddings_utt.dtype)

    X = []
    Y = []

    # Position of the current episode's first utterance inside the flat
    # ``embeddings_utt`` array. Indexing with the episode-local position alone
    # would make every episode read the first episode's embeddings.
    episode_start = 0

    for episode in tqdm(data, desc="Processing episodes"):
        episode_utterances = episode['utterances']
        episode_emotions = episode['emotions']

        episode_X, episode_Y = [], []
        previous_emo_idx = None

        # zip() stops at the shorter list, so a missing emotion label simply
        # drops the unmatched trailing utterances.
        for i, (_, emotion) in enumerate(zip(episode_utterances, episode_emotions)):
            emo_idx = emo2idx.get(emotion, fallback_idx)

            flip = int(previous_emo_idx is not None and previous_emo_idx != emo_idx)
            previous_emo_idx = emo_idx

            episode_X.append(embeddings_utt[episode_start + i])
            episode_Y.append(flip)

        # Advance by the number of utterances that were embedded, not by the
        # number of zipped pairs, to stay aligned with the flat array.
        episode_start += len(episode_utterances)

        missing = seq_len - len(episode_X)
        if missing > 0:
            episode_X.extend([pad_vector] * missing)
            episode_Y.extend([0] * missing)
        elif missing < 0:
            # Keep the most recent seq_len utterances.
            episode_X = episode_X[-seq_len:]
            episode_Y = episode_Y[-seq_len:]

        X.append(episode_X)
        Y.append(episode_Y)

    # Stack in NumPy first: building a tensor straight from nested lists of
    # ndarrays is very slow.
    X = torch.from_numpy(np.asarray(X, dtype=np.float32))
    Y = torch.tensor(Y, dtype=torch.float32)

    return X.to(device), Y.to(device)

In [None]:
# Embed the loaded test episodes and build the padded feature / flip-label tensors.
X_test, Y_test = prepare_data(data=val)

In [None]:
import torch.nn as nn

class Model(nn.Module):
    """LSTM sequence tagger that emits one probability per time step.

    A unidirectional, batch-first LSTM stack whose hidden width equals
    ``input_size`` feeds a ``Linear -> ReLU -> Linear`` head with a single
    output unit, followed by a sigmoid. The returned values are therefore
    probabilities in [0, 1], not logits.

    Args:
        input_size: feature width of each step; also used as the LSTM's
            hidden width.
        hidden_size: width of the intermediate linear layer (the LSTM does
            not use it).
        num_layers: number of stacked LSTM layers.
        dropout: dropout applied between LSTM layers.
    """

    def __init__(self, input_size=768, hidden_size=128, num_layers=2, dropout=0.2):
        super(Model, self).__init__()
        self.relu = nn.ReLU()
        self.lstm = nn.LSTM(input_size, input_size, num_layers, 
                              dropout=dropout, bidirectional=False, batch_first=True)
        self.linear = nn.Linear(input_size, hidden_size)
        self.final_linear = nn.Linear(hidden_size, 1)

    def forward(self, text_encoding):
        """Return per-step probabilities for ``text_encoding``.

        Args:
            text_encoding: tensor of shape ``(batch, seq, input_size)``. A
                2-D ``(seq, input_size)`` tensor is passed to the LSTM
                unbatched. Inputs with more than three axes have their
                singleton axes squeezed away first.

        Returns:
            Tensor of shape ``(batch, seq)`` (or ``(seq,)`` for unbatched
            input) with values in [0, 1].
        """
        x = text_encoding.float()
        # Squeeze only inputs with extra axes. An unconditional squeeze would
        # also drop a batch (or sequence) axis of size 1, so the output of a
        # single-episode batch would no longer line up with its targets.
        if x.dim() > 3:
            x = x.squeeze()
        x, _ = self.lstm(x)

        x = self.linear(x)
        x = self.relu(x)
        x = self.final_linear(x)

        x = torch.sigmoid(x)
        # Remove the trailing size-1 output-unit axis.
        return x.squeeze(-1)

In [None]:
# SECURITY: torch.load unpickles the file and can execute arbitrary code; only
# load checkpoints that come from a trusted source.
# NOTE(review): recent PyTorch releases default to weights_only=True, which
# rejects fully pickled modules like this one — confirm against the installed
# version and pass weights_only=False there if the file is trusted.
#
# This rebinds `model`, which until now referred to the BERT encoder that
# prepare_data() reads as a global; do not re-run prepare_data() after this
# cell without reloading BERT first.
#
# map_location lets a checkpoint saved on a GPU machine load on a CPU-only
# host; the explicit .to(device) keeps the model on the same device as X_test.
model = torch.load("lstm_t2.pth", map_location=device)
model.to(device)

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Model.forward already applies a sigmoid, so its outputs are probabilities.
# BCEWithLogitsLoss would apply a second sigmoid and misreport the loss;
# BCELoss is the matching criterion for probability inputs.
criterion = nn.BCELoss()
# Optimizer kept for parity with the original cell; nothing visible in this
# evaluation section steps it.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# One episode per batch, in the original order.
test_dataset = TensorDataset(X_test, Y_test)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
from sklearn.metrics import classification_report   

# Evaluate the loaded model on the whole test set in one forward pass.
model.eval()
test_loss = 0
val_loss = []
with torch.no_grad():
    Predit_Y = model(X_test.to(device))
    # Threshold the per-step probabilities at 0.5 to get hard flip predictions.
    Predict_Y_Binary = (Predit_Y >= 0.5).int()
    test_loss = criterion(Predit_Y, Y_test.to(device))
    val_loss.append(test_loss.item())
    Y_test_np = Y_test.cpu().numpy()
    Predict_Y_Binary_Np = Predict_Y_Binary.cpu().numpy()

print(f"Validation Loss: {test_loss.item()}")

# NOTE(review): flattening scores every time step, including the zero-labelled
# padding positions added by prepare_data — confirm that is intended.
y_true_flat = Y_test_np.flatten()
y_pred_flat = Predict_Y_Binary_Np.flatten()

# Human-readable classification report (kept as a string, as before).
report = classification_report(y_true_flat, y_pred_flat)

# Read the weighted-average F1 from the structured report. Parsing the text
# version with split()[3] picked the recall column of the "weighted avg" row
# (the two-word row name shifts every column by one), not the F1 score.
report_scores = classification_report(y_true_flat, y_pred_flat, output_dict=True)
f1_score = float(report_scores['weighted avg']['f1-score'])

print("F1 Score:", f1_score)