In [3]:
# https://zhuanlan.zhihu.com/p/419092667
import torch
import numpy

class SVM(torch.nn.Module):
    """Linear soft-margin SVM trained by gradient descent on the hinge loss.

    The decision function is ``f(x) = x @ w + b``. Samples with ``f(x) > 0``
    are treated as the positive class (label 1) and samples with ``f(x) < 0``
    as the negative class (label 0).

    Args:
        C: Divisor applied to the summed hinge loss before ``||w||^2`` is
            added. A larger ``C`` down-weights margin violations relative to
            the regulariser, so C is positively correlated with the number of
            margin-violating points.
        in_features: Length of each flattened feature vector (default 324).
    """

    def __init__(self, C = 1, in_features = 324):
        super(SVM,self).__init__()
        self.w = torch.nn.Parameter(torch.ones(in_features))
        self.b = torch.nn.Parameter(torch.zeros(1))
        self.C = C
        self.relu = torch.nn.ReLU()

    def forward(self, positive, negative, optimizer):
        """Perform one full optimisation step on the given batches.

        Note that, unlike a typical ``forward``, this method also zeroes the
        gradients, back-propagates and calls ``optimizer.step()``.

        Args:
            positive: Tensor of positive-class samples, one row per sample.
            negative: Tensor of negative-class samples, one row per sample.
            optimizer: Optimiser that owns this module's parameters.
        """
        optimizer.zero_grad()
        w2 = torch.pow(self.w, 2).sum()
        # Hinge terms: a positive sample is penalised when f(x) < 1,
        # a negative sample when f(x) > -1.
        positive_violation = 1 - (torch.matmul(positive, self.w) + self.b)
        negative_violation = 1 + (torch.matmul(negative, self.w) + self.b)

        hinge = self.relu(positive_violation).sum() + self.relu(negative_violation).sum()
        loss = hinge / self.C + w2

        loss.backward()
        optimizer.step()

    def predict(self, x, y):
        """Return the fraction of samples in ``x`` classified correctly.

        Classification ignores the margin and looks only at the sign of
        ``x @ w + b``. A score of exactly 0 counts as a miss for either label.

        The score is computed with the *same* ``w`` and ``b`` that were
        trained. Normalising ``w`` alone (without also dividing ``b`` by
        ``||w||``) would shift the decision boundary and change predictions.

        Args:
            x: Tensor of samples, one row per sample.
            y: Labels aligned with ``x``; a value that truncates to 1 is
                positive, a value that truncates to 0 is negative.

        Returns:
            Accuracy as a Python float in ``[0, 1]``.
        """
        size = x.size(0)
        with torch.no_grad():
            scores = torch.matmul(x, self.w) + self.b
            # .to(torch.long) truncates toward zero, matching int(y[i]).
            labels = torch.as_tensor(y, device = scores.device).reshape(-1).to(torch.long)
            hits = ((scores > 0) & (labels == 1)) | ((scores < 0) & (labels == 0))
            correct = int(hits.sum())
        return correct / size

In [5]:
# Use the GPU when one is present; otherwise stay on the CPU so this cell
# still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def load_tensor(path, device, flatten = True):
    """Load a ``.npy`` file as a tensor placed on ``device``.

    Args:
        path: Location of the ``.npy`` file.
        device: Target ``torch.device`` for the returned tensor.
        flatten: When true, collapse every axis after the first so the
            result has one row per sample.

    Returns:
        The loaded tensor on ``device``.
    """
    # NOTE(review): allow_pickle=True lets numpy unpickle arbitrary objects;
    # only load files produced by this notebook. Confirm whether it is needed.
    tensor = torch.tensor(numpy.load(path, allow_pickle=True))
    if flatten:
        tensor = tensor.reshape(tensor.shape[0], -1)
    return tensor.to(device)


x_train_positive = load_tensor("../train_x_positive.npy", device)
x_train_negative = load_tensor("../train_x_negative.npy", device)
x_test = load_tensor("../test_x.npy", device)
y_test = load_tensor("../test_y.npy", device, flatten = False)

print(x_train_positive.shape)
print(x_train_negative.shape)
print(x_test.shape)
print(y_test.shape)

torch.Size([39074, 324])
torch.Size([30046, 324])
torch.Size([7680, 324])
torch.Size([7680])


In [None]:
# Hyper-parameters of the full-batch training run.
NUM_EPOCHS = 500
LEARNING_RATE = 1e-3

