This notebook is for quickly testing a trained binary classification model. Load the best binary classification checkpoint, run it on the preprocessed test set, and obtain the evaluation results.

In [None]:
import numpy as np
from sklearn.preprocessing import LabelEncoder
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Path for the preprocessed data
file_path = '/root/autodl-tmp/test_MOSEI_short_sbert_Hubert_long_features_with_labels.pkl'

# Load the file using pickle.
# NOTE(security): pickle.load can execute arbitrary code stored in the file, so
# file_path must only ever point at a feature file you created or fully trust.
with open(file_path, 'rb') as file:
    features3 = pickle.load(file)

# Preview the loaded entries. In experiments where data with label 0 is excluded,
# only entries labelled 'Positive' or 'Negative' are printed; everything else is
# skipped before any of its fields are touched.
for index, item in enumerate(features3):
    label = item.get('label', 'N/A')
    if label not in ('Positive', 'Negative'):
        continue

    # A missing feature is reported as 'N/A' instead of raising KeyError
    text_feature_shape = item['text_feature'].shape if 'text_feature' in item else 'N/A'
    audio_feature_shape = item['audio_feature'].shape if 'audio_feature' in item else 'N/A'
    vision_feature_shape = item['vision_feature_resnet'].shape if 'vision_feature_resnet' in item else 'N/A'

    print(f"Item {index}:")
    print(f"  video_id: {item.get('video_id', 'N/A')}")
    print(f"  clip_id: {item.get('clip_id', 'N/A')}")
    print(f"  label_1: {item.get('label_1', 'N/A')}")
    print(f"  label: {label}")
    print(f"  mode: {item.get('mode', 'N/A')}")
    print(f"  text: {item.get('text', 'N/A')}")
    print(f"  text_feature_shape: {text_feature_shape}")
    print(f"  audio_feature_shape: {audio_feature_shape}")
    print(f"  vision_feature_shape: {vision_feature_shape}")


In [None]:
# Keep only the samples that belong to the 'test' split and carry a binary
# sentiment label. `.get` is used so entries lacking 'mode' or 'label' are
# simply filtered out instead of raising KeyError.
test_data = [
    item for item in features3
    if item.get('mode') == 'test' and item.get('label') in ('Positive', 'Negative')
]

# Get the (string) labels from the test data
test_y = np.array([item['label'] for item in test_data])

# Get text features from the test data
test_X_text = [item['text_feature'] for item in test_data]

# Get audio features from the test data
test_X_audio = [item['audio_feature'] for item in test_data]

# Get vision features from the test data
test_X_vision = [item['vision_feature_resnet'] for item in test_data]


# Initialize label encoder
label_encoder = LabelEncoder()

# Fit on the fixed class list rather than on the test labels themselves.
# LabelEncoder numbers classes in sorted order, so this always yields
# 'Negative' -> 0 and 'Positive' -> 1, even if the test split happens to
# contain only one of the two classes (fitting on such a split would map
# the single class present to 0).
label_encoder.fit(['Negative', 'Positive'])

# Transform labels in test_y to numeric labels
test_Y = label_encoder.transform(test_y)


In [None]:
import torch


def _tensorize_in_place(name, features, squeeze_first_dim):
    """Replace every entry of the list `features` with a torch tensor.

    The list is modified in place so that the caller's list (e.g.
    `test_X_audio`) holds tensors afterwards. The resulting shape of each
    entry is printed, labelled with `name`.

    Args:
        name: Label used in the printed message.
        features: Mutable list of array-like features.
        squeeze_first_dim: When True, `squeeze(0)` is applied after the
            conversion, which drops the first dimension only if its size is 1.
    """
    for index, feature in enumerate(features):
        tensor_feature = torch.tensor(feature)  # Convert feature to tensor
        if squeeze_first_dim:
            tensor_feature = tensor_feature.squeeze(0)
        features[index] = tensor_feature  # Update the caller's list
        print(f"Feature in {name} at index {index} has shape: {tensor_feature.shape}")


# Audio features: convert to tensors and drop the leading size-1 dimension
audio_features = {
    'test_audio': test_X_audio
}
for key, features in audio_features.items():
    _tensorize_in_place(key, features, squeeze_first_dim=True)

# Text features: convert to tensors and drop the leading size-1 dimension
text_features = {
    'test_text': test_X_text
}
for key, features in text_features.items():
    _tensorize_in_place(key, features, squeeze_first_dim=True)

