# EEG SSVEP Test

## 一、Dataset类创建

In [7]:
from typing import Any, Callable, Dict, List, Optional, Tuple
from torchvision.datasets import MNIST
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch
import torch.nn as nn
import numpy as np
import scipy
from scipy import signal
from tqdm import tqdm

class Benchmark(Dataset):
    
    classes = {
        
    }
    
    stim_event_freq = [8., 8.2, 8.4, 8.6, 8.8, 9., 9.2, 9.4, 9.6, 9.8, 10., 10.2, 10.4, 10.6,
                       10.8, 11., 11.2, 11.4, 11.6, 11.8, 12., 12.2, 12.4, 12.6, 12.8, 13., 13.2, 13.4,
                       13.6, 13.8, 14., 14.2, 14.4, 14.6, 14.8, 15., 15.2, 15.4, 15.6, 15.8]
    
    def __init__(
        self,
        root: str = '',
        train: bool = True,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
    ) -> None:
        super(Dataset).__init__()
        self.root = root
        self.train = train
        self.transform = transform
        self.target_transform = target_transform
        self.subject_num = 35
        # 采样率1000，降采样至 250Hz
        self.samp_rate = 250
        # 预处理滤波器设置
        '''没看懂'''
        self.filterB, self.filterA = self.__get_pre_filter(self.samp_rate)
        self.data, self.pre_data, self.label = self.load_data()
    
    def load_data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        channels = [53, 54, 55, 57, 58, 59, 61, 62, 63]
        channels = [i - 1 for i in channels]
        
        if self.train:
            # train data
            print("---------- 训练数据加载 ----------")            
            data = np.zeros((200*self.subject_num, len(channels), 1375))
            pre_data = np.zeros((200*self.subject_num, len(channels), 125))
            label = np.zeros(200*self.subject_num, dtype=int)
        else:
            # test data
            print("---------- 测试数据加载 ----------")
            data = np.zeros((40*self.subject_num, len(channels), 1375))
            pre_data = np.zeros((40*self.subject_num, len(channels), 125))
            label = np.zeros(40*self.subject_num, dtype=int)
            
        for sub_num in tqdm(range(1, self.subject_num+1)):
            f = scipy.io.loadmat(self.root + f"/S{sub_num}.mat")
            # print(f"mat{sub_num}文件大小: {f['data'].shape}")
            for block in range(6):
                for target in range(40):
                    if self.train and block!=5:
                        data[(sub_num - 1) * 200 + block * 40 + target] = f["data"][channels, 125:, target, block]
                        pre_data[(sub_num - 1) * 200 + block * 40 + target] = f["data"][channels, :125, target, block]
                        label[(sub_num - 1) * 200 + block * 40 + target] = int(target)
                    elif not self.train and block==5:
                        data[(sub_num - 1) * 40 + target] = f["data"][channels, 125:, target, block]
                        pre_data[(sub_num - 1) * 40 + target] = f["data"][channels, :125, target, block]
                        label[(sub_num - 1) * 40 + target] = int(target)
        return data, pre_data, label
    
    def __get_pre_filter(self, samp_rate):
        fs = samp_rate
        f0 = 50
        q = 35
        b, a = signal.iircomb(f0, q, ftype='notch', fs=fs)
        return b, a
    
    def __preprocess(self, data):
        filter_data = signal.filtfilt(self.filterB, self.filterA, data)
        return filter_data
    
    
    def __len__(self) -> int:
        return len(self.data)
    
    def __getitem__(self, index) -> Tuple[Any, Any]:
        eeg, target = self.data[index], self.label[index]
        
        # 滤波处理
        eeg = self.__preprocess(eeg)
        
        if self.transform is not None:
            eeg = self.transform(eeg.copy())
            
        if self.target_transform is not None:
            target = self.target_transform(target.copy())
        
        eeg = eeg.float()
        return eeg, target


In [8]:
# torch.set_default_dtype(torch.float64)
train_data = Benchmark("E:\Datasets\BCI\SSVEP\Benchmark", train = True, 
                       transform = transforms.Compose([
                           transforms.ToTensor(),
                       ]))
test_data = Benchmark("E:\Datasets\BCI\SSVEP\Benchmark", train = False, 
                       transform = transforms.Compose([
                           transforms.ToTensor(),
                       ]))