# Place the model on whatever device already holds the training data so the
# two can never disagree.
model = SVM().to(x_train_positive.device)
optimizer = torch.optim.SGD(model.parameters(), lr = LEARNING_RATE)

# Train and evaluate: each epoch is one optimisation step over all samples
# (the step itself happens inside the model's forward call), followed by an
# accuracy check on the test split.
for epoch in range(NUM_EPOCHS):
    model.train()
    model(x_train_positive, x_train_negative, optimizer)

    model.eval()
    with torch.no_grad():
        pre = model.predict(x_test, y_test)
        print(f"Epoch{epoch + 1}, 准确率：{100 * pre :> 0.3f}%")

torch.save(model.state_dict(), './SVM_EEG.pth')

In [9]:
# 记录距离，分类标签与实际标签
import pandas

# Use the GPU when one is present; map_location lets a checkpoint saved on a
# GPU be restored on a CPU-only machine.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SVM().to(device)
model.load_state_dict(torch.load('./SVM_EEG.pth', map_location = device))
model.eval()

# NOTE(review): allow_pickle=True lets numpy unpickle arbitrary objects;
# only load files produced by this notebook.
x = torch.tensor(numpy.load("../x_EEG.npy", allow_pickle=True))
x = x.reshape(x.shape[0], -1).to(device)
y = torch.tensor(numpy.load("../y_EEG.npy", allow_pickle=True)).to(device)

b, w = model.b, model.w
with torch.no_grad():
    # Raw decision score x @ w + b (not divided by ||w||).
    output = torch.matmul(x, w) + b

# Keep the score as a float. Truncating it to int would turn every score in
# (0, 1) into 0 and therefore label those samples as class 0.
distance = output.cpu().tolist()
classify_label = (output > 0).long().cpu().tolist()
# .long() truncates toward zero, matching int(y[i]).
real_label = (y.reshape(-1).long() == 1).long().cpu().tolist()

data = pandas.DataFrame({'distance':distance, 'classify_label': classify_label, 'real_label': real_label})
data.to_csv("../SVM_distance_CCNN.csv", index = True)

In [None]:
# Part.1
# 读取源数据，做基本处理，区分标签与数据
from torcheeg import transforms
from torcheeg.datasets import DEAPDataset
from torcheeg.datasets.constants.emotion_recognition.deap import DEAP_CHANNEL_LIST
from torcheeg.datasets.constants.emotion_recognition.deap import DEAP_CHANNEL_LOCATION_DICT

# Which pre-processing pipeline to build: 'CCNN', 'EEG' or 'TSCeption'.
# Only ONE dataset is constructed. Building several in a row and re-assigning
# `dataset` silently keeps the last one and wastes the work spent on the rest.
# NOTE(review): the rest of this notebook works with 324 features per sample
# (SVM weight length, printed shapes [N, 9, 36]); presumably only the 'CCNN'
# pipeline produces that size — confirm before switching.
DATASET_KIND = 'CCNN'


def _valence_label_transform():
    """Label pipeline shared by every variant: Select('valence') then Binary(5.0)."""
    return transforms.Compose([
        transforms.Select('valence'),
        transforms.Binary(5.0),
    ])


def build_deap_dataset(kind):
    """Construct the DEAP dataset for the requested pre-processing variant.

    Args:
        kind: One of ``'CCNN'``, ``'EEG'`` or ``'TSCeption'``.

    Returns:
        A ``DEAPDataset`` configured for that variant.

    Raises:
        ValueError: If ``kind`` is not a known variant.
    """
    if kind == 'CCNN':
        return DEAPDataset(io_path = '.../Data/deap_CCNN', root_path = '.../Data/data_preprocessed_python',
                    io_mode = 'pickle',
                    offline_transform = transforms.Compose([
                        transforms.BandDifferentialEntropy(sampling_rate = 128, apply_to_baseline = True),
                        transforms.BaselineRemoval(),
                        transforms.ToGrid(DEAP_CHANNEL_LOCATION_DICT)
                    ]),
                    online_transform = transforms.ToTensor(),
                    label_transform = _valence_label_transform(),
                    chunk_size = 128,
                    baseline_chunk_size = 128,
                    num_baseline = 3,
                    num_worker = 4)
    if kind == 'EEG':
        return DEAPDataset(io_path = '.../Data/deap_EEG', root_path = '.../Data/data_preprocessed_python',
                    online_transform = transforms.Compose([
                        transforms.To2d(),
                        transforms.ToTensor(),
                    ]),
                    io_mode = 'pickle',
                    label_transform = _valence_label_transform())
    if kind == 'TSCeption':
        return DEAPDataset(io_path = '.../Data/deap_TSCeption', root_path = '.../Data/data_preprocessed_python',
                    io_mode = 'pickle',
                    offline_transform = transforms.Compose([
                        transforms.PickElectrode(transforms.PickElectrode.to_index_list(
                        ['FP1', 'AF3', 'F3', 'F7', 'FC5', 'FC1', 'C3', 'T7', 'CP5', 'CP1', 'P3', 'P7',
                        'PO3','O1', 'FP2', 'AF4', 'F4', 'F8', 'FC6', 'FC2', 'C4', 'T8', 'CP6', 'CP2',
                        'P4', 'P8', 'PO4', 'O2'], DEAP_CHANNEL_LIST)),
                        transforms.To2d()
                    ]),
                    online_transform = transforms.ToTensor(),
                    label_transform = _valence_label_transform())
    raise ValueError(f"unknown dataset kind: {kind!r}")


