In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np
import re
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
import pickle
import os
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset paths below resolve.
drive.mount('/content/drive')

# Base folder of the MELD data on Drive, and the sub-folder holding the raw CSV
# splits. Both keep a trailing slash because file names are appended to them by
# plain string concatenation.
dataset_path = "/content/drive/MyDrive/Graduation-Project/Dataset/Dataset/MELD/"
meld_dataset = dataset_path+"MELD.Raw/"

Mounted at /content/drive


In [None]:
# Read the three MELD splits (train / dev / test) from the raw CSV folder.
# The dev split is kept under the name `cv` and used as the validation set.
train, cv, test = (
    pd.read_csv(meld_dataset + csv_name)
    for csv_name in ('train_sent_emo.csv', 'dev_sent_emo.csv', 'test_sent_emo.csv')
)

In [None]:
# Preprocess data
def preprocess_data(data):
    """Normalise raw utterances into space-joined, stemmed tokens.

    For each input string, every character outside ``a-z`` / ``A-Z`` is
    replaced by a space, the text is lower-cased and split on whitespace,
    words found in NLTK's English stop-word list are dropped, and the
    remaining words are Porter-stemmed.

    Args:
        data: Iterable of strings.

    Returns:
        list[str]: One cleaned string per input item, in input order. An
        item consisting only of stop words / non-letters yields ``''``.
    """
    stemmer = PorterStemmer()
    english_stop_words = set(stopwords.words('english'))

    def clean(sentence):
        # Strip non-letters first so punctuation never sticks to a token.
        tokens = re.sub('[^a-zA-Z]', ' ', sentence).lower().split()
        return ' '.join(
            stemmer.stem(token) for token in tokens if token not in english_stop_words
        )

    return [clean(sentence) for sentence in data]

In [None]:
# Load GloVe embeddings
def load_glove_embeddings(file_path):
    """Return the object stored in the pickle file at ``file_path``.

    The rest of the notebook treats the result as a word -> vector mapping.

    NOTE: ``pickle.load`` can execute arbitrary code while deserialising;
    only point this at files from a trusted source.
    """
    with open(file_path, 'rb') as pickle_file:
        return pickle.load(pickle_file)

In [None]:
# Map words to GloVe vectors
def map_words_to_vectors(word_to_idx, glove_embeddings, embedding_dim=None):
    """Build an embedding matrix whose row ``idx`` holds the vector of the word mapped to ``idx``.

    Args:
        word_to_idx: Mapping word -> row index. The matrix gets
            ``len(word_to_idx) + 1`` rows, so indices are expected to lie in
            ``1..len(word_to_idx)`` with row 0 left free (used as padding
            elsewhere in this notebook).
        glove_embeddings: Mapping word -> vector (dict-like).
        embedding_dim: Width of the vectors. When ``None`` (default) it is
            inferred from the first vector in ``glove_embeddings``; an empty
            mapping falls back to 300, the width previously hard-coded here.

    Returns:
        numpy.ndarray of shape ``(len(word_to_idx) + 1, embedding_dim)``.
        Rows of words missing from ``glove_embeddings`` stay all-zero.
    """
    if embedding_dim is None:
        sample_vector = next(iter(glove_embeddings.values()), None)
        embedding_dim = 300 if sample_vector is None else len(sample_vector)

    embedding_matrix = np.zeros((len(word_to_idx) + 1, embedding_dim))
    for word, idx in word_to_idx.items():
        if word in glove_embeddings:
            embedding_matrix[idx] = glove_embeddings[word]
    return embedding_matrix


