### Prerequisite Packages

In [1]:
import sys
import os
import pandas as pd
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torcheval.metrics import BinaryPrecision, BinaryRecall, BinaryF1Score
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import accuracy_score

In [2]:
sys.path.append('../../')

from modules.cross_attentionb import CrossAttentionB
from modules.dataloader import load_npy_files
from modules.classifier import DenseLayer, BCELoss, CustomLoss, BCEWithLogits
from modules.linear_transformation import LinearTransformations

### Data Loading

In [3]:
class MultimodalDataset(Dataset):
    def __init__(self, id_label_df, text_features, audio_features, video_features, lower_bound=35, upper_bound=197):
        self.id_label_df = id_label_df
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        
        # Convert feature lists to dictionaries for fast lookup
        self.text_features = {os.path.basename(file).split('.')[0]: tensor for file, tensor in text_features}
        self.audio_features = {os.path.basename(file).split('_')[1].split('.')[0]: tensor for file, tensor in audio_features}
        self.video_features = {os.path.basename(file).split('_')[0]: tensor for file, tensor in video_features}

        # Find and remove outliers
        self.id_label_df, self.text_features, self.audio_features, self.video_features = self.remove_outliers()

        # List to store missing files
        self.missing_files = []

        # Filter out entries with missing files
        self.valid_files = self._filter_valid_files()

    def remove_outliers(self):
        # Identify outliers based on video sequence length
        outlier_ids = []
        
        for imdbid, video_tensor in self.video_features.items():
            seq_length = video_tensor.size(0)
            
            # Check if the sequence length is an outlier
            if seq_length < self.lower_bound or seq_length > self.upper_bound:
                outlier_ids.append(imdbid)
        
        # Filter out outliers from the DataFrame and features
        self.id_label_df = self.id_label_df[~self.id_label_df['IMDBid'].isin(outlier_ids)].reset_index(drop=True)
        self.text_features = {k: v for k, v in self.text_features.items() if k not in outlier_ids}
        self.audio_features = {k: v for k, v in self.audio_features.items() if k not in outlier_ids}
        self.video_features = {k: v for k, v in self.video_features.items() if k not in outlier_ids}

        return self.id_label_df, self.text_features, self.audio_features, self.video_features

    def _filter_valid_files(self):
        valid_indices = []
        missing_files = []

        for idx in range(len(self.id_label_df)):
            imdbid = self.id_label_df.iloc[idx]['IMDBid']

            # Check if the IMDBid exists in each modality's features
            if imdbid in self.text_features and imdbid in self.audio_features and imdbid in self.video_features:
                valid_indices.append(idx)
            else:
                missing_files.append({'IMDBid': imdbid})

        # Filter id_label_df to only include valid rows
        self.id_label_df = self.id_label_df.iloc[valid_indices].reset_index(drop=True)
        self.missing_files = missing_files
        
        # Update valid_indices to reflect the new indices after resetting
        valid_indices = list(range(len(self.id_label_df)))

        # Return valid indices
        return valid_indices

    def __len__(self):
        return len(self.valid_files)

    def __getitem__(self, idx):
        # Get the original index from the filtered valid files
        original_idx = self.valid_files[idx]
        imdbid = self.id_label_df.iloc[original_idx]['IMDBid']
        label = self.id_label_df.iloc[original_idx]['Label']

        # Retrieve data from the loaded features
        text_data = self.text_features.get(imdbid, torch.zeros((1024,)))
        audio_data = self.audio_features.get(imdbid, torch.zeros((1, 197, 768)))
        video_data = self.video_features.get(imdbid, torch.zeros((95, 768)))
        
        # Define label mapping
        label_map = {'red': 1, 'green': 0} 
        
        # Convert labels to tensor using label_map
        try:
            label_data = torch.tensor([label_map[label]], dtype=torch.float32)
        except KeyError as e:
            print(f"Error: Label '{e}' not found in label_map.")
            raise

        return imdbid, text_data, audio_data, video_data, label_data

In [4]:
def collate_fn(batch):
    # Unpack batch elements
    imdbids, text_data, audio_data, video_data, label_data = zip(*batch)

    # Convert lists to tensors
    text_data = torch.stack(text_data)
    audio_data = torch.stack(audio_data)

    # Padding for video data
    # Determine maximum length of video sequences in the batch
    max_length = 197

    # Pad video sequences to the maximum length
    video_data_padded = torch.stack([
        F.pad(v, (0, 0, 0, max_length - v.size(0)), "constant", 0)
        for v in video_data
    ])

    # Convert labels to tensor and ensure the shape [batch_size, 1]
    label_data = torch.stack(label_data)  # Convert list of tensors to a single tensor

    return imdbids, text_data, audio_data, video_data_padded, label_data

In [5]:
# Load the labels DataFrame
id_label_df = pd.read_excel('../../misc/MM-Trailer_dataset.xlsx')

# Define the directories
text_features_dir = '/Users/david/Documents/THESIS/DATA/TEXT'
audio_features_dir = '/Users/david/Documents/THESIS/DATA/AUDIO'
video_features_dir = '/Users/david/Documents/THESIS/DATA/VIDEO'

# Load the feature vectors from each directory
text_features = load_npy_files(text_features_dir)
audio_features = load_npy_files(audio_features_dir)
video_features = load_npy_files(video_features_dir)

print(f"Number of text feature vectors loaded: {len(text_features)}")
print(f"Number of audio feature vectors loaded: {len(audio_features)}")
print(f"Number of video feature vectors loaded: {len(video_features)}")

# Drop unnecessary columns
id_label_df = id_label_df.drop(columns=['Movie Title', 'URL'])

# Create dataset with outliers removed
full_dataset = MultimodalDataset(id_label_df, text_features, audio_features, video_features)

# perform train-test split on the filtered DataFrame
train_df, val_test_df = train_test_split(
    full_dataset.id_label_df, test_size=0.3, random_state=42, stratify=full_dataset.id_label_df['Label'])

# Further splitting remaining set into validation and test sets
val_df, test_df = train_test_split(
    val_test_df, test_size=0.5, random_state=42, stratify=val_test_df['Label'])

print("Train label distribution:", train_df['Label'].value_counts())
print("Validation label distribution:", val_df['Label'].value_counts())
print("Test label distribution:", test_df['Label'].value_counts())

# Create datasets based on these splits
train_dataset = MultimodalDataset(train_df, text_features, audio_features, video_features)
val_dataset = MultimodalDataset(val_df, text_features, audio_features, video_features)
test_dataset = MultimodalDataset(test_df, text_features, audio_features, video_features)

# Create DataLoaders
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True, num_workers=0, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=0, collate_fn=collate_fn)