dataset = build_deap_dataset(DATASET_KIND)

# Gather every (sample, label) pair and stack them once at the end.
# Growing a tensor with torch.cat inside the loop re-copies all previously
# collected data on every iteration (quadratic cost). The data is saved from
# the CPU, so it is never moved to the GPU here.
last_index = max(len(dataset) - 1, 1)   # progress denominator; avoids division by zero
samples, labels = [], []
for i, (sample, label) in enumerate(dataset):
    if i % 1000 == 0:
        print(f'进度：{(100 * i / last_index) :> 0.3f}%')
    samples.append(sample)
    labels.append(torch.tensor(label))

# Stacking along a new leading axis gives one row per dataset item.
x = torch.stack(samples).detach().cpu().numpy()
y = torch.stack(labels).detach().cpu().numpy()
numpy.save("../x_EEG.npy", x)
numpy.save("../y_EEG.npy", y)

In [4]:
# 数据处理Part.2
# 对训练数据细分
from sklearn.model_selection import train_test_split

# Reload the saved feature and label arrays and wrap each one in a tensor.
# NOTE(review): allow_pickle=True lets numpy unpickle arbitrary objects;
# only load files produced by this notebook.
x, y = (
    torch.tensor(numpy.load(path, allow_pickle=True))
    for path in ("../x_EEG.npy", "../y_EEG.npy")
)

def data_presolve(x, y, device = None):
    """Split samples into a positive group (label 1) and a negative group.

    Rows are selected with a boolean mask rather than being concatenated one
    at a time, which avoids re-copying the growing result on every iteration.
    A class with no members yields an empty tensor instead of an error.

    Args:
        x: Tensor of samples; the first axis indexes samples.
        y: Labels aligned with ``x``. A value that truncates to 1 marks a
            positive sample; every other value marks a negative sample.
        device: Device for the returned tensors. When ``None``, CUDA is used
            if available, otherwise the tensors stay on ``x``'s device.

    Returns:
        ``(x_positive, x_negative)``. Within each group the rows appear in
        reverse input order (the last matching sample comes first).
    """
    if device is None:
        device = torch.device("cuda") if torch.cuda.is_available() else x.device

    # .to(torch.long) truncates toward zero, matching int(y[i]).
    labels = torch.as_tensor(y).reshape(-1)[:x.shape[0]].to(torch.long)
    is_positive = (labels == 1).to(x.device)

    # flip(0) keeps the newest-first ordering that prepending each sample gives.
    x_positive = x[is_positive].flip(0).to(device)
    x_negative = x[~is_positive].flip(0).to(device)
    return x_positive, x_negative

# Fixed seed so the train/test partition written to disk is the same on
# every run of this cell.
SPLIT_SEED = 42

x = x.reshape(x.shape[0], x.shape[2], -1)
y = y.reshape(y.shape[0])

# Hold out 10% of the samples as the test set.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.1, random_state = SPLIT_SEED)

print(x_train.shape)
print(y_train.shape)
print(x_test.shape)
print(y_test.shape)

# Separate the training samples by class, then save each group.
train_x_positive, train_x_negative = data_presolve(x_train, y_train)
train_x_positive = train_x_positive.detach().cpu().numpy()
train_x_negative = train_x_negative.detach().cpu().numpy()
numpy.save("../train_x_positive.npy", train_x_positive)
numpy.save("../train_x_negative.npy", train_x_negative)
numpy.save("../test_x.npy", x_test)
numpy.save("../test_y.npy", y_test)

torch.Size([69120, 9, 36])
torch.Size([69120])
torch.Size([7680, 9, 36])
torch.Size([7680])