In [None]:
class BiLSTMWithGloVe(nn.Module):
    """Bidirectional LSTM classifier on top of frozen pre-trained word vectors.

    Args:
        embedding_matrix: 2-D array ``(vocab_size, embedding_dim)``; row ``i``
            is the vector for token index ``i``. Index 0 is treated as padding.
        hidden_size: Hidden units per LSTM direction.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of output classes (size of the returned logits).
        dropout: Dropout probability applied to the embeddings and, when
            ``num_layers > 1``, between LSTM layers.

    Input to ``forward`` is a batch-first integer tensor ``(batch, seq_len)``
    of token indices; the output is ``(batch, output_size)`` raw logits.
    """

    def __init__(self, embedding_matrix, hidden_size, num_layers, output_size, dropout=0.5):
        super().__init__()
        # forward() slices the LSTM output by direction, so it needs this.
        self.hidden_size = hidden_size

        # NumPy defaults to float64; the LSTM / Linear weights are float32.
        # Convert once here instead of casting on every forward pass.
        weights = torch.tensor(embedding_matrix, dtype=torch.float32)
        self.embedding = nn.Embedding.from_pretrained(weights, freeze=True, padding_idx=0)

        self.bilstm = nn.LSTM(
            weights.shape[1],
            hidden_size,
            num_layers,
            batch_first=True,  # batches arrive as (batch, seq_len)
            bidirectional=True,
            # Inter-layer dropout is only meaningful with more than one layer.
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.fc1 = nn.Linear(hidden_size * 2, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return class logits ``(batch, output_size)`` for token indices ``x`` ``(batch, seq_len)``."""
        embedded = self.dropout(self.embedding(x))
        outputs, _ = self.bilstm(embedded)  # (batch, seq_len, 2 * hidden_size)

        # Forward direction summarised at the last step, backward direction at
        # the first step. NOTE(review): for padded batches the last step of a
        # shorter sequence is a padding position; packing the sequences would
        # be needed to read the true last token.
        forward_last = outputs[:, -1, :self.hidden_size]
        backward_first = outputs[:, 0, self.hidden_size:]
        features = torch.cat((forward_last, backward_first), dim=1)

        features = torch.relu(self.fc1(features))
        return self.fc2(features)


In [None]:
# Clean the utterance text of each split and keep its raw emotion labels alongside.
X_train, y_train = preprocess_data(train['Utterance']), train['Emotion']
X_cv, y_cv = preprocess_data(cv['Utterance']), cv['Emotion']
X_test, y_test = preprocess_data(test['Utterance']), test['Emotion']

In [None]:
# Encode labels
# Read the labels straight from the source frames instead of from y_train /
# y_cv / y_test: this cell overwrites those names with integer codes, so
# fitting on them would make a second run of the cell fit the encoder on
# integers and lose the emotion names in `label_encoder.classes_`.
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(train['Emotion'])
y_cv = label_encoder.transform(cv['Emotion'])
y_test = label_encoder.transform(test['Emotion'])

In [None]:
# Create word-to-index mapping
all_words = ' '.join(X_train + X_cv + X_test).split()
# Sort the vocabulary so every word gets the same index in every kernel
# session. Iterating a bare set of strings follows hash order, which changes
# between Python processes, so checkpoints saved in one session would not
# line up with the mapping rebuilt in another.
vocabulary = sorted(set(all_words))
# Index 0 is reserved for padding, so real words start at 1.
word_to_idx = {word: i for i, word in enumerate(vocabulary, start=1)}
idx_to_word = {i: word for word, i in word_to_idx.items()}
num_words = len(word_to_idx) + 1

In [None]:
# Load the pickled GloVe lookup from the dataset folder on Drive.
glv_path = dataset_path + "Glove/glove_vectors.pkl"
glove_embeddings = load_glove_embeddings(glv_path)
# Set of every word that has a GloVe vector.
glove_words = set(glove_embeddings.keys())

# Build the embedding matrix: row i holds the GloVe vector of the word whose
# index is i; words without a GloVe vector keep an all-zero row.
embedding_matrix = map_words_to_vectors(word_to_idx, glove_embeddings)

In [None]:
class EmotionDataset(Dataset):
    """Dataset of (token-index tensor, label tensor) pairs.

    Args:
        X: Indexable collection of whitespace-tokenisable sentences.
        y: Indexable collection of integer labels, aligned with ``X``.
        vocab: Optional word -> index mapping. When omitted, the notebook's
            module-level ``word_to_idx`` is looked up at item-access time,
            which keeps existing ``EmotionDataset(X, y)`` calls working.
    """

    def __init__(self, X, y, vocab=None):
        self.X = X
        self.y = y
        self.vocab = vocab

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(tokens, label)``: a 1-D long tensor of word indices and a 1-element long tensor."""
        vocab = word_to_idx if self.vocab is None else self.vocab
        sentence = self.X[index]
        label = self.y[index]
        # Words missing from the vocabulary map to 0, the padding index,
        # instead of raising KeyError.
        token_ids = [vocab.get(word, 0) for word in sentence.split()]
        return (
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor([label], dtype=torch.long),
        )


In [None]:
from torch.nn.utils.rnn import pad_sequence

def custom_collate_fn(batch):
    """Collate ``(tokens, label)`` samples into one padded batch.

    Args:
        batch: Sequence of samples; element 0 of each is a 1-D token tensor,
            element 1 a label tensor.

    Returns:
        tuple: ``(padded_inputs, padded_labels)`` where the token tensors are
        right-padded with 0 to the longest one and laid out batch-first, and
        the label tensors are stacked along a new leading dimension.
    """
    sequences = []
    targets = []
    for sample in batch:
        sequences.append(sample[0])
        targets.append(sample[1])

    padded_inputs = pad_sequence(sequences, batch_first=True, padding_value=0)
    padded_labels = torch.stack(targets)
    return padded_inputs, padded_labels


In [None]:
from torch.nn.utils.rnn import pad_sequence
# Wrap each split in an EmotionDataset.
train_dataset, cv_dataset, test_dataset = (
    EmotionDataset(sentences, targets)
    for sentences, targets in ((X_train, y_train), (X_cv, y_cv), (X_test, y_test))
)

# All loaders share the batch size and the padding collate function; only the
# training loader shuffles.
_loader_kwargs = dict(batch_size=512, collate_fn=custom_collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
cv_loader = DataLoader(cv_dataset, **_loader_kwargs)
test_loader = DataLoader(test_dataset, **_loader_kwargs)


In [None]:
# Hyper-parameters
hidden_size, num_layers, dropout = 300, 2, 0.5
num_epochs = 200
input_size = num_words                      # vocabulary size incl. padding index 0
output_size = len(label_encoder.classes_)   # one logit per emotion class

# Model, loss and optimizer (Adam with its default settings)
model = BiLSTMWithGloVe(
    embedding_matrix,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
    dropout=dropout,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [None]:
# Directory to save models
model_dir = '/content/drive/MyDrive/Graduation-Project/Phase 2/Detection/Training Model/'
# exist_ok avoids the separate exists() check and the window between the
# check and the creation.
os.makedirs(model_dir, exist_ok=True)

# Training loop
for epoch in range(num_epochs):
    model.train()  # enable dropout for the training pass
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        # Labels are collated as (batch, 1); CrossEntropyLoss wants (batch,).
        loss = criterion(outputs, labels.squeeze(1))
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Mean of the per-batch losses over this epoch.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}')

    # Save model for each epoch
    torch.save(model.state_dict(), os.path.join(model_dir, f'epoch_{epoch + 1}.pt'))

RuntimeError: mat1 and mat2 must have the same dtype, but got Double and Float

In [None]:
# Accuracy on the validation split, with dropout disabled and no gradient tracking.
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for inputs, labels in cv_loader:
        outputs = model(inputs)
        # Index of the largest logit per row is the predicted class.
        predicted = torch.max(outputs.data, 1)[1]
        matches = predicted == labels.squeeze(1)
        total += labels.size(0)
        correct += matches.sum().item()
print(f'Accuracy on validation set: {100 * correct / total}%')

In [None]:
# Predictions on test set
# Put the model in eval mode here as well, so this cell does not rely on the
# validation cell having been run first; otherwise dropout would stay active
# when this cell is executed right after training.
model.eval()
predictions = []
true_labels = []
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        predicted = torch.argmax(outputs, dim=1)
        # .cpu().tolist() yields plain Python ints and also works if the
        # tensors live on a non-CPU device.
        predictions.extend(predicted.cpu().tolist())
        true_labels.extend(labels.squeeze(1).cpu().tolist())

# Decode labels
idx_to_label = {i: label for i, label in enumerate(label_encoder.classes_)}
predictions = [idx_to_label[pred] for pred in predictions]
true_labels = [idx_to_label[label] for label in true_labels]
print("Predictions:", predictions)
print("True labels:", true_labels)

In [None]:
# Example prediction
example_sentence = "I'm feeling happy today"
# Apply the same cleaning as the training data: the vocabulary holds stemmed,
# lower-cased, stop-word-free tokens, so raw words such as "I'm" are not keys
# of word_to_idx.
cleaned_sentence = preprocess_data([example_sentence])[0]
# Unknown words fall back to 0 (the padding index); if nothing survives the
# cleaning, feed a single padding token so the model still gets a sequence.
indexed_sentence = [word_to_idx.get(word, 0) for word in cleaned_sentence.split()] or [0]
# Shape (1, seq_len): a batch of one, batch-first like the collated batches.
tensor_sentence = torch.LongTensor(indexed_sentence).unsqueeze(0)
model.eval()  # disable dropout for inference
with torch.no_grad():
    logits = model(tensor_sentence)
predicted_emotion = torch.argmax(logits, dim=1).item()
print(f"The predicted emotion for '{example_sentence}' is '{idx_to_label[predicted_emotion]}'.")

In [None]:
# Ask PyTorch to release cached, currently unused CUDA memory.
torch.cuda.empty_cache()

In [None]:
# Report whether CUDA is usable in this runtime (the recorded output below is False).
torch.cuda.is_available()

False