---------- 训练数据加载 ----------


100%|██████████| 35/35 [00:19<00:00,  1.78it/s]


---------- 测试数据加载 ----------


100%|██████████| 35/35 [00:19<00:00,  1.82it/s]


In [9]:
print("train_data:\n", len(train_data))
print(f"shape: {train_data[0][0].shape}, type: {type(train_data[0][0])}")

print("test_data:\n", len(test_data))
print(f"shape: {test_data[0][0].shape}, type: {type(test_data[0][0])}")

print(train_data[2][1])

train_data:
 7000
shape: torch.Size([1, 9, 1375]), type: <class 'torch.Tensor'>
test_data:
 1400
shape: torch.Size([1, 9, 1375]), type: <class 'torch.Tensor'>
2


## 二、EEGNet构建

In [10]:
from torchsummary import summary
import time

device = 'cuda' if torch.cuda.is_available() else 'cpu'

class DepthwiseSeparableConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, depth_multiplier=1):
        super(DepthwiseSeparableConv2d, self).__init__()
        self.depthwise = nn.Conv2d(in_channels, in_channels * depth_multiplier, kernel_size=kernel_size,
                                   stride=(1, 1), padding=(0, 0 if kernel_size[0]>kernel_size[1] else max(kernel_size)//2), groups=in_channels, bias=False)
        self.pointwise = nn.Conv2d(in_channels * depth_multiplier, out_channels, kernel_size=(1, 1),
                                   stride=(1, 1), padding=(0, 0), bias=False)

    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        return x


class EEGNet(nn.Module):
    def __init__(self, nb_classes, Chans=64, Samples=128, dropoutRate=0.5, kernLength=64,
                 F1=8, D=2, F2=16, norm_rate=0.25, dropoutType='Dropout'):
        super(EEGNet, self).__init__()

        if dropoutType == 'SpatialDropout2D':
            self.dropoutType = nn.Dropout2d
        elif dropoutType == 'Dropout':
            self.dropoutType = nn.Dropout
        else:
            raise ValueError('dropoutType must be one of SpatialDropout2D '
                             'or Dropout, passed as a string.')

        self.block1 = nn.Sequential(
            nn.Conv2d(1, F1, (1, kernLength), padding=(0, kernLength//2), bias=False),
            nn.BatchNorm2d(F1),
            DepthwiseSeparableConv2d(F1, F1, kernel_size=(Chans, 1), depth_multiplier=D),
            nn.BatchNorm2d(F1),
            nn.ELU(),
            nn.AvgPool2d((1, 4)),
            self.dropoutType(dropoutRate)
        )

        self.block2 = nn.Sequential(
            DepthwiseSeparableConv2d(F1, F2, kernel_size=(1, 16), depth_multiplier=1),
            nn.BatchNorm2d(F2),
            nn.ELU(),
            nn.AvgPool2d((1, 8)),
            self.dropoutType(dropoutRate)
        )

        self.block3 = nn.Sequential(
            nn.Flatten(),
            nn.Linear(F2*int(np.floor((np.floor((Samples+1)/4)+1)/8)), nb_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, input):
        x = self.block1(input)
        x = self.block2(x)
        x = self.block3(x)
        return x

learning_rate = 1e-3
nb_classes = 40
Chans = 9
Samples = 1375
model = EEGNet(nb_classes, Chans, Samples)
model = model.to(device)
# torch.set_default_dtype(torch.float64)
# print(model.state_dict()['block1.0.weight'].dtype)
# print(device)
print(summary(model, input_size=(1, 9, 1375), device='cuda'))


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 8, 9, 1376]             512
       BatchNorm2d-2           [-1, 8, 9, 1376]              16
            Conv2d-3          [-1, 16, 1, 1376]             144
            Conv2d-4           [-1, 8, 1, 1376]             128
DepthwiseSeparableConv2d-5           [-1, 8, 1, 1376]               0
       BatchNorm2d-6           [-1, 8, 1, 1376]              16
               ELU-7           [-1, 8, 1, 1376]               0
         AvgPool2d-8            [-1, 8, 1, 344]               0
           Dropout-9            [-1, 8, 1, 344]               0
           Conv2d-10            [-1, 8, 1, 345]             128
           Conv2d-11           [-1, 16, 1, 345]             128
DepthwiseSeparableConv2d-12           [-1, 16, 1, 345]               0
      BatchNorm2d-13           [-1, 16, 1, 345]              32
              ELU-14      

In [11]:
from torch.utils.tensorboard import SummaryWriter
import os
# os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# tensorboard 记录训练结果
writer = SummaryWriter("./logs_train")

# dataloader
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False)

# 损失函数
criterion = nn.CrossEntropyLoss()

# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, foreach=False)

# Training Loop
start_time = time.time()
total_train_step = 0
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    print(f"---------- EPOCH {epoch+1} ----------\n")
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # 优化器清除梯度
        optimizer.zero_grad()
        outputs = model(inputs)
        # 交叉熵计算损失
        loss = criterion(outputs, labels.long())
        # 优化器优化模型
        loss.backward()
        optimizer.step()
        # 误差分析
        total_train_step += 1
        if total_train_step % 100 == 0:
            end_time = time.time()
            print(f"{end_time-start_time}\t 训练次数：{total_train_step}, Loss：{loss}")
            writer.add_scalar("train_loss", loss.item(), total_train_step)
            
# 测试
model.eval()
with torch.no_grad():
    total_correct = 0        
    total_samples = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        values, predicted = torch.max(outputs, dim=1)
        total_correct += (predicted == labels).sum().item()
        total_samples += len(labels)
    
    accuracy = total_correct / total_samples
    print(f"Test Accuracy: {accuracy:.4f}")

    torch.save(model, f".\Weights/eeg_gpu.pth")
    # torch.save(light.state_dict(), f"Weights/light_{epoch}.pth")
    print("模型已保存")
writer.close()

---------- EPOCH 1 ----------
0.8241136074066162	 训练次数：100, Loss：3.69568133354187
1.420565128326416	 训练次数：200, Loss：3.6918246746063232
2.025568723678589	 训练次数：300, Loss：3.6824088096618652
2.6121058464050293	 训练次数：400, Loss：3.658505439758301
---------- EPOCH 2 ----------
3.2184486389160156	 训练次数：500, Loss：3.3628623485565186
3.818922281265259	 训练次数：600, Loss：3.4165542125701904
4.409793376922607	 训练次数：700, Loss：3.315293312072754
4.999252080917358	 训练次数：800, Loss：3.1848251819610596
---------- EPOCH 3 ----------

5.563786268234253	 训练次数：900, Loss：3.342639207839966
6.145760536193848	 训练次数：1000, Loss：3.3122804164886475
6.715238571166992	 训练次数：1100, Loss：3.366192579269409
7.293615102767944	 训练次数：1200, Loss：3.1675117015838623
7.869288444519043	 训练次数：1300, Loss：3.445911407470703
---------- EPOCH 4 ----------
8.412700414657593	 训练次数：1400, Loss：3.4322941303253174
8.95007061958313	 训练次数：1500, Loss：3.1146395206451416
9.499636888504028	 训练次数：1600, Loss：3.17037034034729
10.048905849456787	 训练次数：1700, 

In [None]:
# 测试集
model.eval()
with torch.no_grad():
    total_correct = 0
    total_samples = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        values, predicted = torch.max(outputs, dim=1)
        total_correct += (predicted == labels).sum().item()
        total_samples += len(labels)
    
    accuracy = total_correct / total_samples
    print(f"Test Accuracy: {accuracy:.4f}")

    torch.save(model, f".\Weights/eeg_gpu.pth")
    # torch.save(light.state_dict(), f"Weights/light_{epoch}.pth")
    print("模型已保存")
writer.close()

In [290]:
print(inputs.shape)
print(type(inputs))
# print(inputs)
# inputs = torch.tensor(inputs, requires_grad=True)
inputs.requires_grad
inputs.retain_grad()
limit = max(inputs.min().abs(), inputs.max().abs())
print(inputs*limit)

torch.Size([8, 1, 9, 1375])
<class 'torch.Tensor'>
tensor([[[[-1.5510e+00,  7.4966e+01,  1.0219e+02,  ..., -3.0856e+02,
           -3.0519e+02, -4.5943e+02],
          [ 6.8855e+00,  4.2774e+01,  1.3900e+01,  ..., -4.0390e+02,
           -3.7533e+02, -4.8778e+02],
          [-9.6577e+00,  1.2393e+02,  1.1500e+02,  ..., -4.3115e+02,
           -4.5605e+02, -5.1908e+02],
          ...,
          [-1.2002e+02,  3.3730e+01,  3.5003e+01,  ..., -4.0603e+02,
           -4.3409e+02, -6.0819e+02],
          [-2.2596e+02, -7.0180e+01,  7.1042e+01,  ..., -4.2841e+02,
           -3.1199e+02, -6.0364e+02],
          [-1.5700e+02, -1.3420e+02,  2.4541e+02,  ..., -6.6344e+02,
           -2.2165e+02, -4.9569e+02]]],


        [[[-5.0854e+01, -3.0689e+02, -2.5286e+01,  ..., -9.7597e+01,
           -1.3549e+01,  1.3606e+02],
          [-4.5244e+01, -3.0550e+02, -3.0624e-01,  ..., -1.9304e+02,
           -6.7049e+01,  8.9986e+01],
          [-7.5914e+01, -3.4179e+02, -1.2330e+01,  ..., -2.1149e+02,
     

In [301]:
a1 = torch.tensor(np.random.randint(1000, 1050, size=(2, 2, 2))).long()
b1 = torch.tensor(np.random.randint(1, 10, size=(2, 2, 2))).long()
print(a1, "\n", b1)
c1 = a1-b1
print(c1)
print((c1-a1).float().abs().mean())

tensor([[[1012, 1012],
         [1039, 1004]],

        [[1013, 1000],
         [1046, 1046]]]) 
 tensor([[[2, 5],
         [2, 9]],

        [[9, 5],
         [6, 8]]])
tensor([[[1010, 1007],
         [1037,  995]],

        [[1004,  995],
         [1040, 1038]]])
tensor(5.7500)


In [262]:
outputs = model(inputs)
print(labels)
print(labels.long().dtype)
# 交叉熵计算损失
loss = criterion(outputs, labels.long())
print(inputs.min())
print(inputs.max())
print(inputs.mean())
print(max(inputs.min().abs(), inputs.max().abs()))

tensor([32, 33, 34, 35, 36, 37, 38, 39], device='cuda:0', dtype=torch.int32)
torch.int64
tensor(-54.5292, device='cuda:0', grad_fn=<MinBackward1>)
tensor(84.2379, device='cuda:0', grad_fn=<MaxBackward1>)
tensor(0.0194, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(84.2379, device='cuda:0', grad_fn=<AbsBackward0>)


In [241]:
loss.backward()

In [243]:
# inputs.grad
print(inputs.is_leaf)
# 梯度清零
# print(inputs.grad.fill_(0))
# 梯度取符号
print(inputs.grad*torch.max(inputs))
# 返回梯度的符号，而不改变梯度值
# print(inputs.grad.sign())
# print(inputs.grad)
# 改变梯度值
# print(inputs.grad.sign_())

False
tensor([[[[-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379],
          [-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379],
          [ 84.2379,  84.2379,  84.2379,  ...,  84.2379, -84.2379, -84.2379],
          ...,
          [ 84.2379,  84.2379,  84.2379,  ..., -84.2379, -84.2379, -84.2379],
          [ 84.2379,  84.2379,  84.2379,  ..., -84.2379, -84.2379, -84.2379],
          [ 84.2379,  84.2379,  84.2379,  ..., -84.2379, -84.2379, -84.2379]]],


        [[[ 84.2379,  84.2379,  84.2379,  ..., -84.2379, -84.2379, -84.2379],
          [ 84.2379,  84.2379,  84.2379,  ..., -84.2379, -84.2379, -84.2379],
          [-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379],
          ...,
          [-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379],
          [-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379],
          [-84.2379, -84.2379, -84.2379,  ...,  84.2379,  84.2379,  84.2379]]],


        [[[-84.2379,

In [259]:
ll = inputs.detach().cpu().numpy()
print((inputs.detach().cpu().numpy() == inputs.detach().cpu().numpy()).sum()/len(ll.flatten()))

1.0


In [313]:
def mat(x, y):
    return x@y

aa = torch.tensor([12, 32, 11], dtype=torch.float64, requires_grad=True)
bb = torch.randint(2, 5, size=(3, 1), dtype=torch.float64, requires_grad=True)
cc = aa.clone().detach()
cc.requires_grad=True
dd = mat(aa, bb)
print(dd)
dd.backward()

aa = aa.clone().detach()
print(aa.grad)
print(cc.grad)
print(aa.is_leaf)
print(cc.is_leaf)
print(cc.requires_grad)


tensor([153.], dtype=torch.float64, grad_fn=<SqueezeBackward4>)
None
None
True
True
True


In [213]:
mm = torch.tensor(np.random.randint(0, 50, size=5))
print(mm)
print(train_loader.dataset.data[mm].shape)
print("haha")
print(torch.LongTensor(mm.size()).fill_(1))
print(mm.size())

tensor([32, 23, 47, 13, 18], dtype=torch.int32)
(5, 9, 1375)
haha
tensor([1, 1, 1, 1, 1])
torch.Size([5])


In [264]:
print(torch.tensor([[1, 2], [2, 3]]).dim())

2


In [273]:
qjs = np.random.randint(1, 5, size=(1, 2, 3))
trans=transforms.Compose(
    [
        transforms.ToTensor(),

    ])
xx = trans(qjs)
print(xx.shape)
print(xx.permute(1, 2, 0).shape)

torch.Size([3, 1, 2])
torch.Size([1, 2, 3])


In [279]:
print(inputs[0])
b = inputs/inputs.max()
print(b)
print(b.detach().cpu().numpy())
print(b.device)

tensor([[[-0.0184,  0.8899,  1.2132,  ..., -3.6629, -3.6229, -5.4540],
         [ 0.0817,  0.5078,  0.1650,  ..., -4.7947, -4.4556, -5.7905],
         [-0.1146,  1.4712,  1.3652,  ..., -5.1182, -5.4138, -6.1621],
         ...,
         [-1.4247,  0.4004,  0.4155,  ..., -4.8200, -5.1531, -7.2199],
         [-2.6824, -0.8331,  0.8434,  ..., -5.0857, -3.7036, -7.1658],
         [-1.8637, -1.5931,  2.9133,  ..., -7.8758, -2.6312, -5.8844]]],
       device='cuda:0', grad_fn=<SelectBackward0>)
tensor([[[[-2.1858e-04,  1.0564e-02,  1.4402e-02,  ..., -4.3483e-02,
           -4.3008e-02, -6.4745e-02],
          [ 9.7033e-04,  6.0279e-03,  1.9588e-03,  ..., -5.6919e-02,
           -5.2893e-02, -6.8740e-02],
          [-1.3610e-03,  1.7465e-02,  1.6207e-02,  ..., -6.0759e-02,
           -6.4268e-02, -7.3151e-02],
          ...,
          [-1.6913e-02,  4.7534e-03,  4.9328e-03,  ..., -5.7219e-02,
           -6.1174e-02, -8.5708e-02],
          [-3.1844e-02, -9.8900e-03,  1.0012e-02,  ..., -6.0373e

In [15]:
# SummaryWriter测试
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir="./log_test", filename_suffix="qjs")
for i in range(10):
    writer.add_scalar("qjs/x**3",scalar_value=i**3, global_step=i)
    writer.add_scalar("qjs/x*3", scalar_value=i*3, global_step=i)
writer.close()

In [78]:
import numpy as np
# 自动求导测试
def func1(x):
    return x**2
x = np.rand(size=(3, ), requires_grad=True)
print(x)
y = func1(x)
print(y)
y.backward(torch.ones_like(x))
x.grad
x.grad.sign()

tensor([-0.2066, -0.5919, -0.3954], grad_fn=<SubBackward0>)
tensor([0.0427, 0.3503, 0.1564], grad_fn=<PowBackward0>)


  x.grad
  x.grad.sign()


AttributeError: 'NoneType' object has no attribute 'sign'

In [103]:
a = torch.Tensor([[1, 2, 3, 4, 5, 6, 7], [8, 5, 2, 5, 4, 6 ,1]])
b = torch.Tensor([1, 3, 2, 5, 4, 6, 7])
print(torch.eq(a, b).float().mean().data)
torch.argmax(a, dim=1)

tensor(0.5000)


tensor([6, 0])