# Vision features: convert to tensors only (no dimension is removed)
vision_features = {
    'test_vision': test_X_vision
}
for key, features in vision_features.items():
    _tensorize_in_place(key, features, squeeze_first_dim=False)

In [None]:
class ThreeModal_Dataset(Dataset):
    """Index-aligned dataset of text, audio and vision features with labels.

    Sample ``idx`` is the tuple ``(text_features[idx], audio_features[idx],
    vision_features[idx], labels[idx])``. The dataset length is taken from
    ``labels``.
    """

    def __init__(self, text_features, audio_features, vision_features, labels):
        # The four collections are stored as given (no copy, no validation)
        self.text_features, self.audio_features = text_features, audio_features
        self.vision_features, self.labels = vision_features, labels

    def __len__(self):
        # Number of samples == number of labels
        return len(self.labels)

    def __getitem__(self, idx):
        # One sample: (text, audio, vision, label) at position idx
        return (
            self.text_features[idx],
            self.audio_features[idx],
            self.vision_features[idx],
            self.labels[idx],
        )

In [None]:
def collate_fn(batch):
    """Collate (text, audio, vision, label) samples into one padded batch.

    Each modality is padded independently, batch-first, to the longest
    sequence of that modality in the batch; the labels become one tensor.
    Returns ``(text, audio, vision, labels)``.
    """
    # Transpose the list of samples into per-field tuples; the last field is the label
    *modalities, labels = zip(*batch)

    # Pad text, audio and vision (in that order) with pad_sequence's default padding
    text_padded, audio_padded, vision_padded = (
        pad_sequence(sequences, batch_first=True) for sequences in modalities
    )

    return text_padded, audio_padded, vision_padded, torch.tensor(labels)


In [None]:
# Wrap the encoded test labels in a tensor
test_Y_tensor = torch.tensor(test_Y)

# Bundle the three modalities and the labels into one index-aligned dataset
test_dataset = ThreeModal_Dataset(
    text_features=test_X_text,
    audio_features=test_X_audio,
    vision_features=test_X_vision,
    labels=test_Y_tensor,
)

# Test DataLoader: batches of 8 in the original order (no shuffling), loaded by
# 2 worker processes and assembled by the custom collate function
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=2,
    collate_fn=collate_fn,
)

In [None]:

