In [None]:
from transformers import BertTokenizer, BertModel
import torch
import json
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
with open('/kaggle/input/train-json/Subtask_1_train.json', 'r') as train_file:
    train_data = json.load(train_file)

for i in train_data:
    print(i)
    break

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

temp_texts = [item['conversation'] for item in train_data]
temp_labels = [item['conversation'] for item in train_data]

train_texts = []
train_labels = []

for x in temp_texts:
    temp_t = []
    temp_l = []
    for y in x:
        temp_t.append(y['text'])
        temp_l.append(y['emotion'])
    train_texts.append(temp_t)
    train_labels.append(temp_l)

for i in train_texts:
    print(i)
    break
for i in train_labels:
    print(i)
    break

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
bert_model = BertModel.from_pretrained('bert-base-cased')
bert_model = bert_model.to(device)

In [None]:
labels_dict = {'anger'  :1,
               'joy'    :2,
               'fear'   :3,
               'disgust':4,
               'neutral':5,
               'surprise':6,
               'sadness':7
              }

inverse_dict = {}
for i in labels_dict:
    inverse_dict[labels_dict[i]] = i
inverse_dict[0] = 'neutral'

In [None]:
train_text = train_texts[:1100]
val_text = train_texts[1100:]
train_label = train_labels[:1100]
val_label = train_labels[1100:]
print(len(train_text))
print(len(val_text))

In [None]:
converted_inputs = []
converted_labels = []
maxs = 0
for i in train_label:
    temp = []
    for j in i:
        temp.append(labels_dict[j])
    if(len(temp)>maxs):
        maxs = len(temp)
    converted_labels.append(temp)
print(maxs)

In [None]:
for i, j in enumerate(train_text):
    if(i%100==0):
        print(i)
    temp_len = len(train_text[i])
    if(temp_len<35):
        for lol in range(35-temp_len):
            train_text[i].append("")
            converted_labels[i].append(0)
    inputs = tokenizer(train_text[i], return_tensors="pt", padding='max_length', truncation=True, max_length=512)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = bert_model(**inputs)
    converted_inputs.append(outputs[0].mean(dim=1))

In [None]:
converted_val_inputs = []
converted_val_labels = []

for i in val_label:
    temp = []
    for j in i:
        temp.append(labels_dict[j])
    converted_val_labels.append(temp)

for i, j in enumerate(val_text):
    temp_len = len(val_text[i])
    if(temp_len<35):
        for lol in range(35-temp_len):
            val_text[i].append("")
            converted_val_labels[i].append(0)

    inputs = tokenizer(val_text[i], return_tensors="pt", padding='max_length', truncation=True, max_length=512)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = bert_model(**inputs)
    converted_val_inputs.append(outputs[[0].mean(dim=1)])

In [None]:
print(len(converted_val_labels))
print(len(converted_val_inputs))
print(len(converted_inputs))
print(len(converted_labels))

In [None]:
# import pickle

# file_path = "converted_inputs.pkl"
# with open(file_path, 'wb') as file:
#     pickle.dump(converted_inputs, file)
    
# file_path = "converted_labels.pkl"
# with open(file_path, 'wb') as file:
#     pickle.dump(converted_labels, file)

In [None]:
# with open("converted_inputs.pkl", 'rb') as file:
#     converted_inputs = pickle.load(file)
# with open("converted_labels.pkl", 'rb') as file:
#     converted_labels = pickle.load(file)

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.lstm1 = nn.LSTM(hidden_size, hidden_size//2)
        self.fc2 = nn.Linear(hidden_size//2, hidden_size//4)
        self.fc3 = nn.Linear(hidden_size//4, output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        out, _ = self.lstm1(out)
        out = self.fc2(out)
        out = torch.relu(out)
        out = self.fc3(out)
        return out

input_size = 768
hidden_size = 512
output_size = 8

model = LSTMModel(input_size, hidden_size, output_size)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
from sklearn.metrics import f1_score
num_epochs = 10
max_f1 = 0

for epoch in range(num_epochs):

    model.train()
    running_loss = 0.0
    STEPS = 0
    for i, j in enumerate(train_text):

        inputs = converted_inputs[i].to(device)
        labels = torch.tensor(converted_labels[i]).to(device)
        output = model(inputs.to(device))
        loss = criterion(output,labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        STEPS += 1

    model.eval()
    epoch_val_loss = 0.0
    VALSTEPS=0
    final_label = []
    final_predicted = []
    
    for i in range(len(val_text)):
        VALSTEPS+=1
        inputs = converted_val_inputs[i]
        labels = converted_val_labels[i]
        with torch.no_grad():
            output = model(inputs.to(device))
        probabilities = torch.nn.functional.softmax(output, dim=0)
        predicted_class = torch.argmax(probabilities, dim=1)
        final_label.extend(labels)
        final_predicted.extend(predicted_class.tolist())
    
    print('EPOCH: ' + str(epoch))
    unique_classes = [i for i in range(8)]

    f1_scores_per_class = {}
    for clss in unique_classes:
        true_labels_cls = [1 if label == clss else 0 for label in final_label]
        predicted_labels_cls = [1 if label == clss else 0 for label in final_predicted]
        f1_cls = f1_score(true_labels_cls, predicted_labels_cls)
        f1_scores_per_class[clss] = f1_cls

    for clss, f1_cls in f1_scores_per_class.items():
        print(f"Class {clss}: F1 Score = {f1_cls:.4f}")
    f1 = f1_score(final_label, final_predicted, average='macro')
    print("Average F1 Score = ",f1)
    if(f1>max_f1):
        max_f1 = f1
        torch.save(model.state_dict(), 'lstm.pth')
        

In [None]:
model.eval()
text = ['Alright , so I am back in high school , I am standing in the middle of the cafeteria , and I realize I am totally naked .', 'Oh , yeah . Had that dream .', 'Then I look down , and I realize there is a phone ... there .', 'Instead of ... ?', 'That is right .', 'Never had that dream .', 'No .', 'All of a sudden , the phone starts to ring .']
temp_len = len(text)
if(temp_len<35):
    for lol in range(35-temp_len):
        text.append("")

inputs = tokenizer(text, return_tensors="pt", padding='max_length', truncation=True, max_length=512)
inputs = {key: val.to(device) for key, val in inputs.items()}
with torch.no_grad():
    outputs = bert_model(**inputs)

output = model(outputs[0].mean(dim=1))
final_result = []
for i in range(temp_len):
    x = output[i]
    probabilities = torch.nn.functional.softmax(x, dim=0)
    predicted_class = torch.argmax(probabilities).item()
    final_result.append(inverse_dict[predicted_class])
print(final_result)