In [1]:
import numpy as np
import os
import pandas as pd
from pandas import DataFrame
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# 读取数据
load_path = '../global_data/time_76800x128x9x9/'

trials = np.load(load_path + 'trials.npy')
bases = np.load(load_path + 'bases.npy')
labels = np.load(load_path + 'labels.npy')
# chw格式
print(trials.shape, bases.shape, labels.shape)

(76800, 128, 9, 9) (1280, 128, 9, 9) (1280, 2)


## 数据预处理

In [3]:
# 去基线
trials_de_base = []
for i, base in enumerate(bases):
    trials_de_base.append(trials[i * 60 : (i + 1) * 60] - base)
trials_de_base = np.array(trials_de_base)
trials_de_base = trials_de_base.reshape((-1, 128, 9, 9))
trials_de_base.shape

(76800, 128, 9, 9)

In [4]:
# 离散化标签
labels = np.where(labels >= 5, 1, 0)

# 复制以对齐样本
labels = np.repeat(labels, 60, axis = 0)
print(labels.shape)

(76800, 2)


In [5]:
def data_split(features, labels, IDs, train_ratio = 0.9):
    print('split dataset...')
    sample_cnt = labels.shape[0]
    # 数据集划分
    train_ratio = train_ratio
    shuffer_list = np.arange(sample_cnt)
    np.random.shuffle(shuffer_list)

    features_shuffer = features[shuffer_list] # trials_de_base
    labels_shuffer = labels[shuffer_list]
    IDs_shuffer = IDs[shuffer_list]

    x_train = features_shuffer[:int(train_ratio * sample_cnt)]
    y_train = labels_shuffer[:int(train_ratio * sample_cnt)]
    ID_train = IDs_shuffer[:int(train_ratio * sample_cnt)]
    x_test = features_shuffer[int(train_ratio * sample_cnt):]
    y_test = labels_shuffer[int(train_ratio * sample_cnt):]
    ID_test = IDs_shuffer[int(train_ratio * sample_cnt):]
    return x_train, y_train, ID_train, x_test, y_test, ID_test

In [6]:
def z_norm(x_train, x_test):# z归一化
    print('z_norm...')
    chan_to_1020={0:[0,3],1:[1,3],2:[2,2],3:[2,0],4:[3,1],5:[3,3],6:[4,2],7:[4,0],8:[5,1],
                  9:[5,3],10:[6,2],11:[6,0],12:[7,3],13:[8,3],14:[8,4],15:[6,4],16:[0,5],
                  17:[1,5],18:[2,4],19:[2,6],20:[2,8],21:[3,7],22:[3,5],23:[4,4],24:[4,6],
                    25:[4,8],26:[5,7],27:[5,5],28:[6,6],29:[6,8],30:[7,5],31:[8,5]}

    # 需要归一化的位置
    need_norm_pos = np.zeros(shape=(9,9), dtype=int)

    for val in chan_to_1020.values():
        need_norm_pos[val[0]][val[1]] = 1
    # print(need_norm_pos)

    for i in range(9):
        for j in range(9):
            if need_norm_pos[i][j] == 0:
                continue
            for t in range(128):
                mean = np.mean(x_train[:, t, i, j])
                std = np.std(x_train[:, t, i, j])
                x_train[:, t, i, j] -= mean
                x_train[:, t, i, j] /= std
                x_test[:, t, i, j] -= mean
                x_test[:, t, i, j] /= std
    return x_train, x_test

In [26]:
# 超参设置
batch_size = 32
max_epoch = 1000
emotion_dim = 0
lr = 0.001

patient = 100
emo_imp = 0.9

model_save_path = 'model/MTL_model/'
IndiviTransNet_model_load_path = 'model/transformer_model/'

In [27]:
from torch.utils.data import TensorDataset
import torch
from torch.utils.data import DataLoader

from torch import nn
from torchviz import make_dot
from torch.autograd import Variable

# 考虑流出一部分人，不用于特征映射，但也用于训练MTL，且只用情感损失来监督学习，这是否对新的测试受试者有利？

In [28]:
import math
from IndiviTransNet import IndiviTransNet
from MTL_Net import MTL_Net

