In [35]:
import torch
import pickle
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import random
import matplotlib.cm as cm
from wordcloud import WordCloud
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import r2_score
import torch.nn.functional as F
from tqdm import tqdm

In [2]:
device = torch.device("cuda")

In [3]:
from google.colab import drive
drive.mount('/content/My_Drive')

Mounted at /content/My_Drive


In [4]:
%cd "/content/My_Drive/MyDrive/2025 Spring/11777/Datasets"

/content/My_Drive/MyDrive/2025 Spring/11777/Datasets


### Load Labels

In [11]:
train_label = torch.from_numpy(np.load("label_train.npy"))
train_label = train_label[:, 0, 0]

In [12]:
test_label = torch.from_numpy(np.load("label_test.npy"))
test_label = test_label[:, 0, 0]

### Create DataLoader

In [13]:
class SentimentDataset(Dataset):
    def __init__(self, embeddings, labels):
        self.embeddings = torch.tensor(embeddings, dtype=torch.float32)  # Convert to Tensor
        self.labels = torch.tensor(labels, dtype=torch.float32)  # Convert to Tensor

    def __len__(self):
        return len(self.embeddings)

    def __getitem__(self, idx):
        return self.embeddings[idx], self.labels[idx]  # Return (embedding, sentiment_label)

### LSTM

In [14]:
class SentimentLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim, dropout=0.2):
        super(SentimentLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)  # Output shape: (batch_size, seq_length, hidden_dim)
        final_out = lstm_out[:, -1, :]  # Take last LSTM output
        final_out = self.dropout(final_out)
        return self.fc(final_out)  # Output sentiment intensity

In [33]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in tqdm(range(num_epochs)):
        epoch_loss = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.float(), labels.float()

            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs).squeeze()

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss / len(train_loader)}")

def predict(model, input_embedding, device):
    model.to(device)
    model.eval()
    with torch.no_grad():
        input_embedding = input_embedding.to(device).float()
        sentiment_score = model(input_embedding).squeeze()
    return sentiment_score.cpu()

### Evaluation Metrics

In [18]:
def calculate_mae(predictions, test_label):
  l1_loss_fn = nn.L1Loss()
  loss = l1_loss_fn(predictions, test_label)
  return loss.item()

In [19]:
def calculate_acc(predictions, test_label):
  predicted_labels = (predictions >= 0).long()  # Convert to 0/1 classes
  true_labels = (test_label >= 0).long()  # Convert true values to 0/1
  accuracy = (predicted_labels == true_labels).float().mean().item()
  return accuracy

In [20]:
def calculate_mse(predictions, true_labels):
  mse_loss = nn.MSELoss()
  loss = mse_loss(predictions, true_labels)
  print(f"MSE Loss: {loss.item()}")

In [42]:
def calculate_pearson_corr(predictions, true_labels):
  predictions, true_labels = predictions.float(), true_labels.float()
  # Compute mean values
  pred_mean = predictions.mean()
  true_mean = true_labels.mean()

  # Compute Covariance
  numerator = torch.sum((predictions - pred_mean) * (true_labels - true_mean))

  # Compute Standard Deviation
  denominator = torch.sqrt(torch.sum((predictions - pred_mean) ** 2)) * torch.sqrt(torch.sum((true_labels - true_mean) ** 2))

  corr = (numerator / denominator).item()

  return corr

In [22]:
def compute_accuracy_7(predictions, true_labels):
    batch_size = predictions.shape[0]

    errors = torch.abs(predictions - true_labels.view(-1, 1))  # Shape: (batch_size, num_samples)

    # Get indices of the 7 closest predictions
    top7_indices = torch.topk(errors, k=7, dim=1, largest=False).indices

    # Check if the ground truth value is among the closest 7 predictions
    correct = torch.any(top7_indices == 0, dim=1).float()
    return correct.mean().item()

### Text Unimodal

In [25]:
train_data = "text_embeddings_train.npy"
train = torch.from_numpy(np.load(train_data))
test_data = "text_embeddings_test.npy"
test = torch.from_numpy(np.load(test_data))
train_dataset = SentimentDataset(train, train_label)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)

  self.embeddings = torch.tensor(embeddings, dtype=torch.float32)  # Convert to Tensor
  self.labels = torch.tensor(labels, dtype=torch.float32)  # Convert to Tensor


In [26]:
input_dim = train.shape[-1]
hidden_dim = 300
num_layers = 2
output_dim = 1
batch_size = 32
num_epochs = 10
lr = 0.001
model = SentimentLSTM(input_dim, hidden_dim, num_layers, output_dim)
model.to(device)
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model(model, train_loader, criterion, optimizer, num_epochs)

Epoch 1/10, Loss: 0.7274075138825493
Epoch 2/10, Loss: 0.6517192131735807
Epoch 3/10, Loss: 0.6213554716040244
Epoch 4/10, Loss: 0.5951123219530177
Epoch 5/10, Loss: 0.5697418814652586
Epoch 6/10, Loss: 0.5420108481279324
Epoch 7/10, Loss: 0.5128574528222914
Epoch 8/10, Loss: 0.4783081463346743
Epoch 9/10, Loss: 0.44787338029031887
Epoch 10/10, Loss: 0.41729232362688407


**Evaluation**

In [27]:
predictions = predict(model, test, device)
mae = calculate_mae(predictions, test_label)
print(f"MAE: {mae}")
pc = calculate_pearson_corr(predictions, test_label)
print(f"Pearson Corr: {pc}")
r2 = r2_score(test_label.cpu().numpy(), predictions.cpu().numpy())
print(f"R2 Score: {r2}")

