### Importing Necessary Libraries

In [1]:
import json
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sentence_transformers import SentenceTransformer
import warnings
import torch.optim as optim
warnings.filterwarnings("ignore")

  _torch_pytree._register_pytree_node(


### Loading Dataset

In [2]:
class EmotionDataset(Dataset):
    """Conversation-level dataset for per-utterance emotion classification.

    The JSON file is expected to hold a sequence of entries, each providing the
    keys ``"speakers"``, ``"emotions"`` and ``"utterances"`` (parallel lists,
    one element per utterance of a conversation).

    Each item is a tuple ``(speakers, emotions, utterance_embeddings)``:

    * ``speakers``: long tensor of speaker ids. The label encoder is refit on
      every conversation, so ids are only meaningful within one conversation.
    * ``emotions``: long tensor of class ids taken from ``emotion_class_to_idx``.
    * ``utterance_embeddings``: float tensor with one sentence embedding per
      utterance, produced by the sentence-transformer model.

    Args:
        json_file: Path of the JSON file to load.
        model_name: Name of the sentence-transformer model used for encoding.
        cache_embeddings: When True (default), the embeddings of a conversation
            are computed on first access and reused afterwards, so later epochs
            do not re-run the transformer on identical text. The cached tensor
            object itself is returned, so callers must not modify it in place.
            Set to False to trade speed for a smaller memory footprint.

    Raises:
        KeyError: from ``__getitem__`` if an entry lacks one of the expected
            keys or contains an emotion label missing from
            ``emotion_class_to_idx``.
    """

    def __init__(self, json_file, model_name="all-MiniLM-L6-v2", cache_embeddings=True):
        # Read the file first so a bad path fails before the model is loaded.
        # JSON text is UTF-8; do not rely on the platform default encoding.
        with open(json_file, 'r', encoding='utf-8') as f:
            self.data = list(json.load(f))

        self.model = SentenceTransformer(model_name)
        self.speaker_encoder = LabelEncoder()
        self.emotion_class_to_idx = {'neutral': 0, 'joy': 1, 'sadness': 2, 'anger': 3, 'fear': 4, 'disgust': 5, 'surprise': 6}

        self.cache_embeddings = cache_embeddings
        # Maps conversation index -> embedding tensor computed on first access.
        self._embedding_cache = {}

    def __len__(self):
        """Return the number of conversations."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(speakers, emotions, utterance_embeddings)`` for conversation ``idx``."""
        entry = self.data[idx]
        speakers = torch.tensor(self.speaker_encoder.fit_transform(entry["speakers"]), dtype=torch.long)
        emotions = torch.tensor([self.emotion_class_to_idx[emotion] for emotion in entry["emotions"]], dtype=torch.long)

        # Encoding is by far the most expensive step and its result depends only
        # on the text, so it is computed once per conversation when caching is on.
        cache_key = int(idx)
        if self.cache_embeddings and cache_key in self._embedding_cache:
            utterance_embeddings = self._embedding_cache[cache_key]
        else:
            utterance_embeddings = torch.tensor(self.model.encode(entry["utterances"]), dtype=torch.float)
            if self.cache_embeddings:
                self._embedding_cache[cache_key] = utterance_embeddings
        return speakers, emotions, utterance_embeddings


# Build the train and validation datasets; both use the same sentence-embedding model.
model_name = 'all-mpnet-base-v2' # alternative: all-MiniLM-L6-v2
train_dataset, val_dataset = (
    EmotionDataset(json_path, model_name=model_name)
    for json_path in ("../Data/train_file.json", "../Data/val_file.json")
)

# Report how many conversations each split contains.
print('Length of train dataset:', len(train_dataset))
print('Length of val dataset:', len(val_dataset))

Length of train dataset: 6740
Length of val dataset: 843


In [3]:
import torch.nn as nn
def collate_fn(batch):
    """Pad a list of variable-length conversations into batch-first tensors.

    Args:
        batch: Sequence of ``(speakers, emotions, utterance_embeddings)`` tuples,
            one per conversation.

    Returns:
        Tuple ``(speakers, emotions, utterance_embeddings)`` where every field
        has been right-padded with zeros to the longest conversation in the
        batch and stacked along a leading batch dimension.

    Note:
        The pad value 0 is also a real class id in this notebook
        (``'neutral': 0`` in ``emotion_class_to_idx``), so consumers have to
        mask padded positions themselves.
    """
    # Transpose "list of samples" into "one tuple per field", then pad each field.
    per_field = zip(*batch)
    speakers, emotions, utterance_embeddings = (
        nn.utils.rnn.pad_sequence(field, batch_first=True) for field in per_field
    )
    return speakers, emotions, utterance_embeddings

# Batch both splits with the padding collate function; only training data is shuffled.
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)

# Number of batches per epoch for each split.
print('Length of train_loader:', len(train_loader))
print('Length of val_loader:', len(val_loader))

Length of train_loader: 211
Length of val_loader: 27


In [4]:
# Sanity check: look at the first training batch only, then stop iterating.
for batch in train_loader:
    speakers, emotions, utterance_embeddings = batch
    print("Batch Size:", len(speakers))

    # Shapes of the three padded tensors.
    for caption, shape in (
        ("Speakers Shape:", speakers.shape),
        ("Emotions Shape:", emotions.shape),
        ("Utterance Embeddings Shape:", utterance_embeddings.shape),
    ):
        print(caption, shape)

    # Contents of the first conversation in the batch.
    print("\nExample:")
    for caption, first_item in (
        ("Speakers:", speakers[0]),
        ("Emotions:", emotions[0]),
        ("Utterance Embeddings:", utterance_embeddings[0]),
    ):
        print(caption, first_item)
    break

Batch Size: 32
Speakers Shape: torch.Size([32, 17])
Emotions Shape: torch.Size([32, 17])
Utterance Embeddings Shape: torch.Size([32, 17, 768])

Example:
Speakers: tensor([1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Emotions: tensor([0, 6, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Utterance Embeddings: tensor([[ 0.0302,  0.0209, -0.0373,  ...,  0.0074, -0.0501, -0.0263],
        [-0.0238,  0.0645, -0.0242,  ...,  0.0659, -0.0023, -0.0005],
        [-0.0097,  0.0827, -0.0384,  ...,  0.0543, -0.0345, -0.0003],
        ...,
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]])


In [8]:
class BiLIST(nn.Module):
    """Bidirectional LSTM tagger producing one logit vector per utterance.

    The dataset carries one emotion label per utterance, so the network must
    emit one prediction per time step. Returning only the final time step, as
    an earlier version did, yields a single prediction per conversation that
    cannot be aligned with the per-utterance labels.

    Args:
        input_size: Dimensionality of each utterance embedding.
        hidden_size: Hidden units per LSTM direction.
        output_size: Number of emotion classes.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True, bidirectional=True)
        # Forward and backward states are concatenated, hence hidden_size * 2.
        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x, lengths=None):
        """Compute per-utterance class logits.

        Args:
            x: Float tensor of shape (batch, seq_len, input_size), right-padded.
            lengths: Optional 1-D integer tensor with the true length (>= 1) of
                each sequence. When given, sequences are packed so that neither
                LSTM direction processes padded positions.

        Returns:
            Float tensor of shape (batch, seq_len, output_size) with raw logits.
        """
        if lengths is None:
            lstm_out, _ = self.lstm(x)
        else:
            packed = nn.utils.rnn.pack_padded_sequence(
                x, lengths.cpu(), batch_first=True, enforce_sorted=False
            )
            packed_out, _ = self.lstm(packed)
            # total_length keeps the time dimension equal to the padded input.
            lstm_out, _ = nn.utils.rnn.pad_packed_sequence(
                packed_out, batch_first=True, total_length=x.size(1)
            )
        return self.fc(lstm_out)

# Initialize your model
input_size = 768  # Size of the utterance embeddings (see the batch shapes printed above)
hidden_size = 128  # Adjust as needed
output_size = 7  # Number of emotion classes
model = BiLIST(input_size, hidden_size, output_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(3):
    model.train()
    running_loss = 0.0
    for batch in train_loader:
        emotions = batch[1]
        utterance_embeddings = batch[2]

        # collate_fn right-pads with zeros, so an all-zero embedding row marks a
        # padded position. The mask is needed because the label pad value (0)
        # is also the id of the 'neutral' class.
        valid_mask = utterance_embeddings.abs().sum(dim=-1) > 0
        # pack_padded_sequence rejects zero lengths, hence the clamp.
        lengths = valid_mask.sum(dim=1).clamp(min=1)

        optimizer.zero_grad()

        # Forward pass: (batch, seq_len, output_size) logits
        outputs = model(utterance_embeddings, lengths)

        # Score only real utterances, each against its own label.
        loss = criterion(outputs[valid_mask], emotions[valid_mask])
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}")


In [None]:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# from sklearn.metrics import accuracy_score, f1_score

# class BiLSTMClassifier(nn.Module):
#     def __init__(self, embed_dim, hidden_size, output_size):
#         super(BiLSTMClassifier, self).__init__()
#         self.embed_dim = embed_dim
#         self.hidden_size = hidden_size
#         self.output_size = output_size
        
#         self.lstm = nn.LSTM(input_size=embed_dim, hidden_size=hidden_size, batch_first=True, bidirectional=True)
#         self.fc = nn.Linear(hidden_size * 2, output_size)  # *2 for bidirectional
#         self.softmax = nn.Softmax(dim=2)

#     def forward(self, utterance_embeddings):
#         lstm_output, _ = self.lstm(utterance_embeddings)
#         lstm_output = self.fc(lstm_output)
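#         output = self.softmax(lstm_output)  # NOTE(review): nn.CrossEntropyLoss applies log-softmax itself; return the raw logits instead of softmax probabilities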
#         return output

# def train_epoch(model, dataloader, criterion, optimizer, device):
#     model.train()
#     model = model.to(device)
#     running_loss = 0.0
#     running_corrects = 0
#     running_total = 0
#     y_true = []
#     y_pred = []
    
#     for i, (speakers, targets, utterance_embeddings) in enumerate(dataloader, 1):
#         speakers = speakers.to(device)
#         utterance_embeddings = utterance_embeddings.to(device)
#         targets = targets.to(device)
#         optimizer.zero_grad()
#         outputs = model(utterance_embeddings)
#         loss = criterion(outputs.transpose(1, 2), targets)
#         _, preds = torch.max(outputs, 2)
#         running_loss += loss.item() * speakers.size(0)
#         running_corrects += torch.sum(preds == targets).item()
#         running_total += speakers.size(0)
#         loss.backward()
#         optimizer.step()
        
#         if i % 20 == 0:
#             target_expanded = targets.view(-1).cpu().numpy()
#             preds_expanded = preds.view(-1).cpu().numpy()
#             f1 = f1_score(target_expanded, preds_expanded, average='macro')
#             accuracy = accuracy_score(target_expanded, preds_expanded)
#             print(f"Batch {i}/{len(dataloader)} Loss: {loss.item():.4f}, Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}")
#         y_true.extend(targets.view(-1).cpu().numpy())
#         y_pred.extend(preds.view(-1).cpu().numpy())
    
#     epoch_loss = running_loss / len(dataloader.dataset)
#     epoch_acc = accuracy_score(y_true, y_pred)
#     f1 = f1_score(y_true, y_pred, average='macro')
#     return epoch_loss, epoch_acc, f1


# def validate_model(model, dataloader, criterion, device):
#     model.eval()
#     model = model.to(device)
#     running_loss = 0.0
#     running_corrects = 0
#     running_total = 0
#     y_true = []
#     y_pred = []
#     with torch.no_grad():
#         for speakers, targets, utterance_embeddings in dataloader:
#             speakers = speakers.to(device)
#             utterance_embeddings = utterance_embeddings.to(device)
#             targets = targets.to(device)
#             outputs = model(utterance_embeddings)
#             loss = criterion(outputs.transpose(1, 2), targets)
#             _, preds = torch.max(outputs, 2)
#             running_loss += loss.item() * speakers.size(0)
#             running_corrects += torch.sum(preds == targets).item()
#             running_total += speakers.size(0)
#             y_true.extend(targets.view(-1).cpu().numpy())
#             y_pred.extend(preds.view(-1).cpu().numpy())
#     val_loss = running_loss / len(dataloader.dataset)
#     val_acc = accuracy_score(y_true, y_pred)
#     val_f1 = f1_score(y_true, y_pred, average='macro')
#     return val_loss, val_acc, val_f1

In [15]:
# import torch.optim as optim

# DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NUM_EPOCH = 10
# EMBED_DIM = 768 
# HIDDEN_SIZE = 64
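# OUTPUT_SIZE = 2  # FIXME: labels span 7 emotion classes (ids 0-6, see emotion_class_to_idx), so this must be 7; a value of 2 causes the "IndexError: Target 3 is out of bounds." recorded below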

# model = BiLSTMClassifier(embed_dim=EMBED_DIM, hidden_size=HIDDEN_SIZE, output_size=OUTPUT_SIZE)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.1)
# # print(DEVICE)

In [16]:
# NUM_EPOCH = 3
# for epoch in range(NUM_EPOCH):
#     print('-------------------------------------------------')
#     print(f"Epoch {epoch+1}/{NUM_EPOCH}")
#     train_loss, train_acc, train_f1 = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
#     print(f"==> Train Loss: {train_loss:.4f} Accuracy: {train_acc:.4f} F1 Score: {train_f1:.4f}")
#     val_loss, val_acc, val_f1 = validate_model(model, val_loader, criterion, DEVICE)
#     print(f"==> Validation Loss: {val_loss:.4f} Accuracy: {val_acc:.4f} F1 Score: {val_f1:.4f}")

-------------------------------------------------
Epoch 1/3


IndexError: Target 3 is out of bounds.

In [None]:
# model = model.to('cpu')
# torch.save(model.state_dict(), "M1.pt")