for sub_idx in range(32):
    
    transformer_sub_list = [_ for _ in range(32)]
    transformer_sub_list.remove(sub_idx)
    transfomer_group = []
    for _ in transformer_sub_list:
        ITN = IndiviTransNet(transformer_phase=True)
        ITN.load_state_dict(torch.load(IndiviTransNet_model_load_path + f'sub_{_}_model_parameter.pkl'))
        transfomer_group.append(ITN)
            
    model = MTL_Net(transfomer_group)
    # 初始化权重参数
    

    opt = torch.optim.Adam(model.parameters(), lr)
    ID_criterion = nn.CrossEntropyLoss()
    Emo_criterion = nn.CrossEntropyLoss()
    
    # 提前终止
    acc_value=0
    step=0
    
    train_mask = np.ones(shape=(32 * 2400), dtype=np.int64)
    train_mask[sub_idx * 2400 : (sub_idx + 1) * 2400] = 0
    train_mask = train_mask == 1
    
    x_train = trials_de_base[train_mask]
    emo_train = labels[train_mask, emotion_dim]
    IDs = torch.arange(31).unsqueeze(-1)
    ID_train = IDs.repeat(1, 2400).view(-1)
    
    x_train, emo_train, ID_train, x_vali, emo_vali, ID_vali = data_split(x_train, emo_train, ID_train, train_ratio = 0.9)
    x_train, x_vali = z_norm(x_train, x_vali)
    
    print('make tensor...')
    x_train = torch.tensor(x_train, dtype=torch.float)
    emo_train = torch.tensor(emo_train, dtype=torch.long).unsqueeze(-1)
    ID_train = torch.tensor(ID_train, dtype=torch.long).unsqueeze(-1)
    y_train = torch.concat([emo_train, ID_train], dim = 1)
    
    x_vali = torch.tensor(x_vali, dtype=torch.float)
    emo_vali = torch.tensor(emo_vali, dtype=torch.long).unsqueeze(-1)
    ID_vali = torch.tensor(ID_vali, dtype=torch.long).unsqueeze(-1)
    y_vali = torch.concat([emo_vali, ID_vali], dim = 1)
    
    data_train = TensorDataset(x_train, y_train)
    train_loader = DataLoader(dataset=data_train, batch_size=batch_size, shuffle=True)
    
    print('train...')
    for i in range(max_epoch):
        model.train()
        print(f'epoch:{i}')
        for batch_idx, batch_data in enumerate(train_loader):
            x, y = batch_data
            emo_out, id_out = model(x, x)
            
#             vis_graph = make_dot(model(x, x), params=dict(model.named_parameters()))
#             vis_graph.view()
            
            Emo_loss = Emo_criterion(emo_out, y[:, 0])
            ID_loss = ID_criterion(id_out, y[:, 1])
            loss = emo_imp * Emo_loss + (1 - emo_imp) * ID_loss
#             loss = Emo_loss
            opt.zero_grad()
            loss.backward()
            opt.step()
#             print(model.state_dict()['id_net.fc1.bias'])
            
            if batch_idx % 100 == 0:
                train_emo_loss = Emo_loss / x.shape[0]
                train_id_loss = ID_loss / x.shape[0]

                emo_out = torch.max(emo_out, 1)[1]
                train_emo_acc = (emo_out == y[:, 0]).sum() / x.shape[0]

                id_out = torch.max(id_out, 1)[1]
                train_id_acc = (id_out == y[:, 1]).sum() / x.shape[0]

                print(f'batch: {batch_idx}, train_emo_loss: {train_emo_loss:.6f}, train_id_loss: {train_id_loss:.6f}, train_emo_acc: {train_emo_acc:.6f}, train_id_acc: {train_id_acc:.6f}')
        
        model.eval()
        emo_out, id_out = model(x_vali, x_vali)
        vali_emo_loss = Emo_criterion(emo_out, y_vali[:, 0]) / x_vali.shape[0]
        vali_id_loss = ID_criterion(id_out, y_vali[:, 1]) / x_vali.shape[0]
        
        emo_out = torch.max(emo_out, 1)[1]
        vali_emo_acc = (emo_out == y_vali[:, 0]).sum() / x_vali.shape[0]
        
        id_out = torch.max(id_out, 1)[1]
        vali_id_acc = (id_out == y_vali[:, 1]).sum() / x_vali.shape[0]
        
        print(f'epoch: {i}, vali_emo_loss: {vali_emo_loss:.6f}, vali_id_loss: {vali_id_loss:.6f}, vali_emo_acc: {vali_emo_acc:.6f}, vali_id_acc: {vali_id_acc:.6f}')
        
        step = step + 1
        #模型性能有所提升则保存模型，并更新loss_value
        if  vali_emo_acc > acc_value:
            step = 0
            torch.save(model.state_dict(), model_save_path + f'sub_{sub_idx}_model_parameter.pkl')
            acc_value = vali_emo_acc
        
        if step >= patient:
            break
        
    # 测试
    x_test = trials_de_base[train_mask==False]
    emo_test = labels[train_mask==False]
    emo_test = emo_test[:, emotion_dim]

    x_train, x_test = z_norm(x_train.numpy(), x_test)

    print('make tensor...')
    x_test = torch.tensor(x_test, dtype=torch.float)
    emo_test = torch.tensor(emo_test, dtype=torch.long)

    model.load_state_dict(torch.load(model_save_path + f'sub_{0}_model_parameter.pkl'))
    model.eval()
    emo_out, id_out = model(x_test, x_test)
    test_emo_loss = Emo_criterion(emo_out, emo_test) / x_test.shape[0]

    emo_out = torch.max(emo_out, 1)[1]
    test_emo_acc = (emo_out == emo_test).sum() / x_test.shape[0]
    #------------------------------------------------看看测试者的id


    print(f'sub: {sub_idx}, test_emo_loss: {vali_emo_loss:.6f}, test_emo_acc: {test_emo_acc:.6f}')