MAE: 0.6220446825027466
Pearson Corr: 0.6760468482971191
R2 Score: 0.42650842666625977


### Multimodal

In [28]:
def concat_modality(src1, src2):
  train1 = src1
  train2 = src2
  train1 = torch.from_numpy(np.load(train1))
  train2 = torch.from_numpy(np.load(train2))

  train1 = train1.permute(0, 2, 1) #(batch_size, embed_dim, seq_len)
  train2 = train2.permute(0, 2, 1)

  train1_pooled = F.adaptive_avg_pool1d(train1, output_size=5)
  train2_pooled = F.adaptive_avg_pool1d(train2, output_size=5)

  new_train = torch.cat((train1_pooled, train2_pooled), dim=1)
  new_train = new_train.permute(0, 2, 1) #(batch_size, seq_len, embed_dim)
  print("New data shape:", new_train.shape)
  return new_train


In [38]:
train_visual = "visual_face_train.npy"
train_text = "text_embeddings_train.npy"
new_train = concat_modality(train_visual, train_text)
train_dataset = SentimentDataset(new_train, train_label)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)

New data shape: torch.Size([16327, 5, 335])


  self.embeddings = torch.tensor(embeddings, dtype=torch.float32)  # Convert to Tensor
  self.labels = torch.tensor(labels, dtype=torch.float32)  # Convert to Tensor


In [39]:
input_dim = new_train.shape[-1]
hidden_dim = 100
num_layers = 2
output_dim = 1
batch_size = 32
num_epochs = 10
lr = 0.01
model = SentimentLSTM(input_dim, hidden_dim, num_layers, output_dim)
model.to(device)
criterion = nn.L1Loss()
optimizer = optim.Adam(model.parameters(), lr=lr)

train_model(model, train_loader, criterion, optimizer, num_epochs)

 10%|█         | 1/10 [00:02<00:23,  2.62s/it]

Epoch 1/10, Loss: 0.7824766578739637


 20%|██        | 2/10 [00:04<00:19,  2.47s/it]

Epoch 2/10, Loss: 0.7286703557417584


 30%|███       | 3/10 [00:07<00:16,  2.42s/it]

Epoch 3/10, Loss: 0.7141564553731108


 40%|████      | 4/10 [00:09<00:14,  2.39s/it]

Epoch 4/10, Loss: 0.6942280932765185


 50%|█████     | 5/10 [00:11<00:11,  2.35s/it]

Epoch 5/10, Loss: 0.6871814873003913


 60%|██████    | 6/10 [00:14<00:09,  2.40s/it]

Epoch 6/10, Loss: 0.6762212981449881


 70%|███████   | 7/10 [00:16<00:07,  2.41s/it]

Epoch 7/10, Loss: 0.6688413876610027


 80%|████████  | 8/10 [00:19<00:04,  2.38s/it]

Epoch 8/10, Loss: 0.6629223397216685


 90%|█████████ | 9/10 [00:21<00:02,  2.39s/it]

Epoch 9/10, Loss: 0.6581406690136561


100%|██████████| 10/10 [00:24<00:00,  2.41s/it]

Epoch 10/10, Loss: 0.6573706014982175





**Evaluation**

In [40]:
test_visual = "visual_face_test.npy"
test_text = "text_embeddings_test.npy"
new_test = concat_modality(test_visual, test_text)

New data shape: torch.Size([4662, 5, 335])


In [41]:
predictions = predict(model, new_test, device)
mae = calculate_mae(predictions, test_label)
print(f"MAE: {mae}")
pc = calculate_pearson_corr(predictions, test_label)
print(f"Pearson Corr: {pc}")
r2 = r2_score(test_label.cpu().numpy(), predictions.cpu().numpy())
print(f"R2 Score: {r2}")

MAE: 0.6804166436195374
Pearson Corr: 0.5921444892883301
R2 Score: 0.30067217350006104


### Load Tensors(Optional)

In [None]:
objects = []
with open('tensors.pkl', 'rb') as f:
    while True:
        try:
            obj = pickle.load(f)
            objects.append(obj)
            break
        except EOFError:
            break
train_all = objects[0][0][0]
val_all = objects[0][0][1]
test_all = objects[0][0][2]

In [None]:
np.save('text_embeddings_val.npy', val_all['glove_vectors'])
np.save('text_embeddings_train.npy', train_all['glove_vectors'])
np.save('text_embeddings_test.npy', test_all['glove_vectors'])

In [None]:
np.save('label_val.npy', val_all['All Labels'])
np.save('label_train.npy', train_all['All Labels'])
np.save('label_test.npy', test_all['All Labels'])

In [None]:
np.save('audios_embeddings_train.npy', train_all['COAVAREP'])
np.save('audios_embeddings_val.npy', val_all['COAVAREP'])
np.save('audios_embeddings_test.npy', test_all['COAVAREP'])

In [None]:
np.save('visual_face_train.npy', train_all['FACET 4.2'])
np.save('visual_face_val.npy', val_all['FACET 4.2'])
np.save('visual_face_test.npy', test_all['FACET 4.2'])

In [None]:
np.save('visual_op_train.npy', train_all['OpenFace_2.0'])
np.save('visual_op_val.npy', val_all['OpenFace_2.0'])
np.save('visual_op_test.npy', test_all['OpenFace_2.0'])