In [1]:
# 使用伪码表生成wave从而得到对比训练集，防止恒正运算
# server给subject提供一个伪码表，subject利用伪码表生成对应的对照集
# 伪码与最终的hash key或者BCH code无关
# 判别器需要保留一定的分辨能力
# data index range [2, 66]
import torch
import torch.nn as nn
from torch.nn import *
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import random
import os

from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader

from preprocess import Process

In [2]:
series = []
items = 1
step = 3
subject_num = 0

for num in range(2, 66):
    try:
        series += Process(num).prepro(1024, step, items)
        subject_num += 1
    except:
        print(f'subject {num} abandoned')

subject 13 abandoned
subject 16 abandoned
subject 17 abandoned
subject 18 abandoned
subject 20 abandoned
subject 26 abandoned


The maximal number of iterations maxit (set to 20 by the program)
allowed for finding a smoothing spline with fp=s has been reached: s
too small.
There is an approximation returned but the corresponding weighted sum
of squared residuals does not satisfy the condition abs(fp-s)/s < tol.
  result = super().mean(axis=axis, dtype=dtype, **kwargs)[()]
  return _methods._var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


subject 42 abandoned
subject 47 abandoned
subject 48 abandoned
subject 50 abandoned


In [3]:
class MyDataset(Dataset):
    def __init__(self, series, items, subject_num):
        self.series = series
        self.codes = []
        self.subject_num = subject_num
        self.labels = np.zeros((len(self.series), self.subject_num), dtype='double')
        for i in range(self.subject_num):
            for j in range(items):
                self.labels[i + j][i] = 1.0            

    # need to overload
    def __len__(self):
        return len(self.series)

    # need to overload
    def __getitem__(self, idx):
        return torch.tensor(self.series[idx]), torch.tensor(self.labels[idx])

In [4]:
dataset = MyDataset(series, items=items, subject_num=subject_num)
dataloader = torch.utils.data.DataLoader(dataset,batch_size=16, shuffle=True)

IndexError: index 54 is out of bounds for axis 0 with size 54

In [None]:
class ConvolutionBlock(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(ConvolutionBlock, self).__init__()
        self.block = nn.Sequential(nn.Conv1d(input_size, hidden_size, kernel_size=3, padding=1),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Conv1d(hidden_size, hidden_size, kernel_size=3, padding=1),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU()
        )
        
    def forward(self, x):
        x = self.block(x)
        return x

In [None]:
class TransformerClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, num_layers, num_heads, dropout, num_conv_blocks):
        super(TransformerClassifier, self).__init__()
        self.conv1 = ConvolutionBlock(input_size, hidden_size)
        self.conv_blocks = nn.ModuleList([
            ConvolutionBlock(hidden_size, hidden_size) for _ in range(num_conv_blocks)
        ])
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(hidden_size, num_heads, dim_feedforward=hidden_size, dropout=dropout),
            num_layers
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        for conv_block in self.conv_blocks:
            x = conv_block(x)
        x = x.permute(0, 2, 1)  # Reshape to (batch_size, hidden_size, seq_len)
        x = self.transformer(x)
        x = x.mean(dim=1)  # Average the sequence dimension
        x = torch.sigmoid(self.fc(x))
        return x

In [None]:
device='cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Example usage
input_size = 1
hidden_size = 128
num_classes = 54
num_layers = 6
num_heads = 4
dropout = 0.2
batch_size = 32
num_epochs = 10
num_conv_blocks = 2

In [None]:
cls = TransformerClassifier(input_size, hidden_size, num_classes, num_layers, num_heads, dropout, num_conv_blocks).to(device)
cls = cls.double()

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cls.parameters(), lr=1e-6)
# learning 2e-5, momentum 0.1, rate=50% max = 66.7%
# learning 2e-5, momentum 2e-3, max = 94.4%
# lr 2e-2, momentum 2e-3, max = 67.3%
# with sigmoid
# SGD lr 2e-5 momentum 2e-3 max = 33.3%

In [None]:
C_loss = []
C_acc = []

In [None]:
for epoch in range(3000):
    epoch_iterator = tqdm(dataloader, desc="Training Epoch %d" % (epoch + 1), ncols = 100)
    #初始化损失值
    c_epoch_loss = 0
    acc_num = 0
    num = 0
    count = len(dataloader) #返回批次数
    #对数据集进行迭代
    for step, (subject, label) in enumerate(epoch_iterator):
        subject = torch.reshape(subject, (subject.size(0), 1, subject.size(1)))
        subject = subject.to(device) #把数据放到设备上
        label = label.to(device)
        size = subject.size(0)
        num += size
        
        class_train = cls(subject)
        c_loss = criterion(class_train, label)
        c_loss.backward()
        optimizer.step()
        
        for i in range(subject.size(0)):
            idx1 = torch.argmax(class_train[i])
            idx2 = torch.argmax(label[i])
            if idx1 == idx2:
                acc_num += 1
        
        #累计每一个批次的loss
        with torch.no_grad():
            c_epoch_loss += c_loss
        epoch_iterator.set_postfix({"c_loss": '{0:1.5f}'.format(c_epoch_loss), "accuracy": '{0:1.3f}'.format(acc_num / num)})
        epoch_iterator.update(1)
            
    #求平均损失
    with torch.no_grad():
        c_epoch_loss /= count
        acc = acc_num / num
        C_loss.append(c_epoch_loss)
        C_acc.append(acc)
    if acc >=0.9:
        break

In [None]:
c_curve = [i.cpu() for i in C_loss]
plt.plot(c_curve)

In [None]:
a_curve = [i for i in C_acc]
plt.plot(a_curve)

In [None]:
np.save('c_curve.npy', c_curve)
np.save('a_curve.npy', a_curve)

In [None]:
torch.save(cls, "classification")