split dataset...
z_norm...
make tensor...




train...
epoch:0
batch: 0, train_emo_loss: 0.022230, train_id_loss: 0.109053, train_emo_acc: 0.531250, train_id_acc: 0.062500
batch: 100, train_emo_loss: 0.020709, train_id_loss: 0.095327, train_emo_acc: 0.656250, train_id_acc: 0.156250
batch: 200, train_emo_loss: 0.017812, train_id_loss: 0.058151, train_emo_acc: 0.593750, train_id_acc: 0.312500
batch: 300, train_emo_loss: 0.018586, train_id_loss: 0.034609, train_emo_acc: 0.781250, train_id_acc: 0.718750
batch: 400, train_emo_loss: 0.019669, train_id_loss: 0.027489, train_emo_acc: 0.656250, train_id_acc: 0.750000
batch: 500, train_emo_loss: 0.015764, train_id_loss: 0.015648, train_emo_acc: 0.781250, train_id_acc: 0.937500
batch: 600, train_emo_loss: 0.013810, train_id_loss: 0.017668, train_emo_acc: 0.875000, train_id_acc: 0.781250
batch: 700, train_emo_loss: 0.012896, train_id_loss: 0.019595, train_emo_acc: 0.812500, train_id_acc: 0.812500
batch: 800, train_emo_loss: 0.010194, train_id_loss: 0.005600, train_emo_acc: 0.843750, train_id_

KeyboardInterrupt: 

In [15]:
# print(model.state_dict()['id_net.fc1.bias'])

tensor([-0.0151, -0.0278, -0.0150, -0.0164, -0.0173,  0.0016, -0.0066,  0.0091,
         0.0129, -0.0232, -0.0003, -0.0231, -0.0143, -0.0002, -0.0057,  0.0151,
         0.0020,  0.0042,  0.0618,  0.0283, -0.0025, -0.0111,  0.0416,  0.0689,
         0.0010, -0.0021, -0.0061,  0.0154,  0.0150, -0.0045,  0.0240, -0.0334,
         0.0157, -0.0150,  0.0126, -0.0149,  0.0193, -0.0128, -0.0071, -0.0535,
         0.0106, -0.0117, -0.0329, -0.0148,  0.0059, -0.0045, -0.0154, -0.0007,
         0.0107, -0.0082])


In [None]:
torch.save(model.state_dict(), model_save_path + f'sub_{0}_model_parameter.pkl')
x_test = trials_de_base[train_mask==False]
emo_test = labels[train_mask==False]
emo_test = emo_test[:, emotion_dim]

x_train, x_test = z_norm(x_train.numpy(), x_test)

print('make tensor...')
x_test = torch.tensor(x_test, dtype=torch.float)
emo_test = torch.tensor(emo_test, dtype=torch.long)
    
model.load_state_dict(torch.load(model_save_path + f'sub_{0}_model_parameter.pkl'))
model.eval()
emo_out, id_out = model(x_test, x_test)
test_emo_loss = Emo_criterion(emo_out, emo_test) / x_test.shape[0]

emo_out = torch.max(emo_out, 1)[1]
test_emo_acc = (emo_out == emo_test).sum() / x_test.shape[0]
#------------------------------------------------看看测试者的id


print(f'sub: {sub_idx}, test_emo_loss: {vali_emo_loss:.6f}, test_emo_acc: {test_emo_acc:.6f}')

In [None]:
# os.environ["PATH"] += os.pathsep +'C:/Program Files/Graphviz/bin'
# plot_model(model,'cnn2_dens2.jpg', show_shapes=True)

In [None]:
# # 建立模型


# callbacksList = [EarlyStopping(monitor = 'val_acc', patience=10, verbose = 1), # patience当连续多少个epochs时验证集精度不再变好终止训练
#                 ModelCheckpoint(filepath = f'model/model.h5',monitor='acc',save_best_only=True,)
# ]

# # 评估
# model = load_model('model/model.h5')
# score = model.evaluate(x_test, y__v_test, verbose = 1)

# """
# Epoch 39/200
# 2160/2160 [==============================] - 59s 27ms/step - loss: 0.0448 - acc: 0.9878 - val_loss: 0.5709 - val_acc: 0.9161
# """