class SimpleCrossAttentionClassifier(nn.Module):
    """Three-modal binary classifier built from two cross-attention branches.

    Text, audio and vision sequences are each projected to ``hidden_dim``.
    One branch lets text attend to audio, the other lets vision attend to
    text. Both branch outputs are mean-pooled over the sequence dimension,
    concatenated and fed to a linear layer followed by a sigmoid.

    Args:
        text_dim: Feature size of the text input.
        audio_dim: Feature size of the audio input.
        vision_dim: Feature size of the vision input.
        hidden_dim: Shared hidden size of all projections and attention layers.
        num_heads: Number of attention heads in each cross-attention layer.
        dropout_rate: Dropout used inside the attention layers and on their outputs.
        temperature: Temperature of the InfoNCE contrastive term (default 0.07).
    """

    def __init__(self, text_dim, audio_dim, vision_dim, hidden_dim, num_heads, dropout_rate=0.1, temperature=0.07):
        super().__init__()

        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.temperature = temperature  # plain float, not part of the state dict
        self.dropout = nn.Dropout(dropout_rate)

        # Linear projection of text and audio/vision to map to a unified hidden dimension
        self.text_to_hidden = nn.Linear(text_dim, hidden_dim)
        self.audio_to_hidden = nn.Linear(audio_dim, hidden_dim)
        self.vision_to_hidden = nn.Linear(vision_dim, hidden_dim)

        # Cross-Attention between Text-Audio and Vision-Text
        self.cross_attention_text_audio = nn.MultiheadAttention(embed_dim=hidden_dim, num_heads=num_heads, dropout=dropout_rate)
        self.cross_attention_vision_text = nn.MultiheadAttention(embed_dim=hidden_dim, num_heads=num_heads, dropout=dropout_rate)

        # Classifier
        self.classifier = nn.Linear(hidden_dim * 2, 1)  # Concatenation of two Cross-Attention outputs

    def forward(self, text_features, audio_features, vision_features):
        """Run the classifier on one batch.

        Args:
            text_features: Batch-first text sequences, (batch_size, seq_len, text_dim).
            audio_features: Batch-first audio sequences, (batch_size, seq_len, audio_dim).
            vision_features: Batch-first vision sequences, (batch_size, seq_len, vision_dim).

        Returns:
            A tuple ``(output, contrastive_loss)``. ``output`` has shape
            (batch_size, 1) and already went through a sigmoid, so it is a
            probability: callers must NOT apply sigmoid again.
            ``contrastive_loss`` is a scalar InfoNCE term between the two
            pooled branch outputs.
        """
        # Project to hidden dimension
        text_hidden = self.text_to_hidden(text_features)  # (batch_size, seq_len, hidden_dim)
        audio_hidden = self.audio_to_hidden(audio_features)  # (batch_size, seq_len, hidden_dim)
        vision_hidden = self.vision_to_hidden(vision_features)  # (batch_size, seq_len, hidden_dim)

        # Reshape to (seq_len, batch_size, hidden_dim) to fit MultiheadAttention
        text_hidden = text_hidden.permute(1, 0, 2)
        audio_hidden = audio_hidden.permute(1, 0, 2)
        vision_hidden = vision_hidden.permute(1, 0, 2)

        # NOTE: no key_padding_mask is passed to either attention layer, so
        # zero-padded time steps take part in attention and in the mean pooling.

        # Text-Audio Cross-Attention (text queries, audio keys/values)
        text_audio_output, _ = self.cross_attention_text_audio(query=text_hidden, key=audio_hidden, value=audio_hidden)
        text_audio_output = self.dropout(text_audio_output)  # Apply Dropout
        text_audio_output = text_audio_output.permute(1, 0, 2)  # Reshape back to (batch_size, seq_len, hidden_dim)

        # Vision-Text Cross-Attention (vision queries, text keys/values)
        vision_text_output, _ = self.cross_attention_vision_text(query=vision_hidden, key=text_hidden, value=text_hidden)
        vision_text_output = self.dropout(vision_text_output)  # Apply Dropout
        vision_text_output = vision_text_output.permute(1, 0, 2)  # Reshape back to (batch_size, seq_len, hidden_dim)

        # Pooling operation: take the average across the sequence dimension
        text_audio_pooled = text_audio_output.mean(dim=1)  # (batch_size, hidden_dim)
        vision_text_pooled = vision_text_output.mean(dim=1)  # (batch_size, hidden_dim)

        # Contrastive learning - InfoNCE loss.
        # L2-normalise so the dot product below is a cosine similarity.
        text_audio_norm = F.normalize(text_audio_pooled, p=2, dim=-1)
        vision_text_norm = F.normalize(vision_text_pooled, p=2, dim=-1)

        # Temperature-scaled similarity matrix; entry (i, j) compares the
        # text-audio vector of sample i with the vision-text vector of sample j.
        # The division is out-of-place so no autograd tensor is modified in place.
        logits = torch.matmul(text_audio_norm, vision_text_norm.T) / self.temperature

        # -log(exp(pos_i) / sum_j exp(logits_ij)) == logsumexp_j(logits_ij) - pos_i,
        # where the positives are the diagonal; logsumexp avoids overflow in exp.
        contrastive_loss = (torch.logsumexp(logits, dim=1) - logits.diagonal()).mean()

        # Concatenate Text-Audio and Vision-Text Cross-Attention outputs
        combined_output = torch.cat((text_audio_pooled, vision_text_pooled), dim=1)  # (batch_size, hidden_dim * 2)

        # Classification
        output = self.classifier(combined_output)
        output = torch.sigmoid(output)  # Binary classification probability

        return output, contrastive_loss

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import numpy as np
import torch

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleCrossAttentionClassifier(
    text_dim=768,
    audio_dim=1024,
    vision_dim=2048,
    hidden_dim=256,
    num_heads=8,
    dropout_rate=0.01
).to(device)
# Load the best model weights
best_model_path = '/root/autodl-tmp/best_model_three_modal.pth' #You can enter the local path of the best model.
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()  # Set the model to evaluation mode

# Initialize lists to collect true labels and predicted labels
true_labels = []
predicted_labels = []

# Evaluate the model on the test data
with torch.no_grad():
    # collate_fn yields batches in the order (text, audio, vision, labels),
    # which is also the argument order of the model's forward()
    for text_features, audio_features, vision_features, labels in test_loader:
        # Move text, audio, vision features to the selected device
        text_features = text_features.to(device)
        audio_features = audio_features.to(device)
        vision_features = vision_features.to(device)

        # Forward pass; the contrastive loss is not needed for evaluation
        outputs, _ = model(text_features, audio_features, vision_features)

        # The model already applies a sigmoid, so `outputs` are probabilities.
        # Applying sigmoid again would map them into (0.5, 0.74) and every
        # prediction would round to 1. reshape(-1) keeps a 1-D result even
        # when the last batch holds a single sample.
        predicted = outputs.reshape(-1).round().long().cpu().numpy()
        predicted_labels.extend(predicted)

        # Convert true labels to numpy array
        true_labels.extend(labels.cpu().numpy())

