In [1]:
import pickle
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, CosineAnnealingLR
from sklearn.metrics import accuracy_score, f1_score

In [11]:
torch.cuda.is_available()

True

In [2]:
class MOSIDataset(Dataset):
    def __init__(self, data_path, split='train'):
        # Load the data from the pickle file
        with open(data_path, 'rb') as f:
            data = pickle.load(f)
        
        # Select the appropriate split
        self.vision = data[split]['vision']
        self.text = data[split]['text']
        self.audio = data[split]['audio']
        self.labels = data[split]['labels']
        self.ids = data[split]['id']

        # audio数据中存在坏点需要处理：
        self.audio[self.audio == float('inf')] = 0.0
        self.audio[self.audio == float('-inf')] = 0.0

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Extract the features and label for the given index
        vision = torch.tensor(self.vision[idx], dtype=torch.float32)
        text = torch.tensor(self.text[idx], dtype=torch.float32)
        audio = torch.tensor(self.audio[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32).squeeze()

        return vision, text, audio, label

In [3]:
class TransformerEncoderBlock(nn.Module):
    def __init__(self, input_dim, d_model=128, nhead=4, num_layers=2, dropout=0.1):
        super().__init__()
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=256, dropout=dropout, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.project = nn.Linear(input_dim, d_model)
        self.norm = nn.LayerNorm(input_dim)

    def forward(self, x):
        x = self.norm(x)
        x = F.relu(self.project(x))
        x = self.encoder(x)
        return x  # (batch, seq, d_model)

class AttentionFusion(nn.Module):
    def __init__(self, d_model=128):
        super().__init__()
        self.query = nn.Parameter(torch.randn(1, 1, d_model))  # Learnable query vector
        self.attn = nn.MultiheadAttention(d_model, num_heads=4, batch_first=True)

    def forward(self, v, t, a):
        # Concatenate: (B, 3, D)
        x = torch.stack([v[:, -1], t[:, -1], a[:, -1]], dim=1)
        q = self.query.expand(x.size(0), -1, -1)  # (B, 1, D)
        fused, _ = self.attn(q, x, x)
        return fused.squeeze(1)

class MultimodalSentimentAnalysisModel(nn.Module):
    def __init__(self):
        super().__init__()

        self.vision_encoder = TransformerEncoderBlock(35)
        self.text_encoder = TransformerEncoderBlock(300)
        self.audio_encoder = TransformerEncoderBlock(74)

        self.fusion = AttentionFusion(d_model=128)
        self.fc = nn.Linear(128, 1)

    def forward(self, vision, text, audio):
        vision_feat = self.vision_encoder(vision)
        text_feat = self.text_encoder(text)
        audio_feat = self.audio_encoder(audio)

        fused = self.fusion(vision_feat, text_feat, audio_feat)

        out = torch.sigmoid(self.fc(fused)) * 6 - 3
        return out


In [4]:
def eval_mosi_regression(y_pred, y_true, exclude_zero=False):
    test_preds = y_pred.view(-1).cpu().detach().numpy()
    test_truth = y_true.view(-1).cpu().detach().numpy()

    test_preds_a7 = np.clip(test_preds, a_min=-3., a_max=3.)
    test_truth_a7 = np.clip(test_truth, a_min=-3., a_max=3.)
    test_preds_a5 = np.clip(test_preds, a_min=-2., a_max=2.)
    test_truth_a5 = np.clip(test_truth, a_min=-2., a_max=2.)

    mae = np.mean(np.absolute(test_preds - test_truth))
    corr = np.corrcoef(test_preds, test_truth)[0][1]
    mult_a7 = multiclass_acc(test_preds_a7, test_truth_a7)
    mult_a5 = multiclass_acc(test_preds_a5, test_truth_a5)
    
    non_zeros = np.array([i for i, e in enumerate(test_truth) if e != 0])
    non_zeros_binary_truth = (test_truth[non_zeros] > 0)
    non_zeros_binary_preds = (test_preds[non_zeros] > 0)

    non_zeros_acc2 = accuracy_score(non_zeros_binary_preds, non_zeros_binary_truth)
    non_zeros_f1_score = f1_score(non_zeros_binary_preds, non_zeros_binary_truth, average='weighted')
    
    eval_results = {
        "Non0_acc_2":  round(non_zeros_acc2, 4),
        "Non0_F1_score": round(non_zeros_f1_score, 4),
        "Mult_acc_5": round(mult_a5, 4),
        "Mult_acc_7": round(mult_a7, 4),
        "MAE": round(mae, 4),
        "Corr": round(corr, 4)
    }
    return eval_results

def multiclass_acc(y_pred, y_true):
    y_pred = np.round(y_pred)
    y_true = np.round(y_true)

    acc = (y_pred == y_true).sum() / len(y_true)
    
    return acc


In [6]:
def train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, device, epochs):
    model.to(device)

    best_corr = 0.
    best_epoch = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        for i, (vision, text, audio, labels) in enumerate(train_loader):
            vision, text, audio, labels = vision.to(device), text.to(device), audio.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(vision, text, audio)
            loss = criterion(outputs.view(-1), labels.view(-1))
            loss.backward()
            
            optimizer.step()
            scheduler.step()

            running_loss += loss.item()

        print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}')

        val_corr = validate_model(model, valid_loader, criterion, device)

        if val_corr > best_corr:
            best_corr = val_corr
            best_epoch = epoch
            torch.save(model.state_dict(), 'best_model.pth')

    print(f"Best model saved with val_corr {best_corr} at epoch {best_epoch}.")


In [7]:
def validate_model(model, valid_loader, criterion, device):
    model.eval()
    valid_loss = 0.0

    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for vision, text, audio, labels in valid_loader:
            vision, text, audio, labels = vision.to(device), text.to(device), audio.to(device), labels.to(device)

            outputs = model(vision, text, audio)
            loss = criterion(outputs.squeeze(), labels.squeeze())

            valid_loss += loss.item()

            all_preds.append(outputs)
            all_labels.append(labels)

    print(f'Validation Loss: {valid_loss/len(valid_loader):.4f}')
    
    all_preds = torch.cat(all_preds, dim=0)
    all_labels = torch.cat(all_labels, dim=0)

    eval_results = eval_mosi_regression(all_preds, all_labels)
    print(eval_results)

    return eval_results["Corr"]

In [17]:
def main():
    
    seed = 42
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print(device)
    
    criterion = nn.SmoothL1Loss()
    
    learning_rate = 1e-3
    epochs = 20
    
    model = model = MultimodalSentimentAnalysisModel()

    data_path = './mosi_raw.pkl'
    # add 初始化训练集和验证集的数据集类
    train_dataset = MOSIDataset(data_path, split='train')
    valid_dataset = MOSIDataset(data_path, split='valid')
    # add 初始化训练集和验证集的加载器，要求batch_size=16
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)

    # Initialize the optimizer and scheduler.
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs*len(train_loader))

    train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, device, epochs)

    best_model_state = torch.load('best_model.pth')
    model.load_state_dict(best_model_state)

    test_dataset = MOSIDataset(data_path, split='test')
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
    
    print("\n========== test results: ==========\n")
    validate_model(model, test_loader, criterion, device)

In [18]:
if __name__ == "__main__":
    main()

cuda:0
Epoch [1/20], Loss: 0.8023
Validation Loss: 0.7525
{'Non0_acc_2': 0.7313, 'Non0_F1_score': 0.7353, 'Mult_acc_5': np.float64(0.2336), 'Mult_acc_7': np.float64(0.2196), 'MAE': np.float32(1.1882), 'Corr': np.float64(0.5174)}
Epoch [2/20], Loss: 0.5454
Validation Loss: 0.5959
{'Non0_acc_2': 0.7711, 'Non0_F1_score': 0.7845, 'Mult_acc_5': np.float64(0.3551), 'Mult_acc_7': np.float64(0.3084), 'MAE': np.float32(1.0081), 'Corr': np.float64(0.6519)}
Epoch [3/20], Loss: 0.4668
Validation Loss: 0.4821
{'Non0_acc_2': 0.8159, 'Non0_F1_score': 0.8177, 'Mult_acc_5': np.float64(0.4112), 'Mult_acc_7': np.float64(0.3271), 'MAE': np.float32(0.8829), 'Corr': np.float64(0.7279)}
Epoch [4/20], Loss: 0.3585
Validation Loss: 0.4634
{'Non0_acc_2': 0.8308, 'Non0_F1_score': 0.834, 'Mult_acc_5': np.float64(0.4112), 'Mult_acc_7': np.float64(0.3318), 'MAE': np.float32(0.8624), 'Corr': np.float64(0.7579)}
Epoch [5/20], Loss: 0.3084
Validation Loss: 0.5424
{'Non0_acc_2': 0.7662, 'Non0_F1_score': 0.7637, 'Mult_a

  best_model_state = torch.load('best_model.pth')




Validation Loss: 0.7478
{'Non0_acc_2': 0.6616, 'Non0_F1_score': 0.6614, 'Mult_acc_5': np.float64(0.293), 'Mult_acc_7': np.float64(0.2741), 'MAE': np.float32(1.1563), 'Corr': np.float64(0.5464)}
