In [2]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

In [3]:

class EEGDataset(Dataset):
    """Fixed-length EEG segments cut from per-recording ``.npy`` files.

    ``data_dir`` must contain the sub-folders ``Control`` (label 0) and
    ``MDD`` (label 1). Every ``.npy`` file in them is loaded, reduced to the
    requested channels, truncated to at most ``max_length`` time points,
    standardized, and cut into non-overlapping windows of ``timesteps``
    points. Trailing points that do not fill a whole window are dropped.

    Args:
        data_dir: Root folder of the dataset.
        channels: Row indices to keep from each loaded array (expected to be
            a list of ints). Assumes each array is laid out as
            (channels, time) — TODO confirm against the files on disk.
        timesteps: Number of time points per sample; must be positive.
        max_length: Upper bound on the number of time points used from each
            file (``None`` disables truncation). Defaults to 75089, the value
            that used to be hard-coded here.

    Attributes set by the constructor:
        samples: list of float32 tensors shaped (timesteps, len(channels)).
        labels: list of ints, 0 for ``Control`` and 1 for ``MDD``, aligned
            with ``samples``.
        scaler: the ``StandardScaler`` instance; it is refit on every file,
            so after construction it holds the statistics of the last file.

    Raises:
        ValueError: if ``timesteps`` is not positive.
    """

    def __init__(self, data_dir, channels, timesteps, max_length=75089):
        if timesteps <= 0:
            raise ValueError(f"timesteps must be a positive integer, got {timesteps!r}")

        self.data_dir = data_dir      # dataset root folder
        self.channels = channels      # EEG channels to select
        self.timesteps = timesteps    # time points per sample
        self.max_length = max_length  # cap on time points read per file

        self.samples = []  # all segments, in load order
        self.labels = []   # one label per segment

        # Standardizer, refit independently for every file.
        self.scaler = StandardScaler()

        # Walk the Control and MDD folders; the enumerate index is the label.
        for label, class_name in enumerate(['Control', 'MDD']):
            class_dir = os.path.join(data_dir, class_name)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # sample order (and any seeded split built on it) reproducible.
            for file_name in sorted(os.listdir(class_dir)):
                if not file_name.endswith('.npy'):
                    continue
                data = np.load(os.path.join(class_dir, file_name))
                segments = self._segment_recording(data)
                self.samples.extend(segments)
                self.labels.extend([label] * len(segments))

    def _segment_recording(self, data):
        """Standardize one loaded array and cut it into ``timesteps`` windows."""
        # Keep the requested channels and cap the number of time points.
        selected = data[self.channels, :][:, :self.max_length]

        # fit_transform standardizes column-wise, so feed it (time, channels):
        # each channel is scaled with its own statistics from this file.
        scaled = self.scaler.fit_transform(selected.T)
        scaled = np.asarray(scaled, dtype=np.float32)

        last_start = scaled.shape[0] - self.timesteps
        # .copy() yields an independent C-contiguous buffer per segment, so
        # the full-recording array can be freed once segmentation is done.
        return [
            torch.from_numpy(scaled[start:start + self.timesteps].copy())
            for start in range(0, last_start + 1, self.timesteps)
        ]

    def __len__(self):
        """Return the total number of segments."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        return self.samples[idx], self.labels[idx]

In [4]:
# Dataset location and segmentation settings
data_dir = "../data/EEG_dataset"
channels = [*range(128)]  # indices of the first 128 electrode channels
timesteps = 1000  # time points per sample (each sample spans 1000 points)

# Build the EEG dataset
dataset = EEGDataset(data_dir, channels, timesteps)

In [5]:
# Number of samples; len(dataset) is defined as len(dataset.samples)
print(len(dataset))

3975


In [6]:
# Split the segments into train / validation / test sets (70 / 15 / 15) and
# wrap each split in a DataLoader.
SPLIT_SEED = 42   # random_state shared by both splits
BATCH_SIZE = 32   # batch size shared by all three loaders

# Stack the per-segment tensors in one step instead of np.array(list_of_tensors),
# which converts tensor by tensor. The result is a float32 array of shape
# (n_samples, timesteps, n_channels).
samples = torch.stack(dataset.samples).numpy()
labels = dataset.labels

# NOTE(review): the split is done per segment, so windows cut from the same
# recording can land in train and in val/test. If each .npy file is one
# subject, this leaks subject information into evaluation — consider a
# group-aware split once recording ids are available. TODO confirm.

# Train vs. temporary (validation + test); stratify to keep the class ratio.
X_train, X_temp, y_train, y_temp = train_test_split(
    samples, labels, test_size=0.3, random_state=SPLIT_SEED, stratify=labels
)

# Split the temporary set evenly into validation and test.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=SPLIT_SEED, stratify=y_temp
)

assert len(X_train) + len(X_val) + len(X_test) == len(samples)

# torch.as_tensor reuses the float32 NumPy buffers instead of copying them again.
X_train_tensor = torch.as_tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.as_tensor(y_train, dtype=torch.long)

X_val_tensor = torch.as_tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.as_tensor(y_val, dtype=torch.long)

X_test_tensor = torch.as_tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.as_tensor(y_test, dtype=torch.long)

# Build the DataLoaders; only the training loader is shuffled.
train_data = TensorDataset(X_train_tensor, y_train_tensor)
val_data = TensorDataset(X_val_tensor, y_val_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [7]:
save_path = '../data/Dataloader_1D'
# exist_ok=True creates the folder if needed without a separate (racy)
# os.path.exists check.
os.makedirs(save_path, exist_ok=True)

# Persist the DataLoader objects themselves. torch.save pickles each loader
# together with the tensors it wraps, so the files are as large as the data.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which presumably rejects these pickles unless the caller
# passes weights_only=False — confirm when loading.
for file_name, loader in (
    ('train_loader.pth', train_loader),
    ('val_loader.pth', val_loader),
    ('test_loader.pth', test_loader),
):
    torch.save(loader, os.path.join(save_path, file_name))