# Convert lists to NumPy arrays
true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Calculate metrics (class 1 is treated as the positive class)
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='binary')
recall = recall_score(true_labels, predicted_labels, average='binary')
f1 = f1_score(true_labels, predicted_labels, average='binary')

# Compute confusion matrix; labels=[0, 1] keeps it 2x2 even if one class
# never occurs in the labels or the predictions
cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

# Prevent division by zero and calculate UA (Unweighted Average) and WA (Weighted Average)
with np.errstate(divide='ignore', invalid='ignore'):
    per_class_accuracy = np.diag(cm) / np.sum(cm, axis=1)
    per_class_accuracy = np.nan_to_num(per_class_accuracy)  # Convert NaN to 0 to avoid computation errors
    ua = np.mean(per_class_accuracy)  # Unweighted Average (UA)

# Weighted Average (WA): per-class accuracy weighted by class support, which equals overall accuracy
wa = accuracy

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Unweighted Average (UA): {ua:.4f}")
print(f"Weighted Average (WA): {wa:.4f}")

# Print detailed classification report
print("\nDetailed classification report:")
print(classification_report(true_labels, predicted_labels, labels=[0, 1], target_names=['Class 0', 'Class 1']))

In [None]:
# NOTE: this cell repeats the evaluation performed by the cell above.
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import numpy as np
import torch

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SimpleCrossAttentionClassifier(
    text_dim=768,
    audio_dim=1024,
    vision_dim=2048,
    hidden_dim=256,
    num_heads=8,
    dropout_rate=0.01
).to(device)
# Load the best model weights
best_model_path = '/root/autodl-tmp/best_model_three_modal.pth'
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()  # Set the model to evaluation mode

# Initialize lists to collect true labels and predicted labels
true_labels = []
predicted_labels = []

# Evaluate the model on the test data
with torch.no_grad():
    # collate_fn yields batches in the order (text, audio, vision, labels),
    # which is also the argument order of the model's forward()
    for text_features, audio_features, vision_features, labels in test_loader:
        # Move text, audio, vision features to the selected device
        text_features = text_features.to(device)
        audio_features = audio_features.to(device)
        vision_features = vision_features.to(device)

        # Forward pass; the contrastive loss is not needed for evaluation
        outputs, _ = model(text_features, audio_features, vision_features)

        # The model already applies a sigmoid, so `outputs` are probabilities.
        # Applying sigmoid again would map them into (0.5, 0.74) and every
        # prediction would round to 1. reshape(-1) keeps a 1-D result even
        # when the last batch holds a single sample.
        predicted = outputs.reshape(-1).round().long().cpu().numpy()
        predicted_labels.extend(predicted)

        # Convert true labels to numpy array
        true_labels.extend(labels.cpu().numpy())

# Convert lists to NumPy arrays
true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Calculate metrics (class 1 is treated as the positive class)
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='binary')
recall = recall_score(true_labels, predicted_labels, average='binary')
f1 = f1_score(true_labels, predicted_labels, average='binary')

# Compute confusion matrix; labels=[0, 1] keeps it 2x2 even if one class
# never occurs in the labels or the predictions
cm = confusion_matrix(true_labels, predicted_labels, labels=[0, 1])

# Prevent division by zero and calculate UA (Unweighted Average) and WA (Weighted Average)
with np.errstate(divide='ignore', invalid='ignore'):
    per_class_accuracy = np.diag(cm) / np.sum(cm, axis=1)
    per_class_accuracy = np.nan_to_num(per_class_accuracy)  # Convert NaN to 0 to avoid computation errors
    ua = np.mean(per_class_accuracy)  # Unweighted Average (UA)

# Weighted Average (WA): per-class accuracy weighted by class support, which equals overall accuracy
wa = accuracy

# Print results
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Unweighted Average (UA): {ua:.4f}")
print(f"Weighted Average (WA): {wa:.4f}")

# Print detailed classification report
print("\nDetailed classification report:")
print(classification_report(true_labels, predicted_labels, labels=[0, 1], target_names=['Class 0', 'Class 1']))