# Function to calculate and print the size of each DataLoader
def print_dataloader_sizes(dataloader, name):
    total_samples = len(dataloader.dataset)  # Get the size of the dataset
    num_batches = len(dataloader)  # Get the number of batches
    print(f"{name} DataLoader: Total Samples = {total_samples}, Number of Batches = {num_batches}")

# Print sizes of each DataLoader
print_dataloader_sizes(train_dataloader, "Train")
print_dataloader_sizes(val_dataloader, "Validation")
print_dataloader_sizes(test_dataloader, "Test")


Number of text feature vectors loaded: 1353
Number of audio feature vectors loaded: 1353
Number of video feature vectors loaded: 1353
Train label distribution: Label
green    693
red      234
Name: count, dtype: int64
Validation label distribution: Label
green    149
red       50
Name: count, dtype: int64
Test label distribution: Label
green    149
red       50
Name: count, dtype: int64
Train DataLoader: Total Samples = 927, Number of Batches = 29
Validation DataLoader: Total Samples = 199, Number of Batches = 13
Test DataLoader: Total Samples = 199, Number of Batches = 13


### Inspecting the Data

### Important Functions for Indiv Modality Check

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import recall_score, precision_score, f1_score


### TEXT ONLY

In [7]:
class TextModel(nn.Module):
    def __init__(self, input_size=1024):
        super(TextModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 512)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(512, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [8]:
def train_text_model(model, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    total_loss = 0

    for imdbids, text_features, _, _, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)

            optimizer.zero_grad()

            outputs = model(text_features)

            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed


In [9]:
def evaluate_text_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for imdbids, text_features, _, _, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(text_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)

    print(f"Text Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [10]:
def test_text_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for imdbids, text_features, _, _, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(text_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"Text Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [11]:
if __name__ == "__main__":
    torch.manual_seed(42)

    # Define models for each modality
    text_model = TextModel()

    # Define the criterion
    criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))

    for name, param in text_model.named_parameters():
        if param.grad is None:
            print("model:", "No gradient for:", name)
            
    # Train each modality
    optimizer = optim.Adam(text_model.parameters(), lr=0.0001)
    num_epochs = 10

    print("\nTraining Text Model...")
    for epoch in range(num_epochs):
        print("-" * 40)
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # Ensure you have a dataloader that yields inputs and targets
        train_loss = train_text_model(text_model, train_dataloader, criterion, optimizer, device='cpu')

        # Validate step
        val_loss, precision, recall, f1 = evaluate_text_model(text_model, val_dataloader, criterion, device='cpu')

        print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Testing the model
    print("-" * 40)
    print("Testing the model on the test set...")
    test_loss, test_precision, test_recall, test_f1_score = test_text_model(text_model, test_dataloader, criterion, device='cpu')


model: No gradient for: fc1.weight
model: No gradient for: fc1.bias
model: No gradient for: fc2.weight
model: No gradient for: fc2.bias

Training Text Model...
----------------------------------------
Epoch 1/10
Text Model Evaluation - Loss: 0.9480, Recall: 0.8600, Precision: 0.3772, F1: 0.5244
Training Loss: 0.9990, Validation Loss: 0.9480
----------------------------------------
Epoch 2/10
Text Model Evaluation - Loss: 0.8899, Recall: 0.8000, Precision: 0.4706, F1: 0.5926
Training Loss: 0.9052, Validation Loss: 0.8899
----------------------------------------
Epoch 3/10
Text Model Evaluation - Loss: 0.8552, Recall: 0.6800, Precision: 0.5484, F1: 0.6071
Training Loss: 0.8376, Validation Loss: 0.8552
----------------------------------------
Epoch 4/10
Text Model Evaluation - Loss: 0.8084, Recall: 0.7600, Precision: 0.4935, F1: 0.5984
Training Loss: 0.7812, Validation Loss: 0.8084
----------------------------------------
Epoch 5/10
Text Model Evaluation - Loss: 0.8037, Recall: 0.6600, Pr

### AUDIO MODEL ONLY


In [12]:
import torch
import torch.nn as nn

class AudioModel(nn.Module):
    def __init__(self):
        super(AudioModel, self).__init__()
        self.fc1 = nn.Linear(768, 512)  # Input size should match the last token dimension
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(512, 1)    # Output layer for binary classification

    def forward(self, x):
        # Assuming x has shape [batch_size, 1, 197, 768]

        # Remove the extra dimension
        x = x.squeeze(1)  # Shape should now be [batch_size, 197, 768]

        # Select only the last token
        x = x[:, -1, :]  # Shape should now be [batch_size, 768]
        
        # First fully connected layer
        x = self.relu(self.fc1(x))  # Shape should be [batch_size, 512]

        # Final output layer
        x = self.fc2(x)  # Shape should be [batch_size, 1]

        return x


In [13]:
def train_audio_model(model, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    total_loss = 0

    for imdbids, _, audio_features, _, labels in dataloader:  # Only use text data
            
            optimizer.zero_grad()

            outputs = model(audio_features)

            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed


In [14]:
def evaluate_audio_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for imdbids, _, audio_features, _, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(audio_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)

    print(f"Audio Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [15]:
def test_audio_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for imdbids, _, audio_features, _, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(audio_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"Audio Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [16]:
if __name__ == "__main__":
    torch.manual_seed(42)

    # Define models for each modality
    audio_model = AudioModel()

    # Define the criterion
    criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))

    for name, param in audio_model.named_parameters():
        if param.grad is None:
            print("model:", "No gradient for:", name)
            
    # Train each modality
    optimizer = optim.Adam(audio_model.parameters(), lr=0.0001)
    num_epochs = 10

    print("\nTraining on Audio Model...")
    for epoch in range(num_epochs):
        print("-" * 40)
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # Ensure you have a dataloader that yields inputs and targets
        train_loss = train_audio_model(audio_model, train_dataloader, criterion, optimizer, device='cpu')

        # Validate step
        val_loss, precision, recall, f1 = evaluate_audio_model(audio_model, val_dataloader, criterion, device='cpu')

        print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Testing the model
    print("-" * 40)
    print("Testing the model on the test set...")
    test_loss, test_precision, test_recall, test_f1_score = test_audio_model(audio_model, test_dataloader, criterion, device='cpu')


model: No gradient for: fc1.weight
model: No gradient for: fc1.bias
model: No gradient for: fc2.weight
model: No gradient for: fc2.bias

Training on Audio Model...
----------------------------------------
Epoch 1/10
Audio Model Evaluation - Loss: 0.9953, Recall: 0.5400, Precision: 0.5192, F1: 0.5294
Training Loss: 1.0183, Validation Loss: 0.9953
----------------------------------------
Epoch 2/10
Audio Model Evaluation - Loss: 0.9441, Recall: 0.7600, Precision: 0.4130, F1: 0.5352
Training Loss: 0.9782, Validation Loss: 0.9441
----------------------------------------
Epoch 3/10
Audio Model Evaluation - Loss: 0.8988, Recall: 0.6000, Precision: 0.5000, F1: 0.5455
Training Loss: 0.9455, Validation Loss: 0.8988
----------------------------------------
Epoch 4/10
Audio Model Evaluation - Loss: 0.8898, Recall: 0.7600, Precision: 0.4471, F1: 0.5630
Training Loss: 0.9155, Validation Loss: 0.8898
----------------------------------------
Epoch 5/10
Audio Model Evaluation - Loss: 0.8739, Recall: 0

### VIDEO ONLY

In [17]:
import torch
import torch.nn as nn

class VideoModel(nn.Module):
    def __init__(self, dropout_rate=0.2):
        super(VideoModel, self).__init__()
        
        # Define the layers in a simpler structure
        self.fc1 = nn.Linear(768, 256)  # First hidden layer
        self.fc2 = nn.Linear(256, 1)    # Output layer
        self.dropout = nn.Dropout(dropout_rate)  # Dropout layer
        self.relu = nn.ReLU()

    def forward(self, x):
        x = x.mean(dim=1)  # Global Average Pooling over sequence dimension
        x = self.relu(self.fc1(x))  # Pass through the first layer with ReLU activation
        x = self.dropout(x)  # Apply dropout after the first layer
        x = self.fc2(x)  # Output layer
        return x  # Final output shape: [batch_size, 1]


In [18]:
def train_video_model(model, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    total_loss = 0

    for _, _, _, video_features, labels in dataloader:  # Only use video data
            
            optimizer.zero_grad()
            
            outputs = model(video_features)

            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed


In [19]:
def evaluate_video_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, _, _, video_features, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(video_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)

    print(f"Audio Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [20]:
def test_video_model(model, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, _, _, video_features, labels in dataloader:  # Only use text data
            #text_features, labels = text_features.to(device), labels.to(device).view(-1)
            outputs = model(video_features)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            outputs = torch.sigmoid(outputs)

            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"Video Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test F1: {f1:.4f}")

    return avg_loss, precision, recall, f1


In [21]:
if __name__ == "__main__":
    torch.manual_seed(42)

    # Define models for each modality
    video_model = VideoModel()

    # Define the criterion
    criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))

    for name, param in video_model.named_parameters():
        if param.grad is None:
            print("model:", "No gradient for:", name)
            
    # Train each modality
    optimizer = optim.Adam(video_model.parameters(), lr=0.0001)
    num_epochs = 10

    print("\nTraining on Video Model...")
    for epoch in range(num_epochs):
        print("-" * 40)
        print(f"Epoch {epoch + 1}/{num_epochs}")

        # Ensure you have a dataloader that yields inputs and targets
        train_loss = train_video_model(video_model, train_dataloader, criterion, optimizer, device='cpu')

        # Validate step
        val_loss, precision, recall, f1 = evaluate_video_model(video_model, val_dataloader, criterion, device='cpu')

        print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Testing the model
    print("-" * 40)
    print("Testing the model on the test set...")
    test_loss, test_precision, test_recall, test_f1_score = test_video_model(video_model, test_dataloader, criterion, device='cpu')


model: No gradient for: fc1.weight
model: No gradient for: fc1.bias
model: No gradient for: fc2.weight
model: No gradient for: fc2.bias

Training on Video Model...
----------------------------------------
Epoch 1/10
Audio Model Evaluation - Loss: 1.0321, Recall: 0.1400, Precision: 0.8750, F1: 0.2414
Training Loss: 1.0299, Validation Loss: 1.0321
----------------------------------------
Epoch 2/10
Audio Model Evaluation - Loss: 1.0099, Recall: 0.7200, Precision: 0.6316, F1: 0.6729
Training Loss: 1.0201, Validation Loss: 1.0099
----------------------------------------
Epoch 3/10
Audio Model Evaluation - Loss: 1.0098, Recall: 0.9000, Precision: 0.5488, F1: 0.6818
Training Loss: 1.0097, Validation Loss: 1.0098
----------------------------------------
Epoch 4/10
Audio Model Evaluation - Loss: 0.9745, Recall: 0.9000, Precision: 0.4839, F1: 0.6294
Training Loss: 0.9971, Validation Loss: 0.9745
----------------------------------------
Epoch 5/10
Audio Model Evaluation - Loss: 0.9370, Recall: 0

### Bimodal Fusion Classification

#### Bi-Concat

In [22]:
import torch
import torch.nn as nn

class BiConcatFusionModule(nn.Module):
    def __init__(self, common_dim=768, hidden_dim=768):
        super(BiConcatFusionModule, self).__init__()
        
        self.common_dim = common_dim
        self.hidden_dim = hidden_dim
        
        # Define a fully connected layer to project modalities to common dimension
        self.fc = nn.Linear(common_dim * 2, hidden_dim)  # Concatenated output of two modalities
    
    def project_to_common_dim(self, x, target_dim):
        """Project input to the common dimension if required."""
        if x.size(-1) != target_dim:
            return nn.Linear(x.size(-1), target_dim)(x)
        return x
    
    def forward(self, modality1, modality2):
        """
        Accepts two modalities and fuses them.
        
        Parameters:
        modality1 (torch.Tensor): First modality with shape (batch_size, seq_len, features).
        modality2 (torch.Tensor): Second modality with shape (batch_size, seq_len, features).
        
        Returns:
        torch.Tensor: Fused representation.
        """
        # Ensure modalities are projected to the common dimension (768)
        modality1 = self.project_to_common_dim(modality1, self.common_dim)
        modality2 = self.project_to_common_dim(modality2, self.common_dim)
        
        # Find the maximum sequence length to handle varying sequence lengths
        max_seq_len = max(modality1.size(1), modality2.size(1))

        # Pad modalities to the same sequence length
        if modality1.size(1) < max_seq_len:
            pad_size = max_seq_len - modality1.size(1)
            modality1 = torch.nn.functional.pad(modality1, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality2.size(1) < max_seq_len:
            pad_size = max_seq_len - modality2.size(1)
            modality2 = torch.nn.functional.pad(modality2, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        # Concatenate the modalities along the feature dimension
        combined_output = torch.cat((modality1, modality2), dim=-1)  # Shape: (batch_size, max_seq_len, common_dim * 2)

        # Apply fully connected layer to get the fused output
        fused_output = self.fc(combined_output)  # Shape: (batch_size, max_seq_len, hidden_dim)
        
        return combined_output



In [23]:
# Example usage:

# Assume we have the following three modality inputs:
modality1 = torch.randn(32, 1024)         # Shape: (batch_size, 1, 1024)
modality2 = torch.randn(32, 1, 197, 768)       # Shape: (batch_size, seq_len, 768)
modality3 = torch.randn(32, 50, 768)        # Shape: (batch_size, frames, 768)

modality1 = modality1.unsqueeze(1)
modality2 = modality2.squeeze(1)

print("Text: ", modality1.shape)
print("Audio: ", modality2.shape)
print("Video: ", modality3.shape)

# Initialize the fusion module
fusion_module = BiConcatFusionModule()

# Example fusion: Fuse modality2 and modality3
fused_output = fusion_module(modality2, modality3)
print(f"Fused Output Shape: {fused_output.shape}")

# Example fusion: Fuse modality1 and modality2
fused_output = fusion_module(modality1, modality2)
print(f"Fused Output Shape: {fused_output.shape}")

# Example fusion: Fuse modality1 and modality2
fused_output = fusion_module(modality1, modality3)
print(f"Fused Output Shape: {fused_output.shape}")

Text:  torch.Size([32, 1, 1024])
Audio:  torch.Size([32, 197, 768])
Video:  torch.Size([32, 50, 768])
Fused Output Shape: torch.Size([32, 197, 1536])
Fused Output Shape: torch.Size([32, 197, 1536])
Fused Output Shape: torch.Size([32, 50, 1536])


In [24]:
for _, text_features, audio_features, _, labels in train_dataloader: 

    print(text_features.unsqueeze(1).shape)
    print(audio_features.squeeze(1).shape)
    
    # Create an instance of the FusionModule
    fusion_module = BiConcatFusionModule()

    text_features = text_features.unsqueeze(1)
    audio_features = audio_features.squeeze(1)

    # Forward pass through the fusion module
    combined_output = fusion_module(audio_features, text_features)

    print('Fused AlphaBeta: ', combined_output.shape)

torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 1536])
torch.Size

In [25]:
import torch
import torch.nn as nn

class BiConcatClassifier(nn.Module):
    def __init__(self, input_dim=1536, hidden_dim=768, dropout_prob=0.2):
        super(BiConcatClassifier, self).__init__()
        
        # First hidden layer
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_prob)

        # Second hidden layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        # Third hidden layer (slightly smaller)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim // 2)
        
        # Output layer
        self.fc4 = nn.Linear(hidden_dim // 2, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc4(x)
        return x


In [None]:
def train_biconcat_model(model, classifier, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    classifier.train()
    total_loss = 0

    for _, text_features, audio_features, video_features, labels in dataloader:  
            
            optimizer.zero_grad()

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            combined_output = fusion_module(text_features, audio_features)

            pooled_output = torch.mean(combined_output, dim=1) 
            
            logits = classifier(pooled_output)

            loss = criterion(logits, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed

In [27]:
def evaluate_biconcat_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()  # Set the classifier to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Get the combined output from the fusion module
            combined_output = model(text_features, audio_features)  

            pooled_output = torch.mean(combined_output, dim=1) 
            
            # Get logits from the classifier
            logits = classifier(pooled_output)

            # Calculate loss
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to get probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    avg_loss = total_loss / len(dataloader)

    print(f"Alpha-Beta Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, Accuracy: {accuracy:.4f}. F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [28]:
def test_biconcat_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:
            
            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Forward pass through the model
            combined_output = model(text_features, audio_features)

            pooled_output = torch.mean(combined_output, dim=1) 

            logits = classifier(pooled_output)
            
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to obtain probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"AlphaBeta Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test Accuracy: {accuracy:.4f},Test F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [29]:
if __name__ == "__main__":
     torch.manual_seed(42)

     # Initialize models
     fusion_model = BiConcatFusionModule()
     classifier = BiConcatClassifier()

     # Define loss function and optimizer
     criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))
     optimizer = optim.Adam(
          list(classifier.parameters()),
         lr=0.0001
        )

     # Training loop
     num_epochs = 15
     print("\nTraining Biconcat Model...")
    
     for epoch in range(num_epochs):
         print("-" * 40)
         print(f"Epoch {epoch + 1}/{num_epochs}")

         # Train step
         train_loss = train_biconcat_model(
             fusion_model, classifier, train_dataloader, 
             criterion, optimizer, device='cpu'
         )

         # Validation step
         val_loss, precision, recall, accuracy, f1 = evaluate_biconcat_model(
             fusion_model, classifier, val_dataloader, 
             criterion, device='cpu'
         )

         print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

     # Final testing
     print("-" * 40)
     print("Testing the model on the test set...")
     test_loss, test_precision, test_recall, test_accuracy, test_f1_score = test_biconcat_model(
         fusion_model, classifier, test_dataloader, 
         criterion, device='cpu'
     )

     print(f"Test Results:")
     print(f"Loss: {test_loss:.4f}")
     print(f"Precision: {test_precision:.4f}")
     print(f"Recall: {test_recall:.4f}")
     print(f"Accuracy: {test_accuracy:.4f}")
     print(f"F1 Score: {test_f1_score:.4f}")


Training Biconcat Model...
----------------------------------------
Epoch 1/15
Alpha-Beta Model Evaluation - Loss: 1.0188, Recall: 0.6600, Precision: 0.3882, Accuracy: 0.6533. F1: 0.4889
Training Loss: 1.0286, Validation Loss: 1.0188
----------------------------------------
Epoch 2/15
Alpha-Beta Model Evaluation - Loss: 0.9601, Recall: 0.7800, Precision: 0.3900, Accuracy: 0.6382. F1: 0.5200
Training Loss: 1.0042, Validation Loss: 0.9601
----------------------------------------
Epoch 3/15
Alpha-Beta Model Evaluation - Loss: 0.8819, Recall: 0.5600, Precision: 0.4828, Accuracy: 0.7387. F1: 0.5185
Training Loss: 0.9452, Validation Loss: 0.8819
----------------------------------------
Epoch 4/15
Alpha-Beta Model Evaluation - Loss: 0.8597, Recall: 0.5600, Precision: 0.5385, Accuracy: 0.7688. F1: 0.5490
Training Loss: 0.8861, Validation Loss: 0.8597
----------------------------------------
Epoch 5/15
Alpha-Beta Model Evaluation - Loss: 0.8734, Recall: 0.6000, Precision: 0.4839, Accuracy: 0.7

### Bi-Average

In [30]:
import torch
import torch.nn as nn

class BiAverageFusionModule(nn.Module):
    def __init__(self, common_dim=768, hidden_dim=768):
        super(BiAverageFusionModule, self).__init__()
        
        self.common_dim = common_dim
        self.hidden_dim = hidden_dim
        
        # Define a fully connected layer to project modalities to common dimension
        self.fc = nn.Linear(common_dim, hidden_dim)  # After averaging, only one modality's size

    def project_to_common_dim(self, x, target_dim):
        """Project input to the common dimension if required."""
        if x.size(-1) != target_dim:
            return nn.Linear(x.size(-1), target_dim)(x)
        return x
    
    def forward(self, modality1, modality2):
        """
        Accepts two modalities and fuses them.
        
        Parameters:
        modality1 (torch.Tensor): First modality with shape (batch_size, seq_len, features).
        modality2 (torch.Tensor): Second modality with shape (batch_size, seq_len, features).
        
        Returns:
        torch.Tensor: Fused representation.
        """
        
        # Ensure modalities are projected to the common dimension (768)
        modality1 = self.project_to_common_dim(modality1, self.common_dim)
        modality2 = self.project_to_common_dim(modality2, self.common_dim)
        
        # Find the maximum sequence length to handle varying sequence lengths
        max_seq_len = max(modality1.size(1), modality2.size(1))

        # Pad modalities to the same sequence length
        if modality1.size(1) < max_seq_len:
            pad_size = max_seq_len - modality1.size(1)
            modality1 = torch.nn.functional.pad(modality1, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality2.size(1) < max_seq_len:
            pad_size = max_seq_len - modality2.size(1)
            modality2 = torch.nn.functional.pad(modality2, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        # Average the modalities instead of concatenating them
        averaged_output = (modality1 + modality2) / 2  # Shape: (batch_size, max_seq_len, common_dim)

        # Apply fully connected layer to get the fused output
        fused_output = self.fc(averaged_output)  # Shape: (batch_size, max_seq_len, hidden_dim)
        
        return fused_output


In [31]:
# Example usage:

# Assume we have the following three modality inputs:
modality1 = torch.randn(32, 1024)         # Shape: (batch_size, 1, 1024)
modality2 = torch.randn(32, 1, 197, 768)       # Shape: (batch_size, seq_len, 768)
modality3 = torch.randn(32, 50, 768)        # Shape: (batch_size, frames, 768)

modality1 = modality1.unsqueeze(1)
modality2 = modality2.squeeze(1)

print("Text: ", modality1.shape)
print("Audio: ", modality2.shape)
print("Video: ", modality3.shape)

# Initialize the fusion module
fusion_module = BiAverageFusionModule()

# Example fusion: Fuse modality2 and modality3
fused_output = fusion_module(modality2, modality3)
print(f"Fused Output Shape: {fused_output.shape}")

# Example fusion: Fuse modality1 and modality2
fused_output = fusion_module(modality1, modality2)
print(f"Fused Output Shape: {fused_output.shape}")

# Example fusion: Fuse modality1 and modality2
fused_output = fusion_module(modality1, modality3)
print(f"Fused Output Shape: {fused_output.shape}")

Text:  torch.Size([32, 1, 1024])
Audio:  torch.Size([32, 197, 768])
Video:  torch.Size([32, 50, 768])
Fused Output Shape: torch.Size([32, 197, 768])
Fused Output Shape: torch.Size([32, 197, 768])
Fused Output Shape: torch.Size([32, 50, 768])


In [32]:
for _, text_features, audio_features, _, labels in train_dataloader: 

    print(text_features.unsqueeze(1).shape)
    print(audio_features.squeeze(1).shape)
    
    # Create an instance of the FusionModule
    fusion_module = BiAverageFusionModule()

    text_features = text_features.unsqueeze(1)
    audio_features = audio_features.squeeze(1)

    # Forward pass through the fusion module
    combined_output = fusion_module(audio_features, text_features)

    print('Fused AlphaBeta: ', combined_output.shape)

torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1

In [40]:
import torch
import torch.nn as nn

class BiAverageClassifier(nn.Module):
    def __init__(self, input_dim=768, hidden_dim=382, dropout_prob=0.2):
        super(BiAverageClassifier, self).__init__()
        
        # First hidden layer
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_prob)

        # Second hidden layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        # Third hidden layer (slightly smaller)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim // 2)
        
        # Output layer
        self.fc4 = nn.Linear(hidden_dim // 2, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc4(x)
        return x


In [41]:
def train_biavg_model(model, classifier, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    classifier.train()
    total_loss = 0

    for _, text_features, audio_features, video_features, labels in dataloader:  
            
            optimizer.zero_grad()

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            combined_output = fusion_module(text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 
            
            logits = classifier(pooled_output)

            loss = criterion(logits, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed

In [42]:
def evaluate_biavg_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()  # Set the classifier to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Get the combined output from the fusion module
            combined_output = model(text_features, video_features)  

            pooled_output = torch.mean(combined_output, dim=1) 
            
            # Get logits from the classifier
            logits = classifier(pooled_output)

            # Calculate loss
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to get probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    avg_loss = total_loss / len(dataloader)

    print(f"Alpha-Beta Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, Accuracy: {accuracy:.4f}. F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [43]:
def test_biavg_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:
            
            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Forward pass through the model
            combined_output = model(text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 

            logits = classifier(pooled_output)
            
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to obtain probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"AlphaBeta Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test Accuracy: {accuracy:.4f},Test F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [46]:
if __name__ == "__main__":
     torch.manual_seed(42)

     # Initialize models
     fusion_model = BiAverageFusionModule()
     classifier = BiAverageClassifier()

     # Define loss function and optimizer
     criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))
     optimizer = optim.Adam(
          list(classifier.parameters()),
         lr=0.001
        )

     # Training loop
     num_epochs = 10
     print("\nTraining Biaverage Model...")
    
     for epoch in range(num_epochs):
         print("-" * 40)
         print(f"Epoch {epoch + 1}/{num_epochs}")

         # Train step
         train_loss = train_biavg_model(
             fusion_model, classifier, train_dataloader, 
             criterion, optimizer, device='cpu'
         )

         # Validation step
         val_loss, precision, recall, accuracy, f1 = evaluate_biavg_model(
             fusion_model, classifier, val_dataloader, 
             criterion, device='cpu'
         )

         print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

     # Final testing
     print("-" * 40)
     print("Testing the model on the test set...")
     test_loss, test_precision, test_recall, test_accuracy, test_f1_score = test_biavg_model(
         fusion_model, classifier, test_dataloader, 
         criterion, device='cpu'
     )

     print(f"Test Results:")
     print(f"Loss: {test_loss:.4f}")
     print(f"Precision: {test_precision:.4f}")
     print(f"Recall: {test_recall:.4f}")
     print(f"Accuracy: {test_accuracy:.4f}")
     print(f"F1 Score: {test_f1_score:.4f}")


Training Biaverage Model...
----------------------------------------
Epoch 1/10


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Alpha-Beta Model Evaluation - Loss: 1.0244, Recall: 0.0000, Precision: 0.0000, Accuracy: 0.7487. F1: 0.0000
Training Loss: 1.0374, Validation Loss: 1.0244
----------------------------------------
Epoch 2/10
Alpha-Beta Model Evaluation - Loss: 1.0328, Recall: 0.0400, Precision: 0.5000, Accuracy: 0.7487. F1: 0.0741
Training Loss: 1.0306, Validation Loss: 1.0328
----------------------------------------
Epoch 3/10
Alpha-Beta Model Evaluation - Loss: 1.0350, Recall: 0.1000, Precision: 0.3333, Accuracy: 0.7236. F1: 0.1538
Training Loss: 0.9886, Validation Loss: 1.0350
----------------------------------------
Epoch 4/10
Alpha-Beta Model Evaluation - Loss: 1.0281, Recall: 0.4000, Precision: 0.2597, Accuracy: 0.5628. F1: 0.3150
Training Loss: 0.8787, Validation Loss: 1.0281
----------------------------------------
Epoch 5/10
Alpha-Beta Model Evaluation - Loss: 1.0559, Recall: 0.4400, Precision: 0.2268, Accuracy: 0.4824. F1: 0.2993
Training Loss: 0.7576, Validation Loss: 1.0559
-----------------

### Trimodal

### Tri-Concat

In [67]:
import torch
import torch.nn as nn

class TriConcatFusionModule(nn.Module):
    def __init__(self, common_dim=768, hidden_dim=768):
        super(TriConcatFusionModule, self).__init__()
        
        self.common_dim = common_dim
        self.hidden_dim = hidden_dim
        
        # Define a fully connected layer to project modalities to common dimension
        self.fc = nn.Linear(common_dim * 3, hidden_dim)  # Concatenated output of three modalities
    
    def project_to_common_dim(self, x, target_dim):
        """Project input to the common dimension if required."""
        if x.size(-1) != target_dim:
            return nn.Linear(x.size(-1), target_dim)(x)
        return x
    
    def forward(self, modality1, modality2, modality3):
        """
        Accepts three modalities and fuses them.
        
        Parameters:
        modality1 (torch.Tensor): First modality with shape (batch_size, seq_len, features).
        modality2 (torch.Tensor): Second modality with shape (batch_size, seq_len, features).
        modality3 (torch.Tensor): Third modality with shape (batch_size, seq_len, features).
        
        Returns:
        torch.Tensor: Fused representation.
        """
        
        # Ensure modalities are projected to the common dimension (768)
        modality1 = self.project_to_common_dim(modality1, self.common_dim)
        modality2 = self.project_to_common_dim(modality2, self.common_dim)
        modality3 = self.project_to_common_dim(modality3, self.common_dim)
        
        # Find the maximum sequence length to handle varying sequence lengths
        max_seq_len = max(modality1.size(1), modality2.size(1), modality3.size(1))

        # Pad modalities to the same sequence length
        if modality1.size(1) < max_seq_len:
            pad_size = max_seq_len - modality1.size(1)
            modality1 = torch.nn.functional.pad(modality1, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality2.size(1) < max_seq_len:
            pad_size = max_seq_len - modality2.size(1)
            modality2 = torch.nn.functional.pad(modality2, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality3.size(1) < max_seq_len:
            pad_size = max_seq_len - modality3.size(1)
            modality3 = torch.nn.functional.pad(modality3, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        # Concatenate the modalities along the feature dimension
        combined_output = torch.cat((modality1, modality2, modality3), dim=-1)  # Shape: (batch_size, max_seq_len, common_dim * 3)

        # Apply fully connected layer to get the fused output
        fused_output = self.fc(combined_output)  # Shape: (batch_size, max_seq_len, hidden_dim)
        
        return combined_output


In [68]:
# Example input tensors for modality1 (e.g., text), modality2 (e.g., audio), and modality3 (e.g., video)
batch_size = 32
seq_len_1 = 10  # Length of sequence for modality 1
seq_len_2 = 12  # Length of sequence for modality 2
seq_len_3 = 8   # Length of sequence for modality 3
common_dim = 768  # Common feature dimension for all modalities

# Creating random example tensors for the modalities
modality1 = torch.randn(batch_size, seq_len_1, common_dim)  # Shape: (batch_size, seq_len_1, common_dim)
modality2 = torch.randn(batch_size, seq_len_2, common_dim)  # Shape: (batch_size, seq_len_2, common_dim)
modality3 = torch.randn(batch_size, seq_len_3, common_dim)  # Shape: (batch_size, seq_len_3, common_dim)

# Instantiate the fusion module
fusion_module = TriConcatFusionModule()

# Forward pass through the fusion module
fused_output = fusion_module(modality1, modality2, modality3)

# Print the shape of the fused output
print(f"Fused Output Shape: {fused_output.shape}")


Fused Output Shape: torch.Size([32, 12, 2304])


In [69]:
for _, text_features, audio_features, video_features, labels in train_dataloader: 

    print(text_features.unsqueeze(1).shape)
    print(audio_features.squeeze(1).shape)
    
    # Create an instance of the FusionModule
    fusion_module = TriConcatFusionModule()

    text_features = text_features.unsqueeze(1)
    audio_features = audio_features.squeeze(1)

    # Forward pass through the fusion module
    combined_output = fusion_module(audio_features, text_features, video_features)

    print('Fused AlphaBeta: ', combined_output.shape)

torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 2304])
torch.Size

In [85]:
import torch
import torch.nn as nn

class TriConcatClassifier(nn.Module):
    def __init__(self, input_dim=2304, hidden_dim=1152, dropout_prob=0.2):
        super(TriConcatClassifier, self).__init__()
        
        # First hidden layer
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_prob)

        # Second hidden layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        # Third hidden layer (slightly smaller)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim // 2)
        
        # Output layer
        self.fc4 = nn.Linear(hidden_dim // 2, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc4(x)
        return x

In [86]:
def train_triconcat_model(model, classifier, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    classifier.train()
    total_loss = 0

    for _, text_features, audio_features, video_features, labels in dataloader:  
            
            optimizer.zero_grad()

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            combined_output = fusion_module(audio_features, text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 
            
            logits = classifier(pooled_output)

            loss = criterion(logits, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed

In [87]:
def evaluate_triconcat_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()  # Set the classifier to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Get the combined output from the fusion module
            combined_output = fusion_module(audio_features, text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 
            
            # Get logits from the classifier
            logits = classifier(pooled_output)

            # Calculate loss
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to get probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    avg_loss = total_loss / len(dataloader)

    print(f"Alpha-Beta Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, Accuracy: {accuracy:.4f}. F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [88]:
def test_triconcat_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:
            
            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Get the combined output from the fusion module
            combined_output = fusion_module(audio_features, text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 

            logits = classifier(pooled_output)
            
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to obtain probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"AlphaBeta Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test Accuracy: {accuracy:.4f},Test F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [84]:
if __name__ == "__main__":
     torch.manual_seed(42)

     # Initialize models
     fusion_model = TriConcatFusionModule()
     classifier = TriConcatClassifier()

     # Define loss function and optimizer
     criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))
     optimizer = optim.Adam(
          list(classifier.parameters()),
         lr=0.001
        )

     # Training loop
     num_epochs = 10
     print("\nTraining Biaverage Model...")
    
     for epoch in range(num_epochs):
         print("-" * 40)
         print(f"Epoch {epoch + 1}/{num_epochs}")

         # Train step
         train_loss = train_triconcat_model(
             fusion_model, classifier, train_dataloader, 
             criterion, optimizer, device='cpu'
         )

         # Validation step
         val_loss, precision, recall, accuracy, f1 = evaluate_triconcat_model(
             fusion_model, classifier, val_dataloader, 
             criterion, device='cpu'
         )

         print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

     # Final testing
     print("-" * 40)
     print("Testing the model on the test set...")
     test_loss, test_precision, test_recall, test_accuracy, test_f1_score = test_triconcat_model(
         fusion_model, classifier, test_dataloader, 
         criterion, device='cpu'
     )

     print(f"Test Results:")
     print(f"Loss: {test_loss:.4f}")
     print(f"Precision: {test_precision:.4f}")
     print(f"Recall: {test_recall:.4f}")
     print(f"Accuracy: {test_accuracy:.4f}")
     print(f"F1 Score: {test_f1_score:.4f}")


Training Biaverage Model...
----------------------------------------
Epoch 1/10
Alpha-Beta Model Evaluation - Loss: 0.7425, Recall: 0.7800, Precision: 0.5065, Accuracy: 0.7538. F1: 0.6142
Training Loss: 0.9662, Validation Loss: 0.7425
----------------------------------------
Epoch 2/10
Alpha-Beta Model Evaluation - Loss: 0.5955, Recall: 0.6800, Precision: 0.7727, Accuracy: 0.8693. F1: 0.7234
Training Loss: 0.7961, Validation Loss: 0.5955
----------------------------------------
Epoch 3/10
Alpha-Beta Model Evaluation - Loss: 0.6330, Recall: 0.9400, Precision: 0.4563, Accuracy: 0.7035. F1: 0.6144
Training Loss: 0.7048, Validation Loss: 0.6330
----------------------------------------
Epoch 4/10
Alpha-Beta Model Evaluation - Loss: 0.5815, Recall: 0.9400, Precision: 0.5000, Accuracy: 0.7487. F1: 0.6528
Training Loss: 0.6464, Validation Loss: 0.5815
----------------------------------------
Epoch 5/10
Alpha-Beta Model Evaluation - Loss: 0.7178, Recall: 0.6000, Precision: 0.9091, Accuracy: 0.

### Tri-Average

In [91]:
import torch
import torch.nn as nn

class TriAverageFusionModule(nn.Module):
    def __init__(self, common_dim=768, hidden_dim=768):
        super(TriAverageFusionModule, self).__init__()
        
        self.common_dim = common_dim
        self.hidden_dim = hidden_dim
        
        # Define a fully connected layer to project modalities to the hidden dimension
        self.fc = nn.Linear(common_dim, hidden_dim)  # Output will be projected to hidden_dim
    
    def project_to_common_dim(self, x, target_dim):
        """Project input to the common dimension if required."""
        if x.size(-1) != target_dim:
            return nn.Linear(x.size(-1), target_dim)(x)
        return x
    
    def forward(self, modality1, modality2, modality3):
        """
        Accepts three modalities and fuses them by averaging.
        
        Parameters:
        modality1 (torch.Tensor): First modality with shape (batch_size, seq_len, features).
        modality2 (torch.Tensor): Second modality with shape (batch_size, seq_len, features).
        modality3 (torch.Tensor): Third modality with shape (batch_size, seq_len, features).
        
        Returns:
        torch.Tensor: Fused representation after averaging and projection.
        """
        
        # Ensure modalities are projected to the common dimension (768)
        modality1 = self.project_to_common_dim(modality1, self.common_dim)
        modality2 = self.project_to_common_dim(modality2, self.common_dim)
        modality3 = self.project_to_common_dim(modality3, self.common_dim)
        
        # Find the maximum sequence length to handle varying sequence lengths
        max_seq_len = max(modality1.size(1), modality2.size(1), modality3.size(1))
        
        # Pad modalities to the same sequence length
        if modality1.size(1) < max_seq_len:
            pad_size = max_seq_len - modality1.size(1)
            modality1 = torch.nn.functional.pad(modality1, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality2.size(1) < max_seq_len:
            pad_size = max_seq_len - modality2.size(1)
            modality2 = torch.nn.functional.pad(modality2, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        if modality3.size(1) < max_seq_len:
            pad_size = max_seq_len - modality3.size(1)
            modality3 = torch.nn.functional.pad(modality3, (0, 0, 0, pad_size))  # Pad on the sequence dimension
        
        # Average the three modalities along the feature dimension
        combined_output = (modality1 + modality2 + modality3) / 3  # Shape: (batch_size, max_seq_len, common_dim)
        
        # Apply fully connected layer to get the fused output
        fused_output = self.fc(combined_output)  # Shape: (batch_size, max_seq_len, hidden_dim)
        
        return fused_output


In [92]:
# Example input tensors for modality1 (e.g., text), modality2 (e.g., audio), and modality3 (e.g., video)
batch_size = 32
seq_len_1 = 10  # Length of sequence for modality 1
seq_len_2 = 12  # Length of sequence for modality 2
seq_len_3 = 8   # Length of sequence for modality 3
common_dim = 768  # Common feature dimension for all modalities

# Creating random example tensors for the modalities
modality1 = torch.randn(batch_size, seq_len_1, common_dim)  # Shape: (batch_size, seq_len_1, common_dim)
modality2 = torch.randn(batch_size, seq_len_2, common_dim)  # Shape: (batch_size, seq_len_2, common_dim)
modality3 = torch.randn(batch_size, seq_len_3, common_dim)  # Shape: (batch_size, seq_len_3, common_dim)

# Instantiate the fusion module
fusion_module = TriAverageFusionModule()

# Forward pass through the fusion module
fused_output = fusion_module(modality1, modality2, modality3)

# Print the shape of the fused output
print(f"Fused Output Shape: {fused_output.shape}")


Fused Output Shape: torch.Size([32, 12, 768])


In [93]:
for _, text_features, audio_features, video_features, labels in train_dataloader: 

    print(text_features.unsqueeze(1).shape)
    print(audio_features.squeeze(1).shape)
    
    # Create an instance of the FusionModule
    fusion_module = TriAverageFusionModule()

    text_features = text_features.unsqueeze(1)
    audio_features = audio_features.squeeze(1)

    # Forward pass through the fusion module
    combined_output = fusion_module(audio_features, text_features, video_features)

    print('Fused AlphaBeta: ', combined_output.shape)

torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1024])
torch.Size([32, 197, 768])
Fused AlphaBeta:  torch.Size([32, 197, 768])
torch.Size([32, 1, 1

In [107]:
import torch
import torch.nn as nn

class TriAverageClassifier(nn.Module):
    def __init__(self, input_dim=768, hidden_dim=512, dropout_prob=0.2):
        super(TriAverageClassifier, self).__init__()
        
        # First hidden layer
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_prob)

        # Second hidden layer
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)

        # Third hidden layer (slightly smaller)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim // 2)
        
        # Output layer
        self.fc4 = nn.Linear(hidden_dim // 2, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)

        x = self.fc4(x)
        return x
    

In [108]:
def train_triavg_model(model, classifier, dataloader, criterion, optimizer, device='cpu'):
    model.train()
    classifier.train()
    total_loss = 0

    for _, text_features, audio_features, video_features, labels in dataloader:  
            
            optimizer.zero_grad()

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            combined_output = fusion_module(audio_features, text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 
            
            logits = classifier(pooled_output)

            loss = criterion(logits, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
        
    return avg_loss  # Optionally, return the last average loss if needed

In [109]:
def evaluate_triavg_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()  # Set the classifier to evaluation mode
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:

            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Get the combined output from the fusion module
            combined_output = model(audio_features, text_features, video_features)  

            pooled_output = torch.mean(combined_output, dim=1) 
            
            # Get logits from the classifier
            logits = classifier(pooled_output)

            # Calculate loss
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to get probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    avg_loss = total_loss / len(dataloader)

    print(f"AlphaBetaGamma Model Evaluation - Loss: {avg_loss:.4f}, Recall: {recall:.4f}, Precision: {precision:.4f}, Accuracy: {accuracy:.4f}. F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [110]:
def test_triavg_model(model, classifier, dataloader, criterion, device='cpu'):
    model.eval()  # Set the model to evaluation mode
    classifier.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for _, text_features, audio_features, video_features, labels in dataloader:
            
            text_features = text_features.unsqueeze(1)
            audio_features = audio_features.squeeze(1)
            video_features = video_features

            # Forward pass through the model
            combined_output = model(audio_features, text_features, video_features)

            pooled_output = torch.mean(combined_output, dim=1) 

            logits = classifier(pooled_output)
            
            loss = criterion(logits, labels)
            total_loss += loss.item()

            # Apply sigmoid to obtain probabilities
            outputs = torch.sigmoid(logits)

            # Convert probabilities to binary predictions
            all_preds.extend((outputs > 0.5).int().tolist())
            all_labels.extend(labels.int().tolist())

    # Calculate metrics
    recall = recall_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    accuracy = accuracy_score(all_labels, all_preds)  # Calculate accuracy
    f1 = f1_score(all_labels, all_preds)
    avg_loss = total_loss / len(dataloader)
    
    print(f"AlphaBetaGamma Model Test - Loss: {avg_loss:.4f}, Test Recall: {recall:.4f}, Test Precision: {precision:.4f}, Test Accuracy: {accuracy:.4f},Test F1: {f1:.4f}")

    return avg_loss, precision, recall, accuracy, f1

In [111]:
if __name__ == "__main__":
     torch.manual_seed(42)

     # Initialize models
     fusion_model = TriAverageFusionModule()
     classifier = TriAverageClassifier()

     # Define loss function and optimizer
     criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(2.94))
     optimizer = optim.Adam(
          list(classifier.parameters()),
         lr=0.001
        )

     # Training loop
     num_epochs = 10
     print("\nTraining TriAverage Model...")
    
     for epoch in range(num_epochs):
         print("-" * 40)
         print(f"Epoch {epoch + 1}/{num_epochs}")

         # Train step
         train_loss = train_triavg_model(
             fusion_model, classifier, train_dataloader, 
             criterion, optimizer, device='cpu'
         )

         # Validation step
         val_loss, precision, recall, accuracy, f1 = evaluate_triavg_model(
             fusion_model, classifier, val_dataloader, 
             criterion, device='cpu'
         )

         print(f"Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

     # Final testing
     print("-" * 40)
     print("Testing the model on the test set...")
     test_loss, test_precision, test_recall, test_accuracy, test_f1_score = test_triavg_model(
         fusion_model, classifier, test_dataloader, 
         criterion, device='cpu'
     )

     print(f"Test Results:")
     print(f"Loss: {test_loss:.4f}")
     print(f"Precision: {test_precision:.4f}")
     print(f"Recall: {test_recall:.4f}")
     print(f"Accuracy: {test_accuracy:.4f}")
     print(f"F1 Score: {test_f1_score:.4f}")


Training TriAverage Model...
----------------------------------------
Epoch 1/10
AlphaBetaGamma Model Evaluation - Loss: 1.0178, Recall: 1.0000, Precision: 0.2577, Accuracy: 0.2764. F1: 0.4098
Training Loss: 1.0283, Validation Loss: 1.0178
----------------------------------------
Epoch 2/10
AlphaBetaGamma Model Evaluation - Loss: 1.0571, Recall: 1.0000, Precision: 0.2513, Accuracy: 0.2513. F1: 0.4016
Training Loss: 0.9603, Validation Loss: 1.0571
----------------------------------------
Epoch 3/10
AlphaBetaGamma Model Evaluation - Loss: 1.0963, Recall: 1.0000, Precision: 0.2525, Accuracy: 0.2563. F1: 0.4032
Training Loss: 0.8844, Validation Loss: 1.0963
----------------------------------------
Epoch 4/10
AlphaBetaGamma Model Evaluation - Loss: 1.0744, Recall: 1.0000, Precision: 0.2538, Accuracy: 0.2613. F1: 0.4049
Training Loss: 0.9005, Validation Loss: 1.0744
----------------------------------------
Epoch 5/10
AlphaBetaGamma Model Evaluation - Loss: 1.1240, Recall: 1.0000, Precision: