In [1]:

import pickle
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from scipy import signal
from scipy.fft import fft
import matplotlib.pyplot as plt
import seaborn as sns

torch.manual_seed(42)
np.random.seed(42)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


data_path = '/kaggle/input/deap123'
all_data = []
labels = []
for file in os.listdir(data_path):
    if file.endswith('.dat'):
        with open(os.path.join(data_path, file), 'rb') as f:
            subject_data = pickle.load(f, encoding='latin1')
            all_data.append(subject_data['data'][:, :32, :])
            labels.append(subject_data['labels'])
data = np.concatenate(all_data, axis=0)
labels = np.concatenate(labels, axis=0)
valence = labels[:, 0]
arousal = labels[:, 1]
valence_labels = (valence > 5).astype(np.int32)
arousal_labels = (arousal > 5).astype(np.int32)
fs = 128
freq_bands = [(4, 8), (8, 13), (13, 30), (30, 45)]

# 计算多种特征
def differential_entropy(data, axis=-1):
    std = np.std(data, axis=axis, keepdims=True) + 1e-8
    de = 0.5 * np.log(2 * np.pi * np.e * std**2)
    return de.squeeze()

de_data = []
psd_data = []
time_data = []
for low, high in freq_bands:
    b, a = signal.butter(4, [low, high], btype='band', fs=fs)
    filtered = signal.filtfilt(b, a, data, axis=2)
    de = differential_entropy(filtered, axis=2)
    fft_data = np.abs(fft(filtered, axis=2))[:, :, :64]
    psd = np.mean(fft_data ** 2, axis=-1)
    time_mean = np.mean(filtered, axis=2)
    de_data.append(de)
    psd_data.append(psd)
    time_data.append(time_mean)
de_data = np.stack(de_data, axis=1)
psd_data = np.stack(psd_data, axis=1)
time_data = np.stack(time_data, axis=1)

# 分段为3秒片段
segment_len = fs * 3
segments = []
de_segments = []
psd_segments = []
time_segments = []
for i in range(de_data.shape[0]):
    trial = data[i]
    de_trial = de_data[i]
    psd_trial = psd_data[i]
    time_trial = time_data[i]
    for j in range(0, trial.shape[-1] - segment_len + 1, segment_len):
        segments.append(trial[:, j:j + segment_len])
        de_segments.append(de_trial)
        psd_segments.append(psd_trial)
        time_segments.append(time_trial)
segments = np.array(segments)
de_segments = np.array(de_segments)
psd_segments = np.array(psd_segments)
time_segments = np.array(time_segments)
valence_labels = np.repeat(valence_labels, 8064 // segment_len)
arousal_labels = np.repeat(arousal_labels, 8064 // segment_len)

# 标准化
de_segments = (de_segments - de_segments.mean()) / (de_segments.std() + 1e-8)
psd_segments = (psd_segments - psd_segments.mean()) / (psd_segments.std() + 1e-8)
time_segments = (time_segments - time_segments.mean()) / (time_segments.std() + 1e-8)

# 脑区划分
regions = {
    'Frontal': [0, 1, 2, 3, 16, 17, 18, 19],
    'Prefrontal': [4, 5, 20, 21],
    'Parietal': [6, 7, 8, 9, 22, 23],
    'Temporal': [10, 11, 12, 13, 24, 25],
    'Occipital': [14, 15, 26, 27, 28, 29]
}
num_nodes = len(regions)  # 5

# 提取区域特征
def extract_region_features(de_data, psd_data, time_data, regions):
    region_de = []
    region_psd = []
    region_time = []
    for region in regions.values():
        region_de.append(de_data[:, :, region].mean(axis=-1))
        region_psd.append(psd_data[:, :, region].mean(axis=-1))
        region_time.append(time_data[:, :, region].mean(axis=-1))
    return np.stack(region_de, axis=2), np.stack(region_psd, axis=2), np.stack(region_time, axis=2)

region_de, region_psd, region_time = extract_region_features(de_segments, psd_segments, time_segments, regions)
print(f"region_de shape: {region_de.shape}, region_psd shape: {region_psd.shape}, region_time shape: {region_time.shape}")

# GCN层定义
class GCNLayer(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(torch.FloatTensor(in_features, out_features))
        self.bn = nn.BatchNorm1d(out_features)
        nn.init.xavier_normal_(self.weight)
    
    def forward(self, x, adj):
        batch_size = x.size(0)
        support = torch.bmm(x, self.weight.unsqueeze(0).expand(batch_size, -1, -1))
        adj_batch = adj.unsqueeze(0).expand(batch_size, -1, -1)  # 扩展为 [batch_size, num_nodes, num_nodes]
        output = torch.bmm(adj_batch, support)
        output = output.transpose(1, 2).contiguous()
        output = self.bn(output).transpose(1, 2)
        return output

# Transformer 层
class TransformerLayer(nn.Module):
    def __init__(self, d_model, nhead=4):
        super().__init__()
        self.attn = nn.MultiheadAttention(d_model, nhead)
        self.norm = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_model * 2),
            nn.ReLU(),
            nn.Linear(d_model * 2, d_model)
        )
    
    def forward(self, x):
        x = x.transpose(0, 1)  # [nodes, batch, features]
        attn_output, _ = self.attn(x, x, x)
        x = self.norm(x + attn_output)
        ff_output = self.ff(x)
        x = self.norm(x + ff_output)
        return x.transpose(0, 1)

# 自适应特征选择模块
class FeatureSelector(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weights = nn.Parameter(torch.ones(in_features))
        self.fc = nn.Linear(in_features, out_features)
    
    def forward(self, x):
        weights = F.softmax(self.weights, dim=0)
        x = x * weights.view(1, 1, -1)  # 自适应加权
        return F.relu(self.fc(x))


class GraphTimeNet(nn.Module):
    def __init__(self, in_features=12, hidden_features=64, num_classes=2):
        super().__init__()
        self.selector = FeatureSelector(in_features, in_features // 2)  # 12 -> 6
        self.gcn1 = GCNLayer(in_features // 2, hidden_features)
        self.gcn2 = GCNLayer(hidden_features, hidden_features // 2)
        self.transformer = TransformerLayer(hidden_features // 2)
        self.fc_valence = nn.Linear((hidden_features // 2) * num_nodes, num_classes)
        self.fc_arousal = nn.Linear((hidden_features // 2) * num_nodes, num_classes)
        self.dropout = nn.Dropout(0.3)
    
    def forward(self, x_de, x_psd, x_time, adj):
        x = torch.cat([x_de, x_psd, x_time], dim=-1)  # [batch_size, num_nodes, 12]
        x = self.selector(x)  # [batch_size, num_nodes, 6]
        x = F.relu(self.gcn1(x, adj))
        x = F.relu(self.gcn2(x, adj))
        x = self.transformer(x)
        x = x.contiguous().view(x.size(0), -1)
        x = self.dropout(x)
        valence_out = self.fc_valence(x)
        arousal_out = self.fc_arousal(x)
        return valence_out, arousal_out


X_de = torch.tensor(region_de, dtype=torch.float32).to(device)
X_psd = torch.tensor(region_psd, dtype=torch.float32).to(device)
X_time = torch.tensor(region_time, dtype=torch.float32).to(device)
y_valence = torch.tensor(valence_labels, dtype=torch.long).to(device)
y_arousal = torch.tensor(arousal_labels, dtype=torch.long).to(device)
print(f"X_de shape: {X_de.shape}, X_psd shape: {X_psd.shape}, X_time shape: {X_time.shape}")

# 构建固定邻接矩阵
adj = torch.ones(num_nodes, num_nodes, device=device) - torch.eye(num_nodes, device=device)
adj = adj / (adj.sum(dim=1, keepdim=True) + 1e-8)


def train_and_evaluate(X_de, X_psd, X_time, y_valence, y_arousal):
    X_de_train, X_de_test, X_psd_train, X_psd_test, X_time_train, X_time_test, \
    y_val_train, y_val_test, y_aro_train, y_aro_test = train_test_split(
        X_de, X_psd, X_time, y_valence, y_arousal, test_size=0.2, random_state=42, shuffle=True
    )
    train_dataset = torch.utils.data.TensorDataset(X_de_train, X_psd_train, X_time_train, y_val_train, y_aro_train)
    test_dataset = torch.utils.data.TensorDataset(X_de_test, X_psd_test, X_time_test, y_val_test, y_aro_test)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    model = GraphTimeNet().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()
    
    train_losses = []
    val_accuracies = []
    aro_accuracies = []
    best_val_acc = 0
    best_aro_acc = 0
    
    for epoch in range(200):
        model.train()
        train_loss = 0
        for x_de_batch, x_psd_batch, x_time_batch, y_val_batch, y_aro_batch in train_loader:
            optimizer.zero_grad()
            val_out, aro_out = model(x_de_batch.transpose(1, 2), x_psd_batch.transpose(1, 2), x_time_batch.transpose(1, 2), adj)
            loss_val = criterion(val_out, y_val_batch)
            loss_aro = criterion(aro_out, y_aro_batch)
            loss = 0.5 * (loss_val + loss_aro)  # 多任务损失平衡
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_losses.append(train_loss / len(train_loader))
        
        model.eval()
        val_correct = 0
        aro_correct = 0
        total = 0
        with torch.no_grad():
            for x_de_batch, x_psd_batch, x_time_batch, y_val_batch, y_aro_batch in test_loader:
                val_out, aro_out = model(x_de_batch.transpose(1, 2), x_psd_batch.transpose(1, 2), x_time_batch.transpose(1, 2), adj)
                val_pred = val_out.argmax(dim=1)
                aro_pred = aro_out.argmax(dim=1)
                val_correct += (val_pred == y_val_batch).sum().item()
                aro_correct += (aro_pred == y_aro_batch).sum().item()
                total += y_val_batch.size(0)
        val_acc = val_correct / total
        aro_acc = aro_correct / total
        val_accuracies.append(val_acc)
        aro_accuracies.append(aro_acc)
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
        if aro_acc > best_aro_acc:
            best_aro_acc = aro_acc
        
        print(f"Epoch {epoch+1}: Train Loss: {train_losses[-1]:.4f}, Val Acc: {val_acc:.4f}, Aro Acc: {aro_acc:.4f}")
    
    print(f"Best Valence Test Accuracy: {best_val_acc:.4f}")
    print(f"Best Arousal Test Accuracy: {best_aro_acc:.4f}")
    return best_val_acc, best_aro_acc

print("Training Multi-Task Model:")
val_acc, aro_acc = train_and_evaluate(X_de, X_psd, X_time, y_valence, y_arousal)





Using device: cuda
region_de shape: (26880, 4, 5), region_psd shape: (26880, 4, 5), region_time shape: (26880, 4, 5)
X_de shape: torch.Size([26880, 4, 5]), X_psd shape: torch.Size([26880, 4, 5]), X_time shape: torch.Size([26880, 4, 5])
Training Multi-Task Model:
Epoch 1: Train Loss: 0.6934, Val Acc: 0.5539, Aro Acc: 0.5971
Epoch 2: Train Loss: 0.6769, Val Acc: 0.5653, Aro Acc: 0.5476
Epoch 3: Train Loss: 0.6718, Val Acc: 0.5618, Aro Acc: 0.6090
Epoch 4: Train Loss: 0.6685, Val Acc: 0.5763, Aro Acc: 0.5999
Epoch 5: Train Loss: 0.6666, Val Acc: 0.5634, Aro Acc: 0.6287
Epoch 6: Train Loss: 0.6629, Val Acc: 0.5809, Aro Acc: 0.6153
Epoch 7: Train Loss: 0.6600, Val Acc: 0.5858, Aro Acc: 0.6137
Epoch 8: Train Loss: 0.6588, Val Acc: 0.5761, Aro Acc: 0.5921
Epoch 9: Train Loss: 0.6559, Val Acc: 0.6027, Aro Acc: 0.6334
Epoch 10: Train Loss: 0.6531, Val Acc: 0.6150, Aro Acc: 0.6168
Epoch 11: Train Loss: 0.6470, Val Acc: 0.5547, Aro Acc: 0.6254
Epoch 12: Train Loss: 0.6441, Val Acc: 0.6071, Aro Ac