In [128]:
import copy
import math

import numpy as np
import pandas as pd
import torch
from torch import nn, optim
import torch.nn.functional as F
from collections import OrderedDict
from scipy.io import savemat
from torch.utils.data import DataLoader
import matplotlib

torch.set_printoptions(precision=2)

In [135]:
path = "./Lanekeeping/"
proj_name = "Lanekeeping_4ch_head8"

model_path = f'{path}{proj_name}_model.pt'
model_state_path = f'{path}{proj_name}_model_state_dict_best.pt'
mask_model_path = f'{path}{proj_name}_eye_model.pt'
mask_model_state_path = f'{path}{proj_name}_eye_model_state_dict_best.pt'

# ch_name = ['sine', 'sawtooth', 'random']
ch_name = ['Fz', 'Cz', 'Pz', 'Oz']
# ch_name = ['Fz', 'F3', 'F4', 'Cz', 'C3', 'C4', 'Pz', 'Oz']
head = 8
in_channel = len(ch_name)
total_epoch = 100
time_len = 100
d_model = 512
kernel_size = 15
lr = 0.001

use_cuda = 1
device = torch.device("cuda" if (torch.cuda.is_available() & use_cuda) else "cpu")

matplotlib.use('TkAgg')

In [136]:
'''mask model'''

def clones(module, N):
    """Produce N identical layers."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


def mask_attention(query, key, value, mask=None, dropout=None):
    """Compute 'Scaled Dot Product Attention'"""
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    print(f'attention size: {scores}')
    scores = scores.masked_fill(mask == 1, -1e9)
    print(f'mask attention size: {scores}')
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        """Take in model size and number of heads."""
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        nbatches = query.size(0)

        # 1) Do all the linear projections in batch from d_model => h x d_k
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]

        # 2) Apply attention on all the projected vectors in batch.
        x, self.attn = mask_attention(query, key, value, mask=mask,
                                 dropout=self.dropout)

        # print(f'x: {x.size()}')
        # print(f'attention score: {self.attn.size()}')
        print(f'attention score: {self.attn}')

        # 3) "Concat" using a view and apply a final linear.
        x = x.transpose(1, 2).contiguous() \
            .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x), self.attn


class MyModel(nn.Module):
    def __init__(self, in_channel=1, time_len=1, kernel_size=1, head=1, d_model=1):
        super(MyModel, self).__init__()
        self.head = head
        self.a = None

        self.Conv1d01 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d01 = nn.BatchNorm1d(in_channel)
        self.ReLu01 = nn.ReLU(inplace=True)
        self.Conv1d02 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d02 = nn.BatchNorm1d(in_channel)
        self.ReLu02 = nn.ReLU(inplace=True)

        if self.head != 0:
            self.Encode = nn.Linear(time_len, d_model)
            self.Q = nn.Linear(d_model, d_model)
            self.K = nn.Linear(d_model, d_model)
            self.V = nn.Linear(d_model, d_model)
            self.Attentionlayer = MultiHeadedAttention(h=self.head, d_model=d_model)
            self.Decode = nn.Linear(d_model, time_len)

        self.Conv1d03 = nn.Conv1d(in_channels=in_channel, out_channels=in_channel, kernel_size=1, padding="same")

    def forward(self, x):
        x = self.ReLu01(self.BN1d01(self.Conv1d01(x)))
        h = self.ReLu02(self.BN1d02(self.Conv1d02(x)))
        if self.head != 0:
            mask = torch.eye(x.size(1)).unsqueeze(0).unsqueeze(0)
            mask = mask.expand(x.size(0), self.head, x.size(1), x.size(1))
            print(mask)
            h = self.Encode(h)
            q, k, v = self.Q(h), self.K(h), self.V(h)
            h, self.a = self.Attentionlayer(q, k, v, mask)
            print(f'self.a: {self.a}')
            h = self.Decode(h)
        output = self.Conv1d03(h)
        return output, self.a
    
mask_model = MyModel(in_channel=in_channel, kernel_size=kernel_size,
                head=head, d_model=d_model, time_len=time_len).to(device)

mask_model.load_state_dict(torch.load(mask_model_state_path, map_location=device))
mask_model.eval()

MyModel(
  (Conv1d01): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d01): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu01): ReLU(inplace=True)
  (Conv1d02): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d02): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu02): ReLU(inplace=True)
  (Encode): Linear(in_features=100, out_features=512, bias=True)
  (Q): Linear(in_features=512, out_features=512, bias=True)
  (K): Linear(in_features=512, out_features=512, bias=True)
  (V): Linear(in_features=512, out_features=512, bias=True)
  (Attentionlayer): MultiHeadedAttention(
    (linears): ModuleList(
      (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
    )
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (Decode): Linear(in_features=512, out_features=100, bias=True)
  (Conv1d03): Conv1d(4, 4, kernel_size=(1,), stride=(1,), padding=same)
)

In [137]:
def clones(module, N):
    """Produce N identical layers."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


def attention(query, key, value, mask=None, dropout=None):
    """Compute 'Scaled Dot Product Attention'"""
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # print(f'attention size: {scores.size()}')
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim=-1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn


class MultiHeadedAttention(nn.Module):
    def __init__(self, h, d_model, dropout=0.1):
        """Take in model size and number of heads."""
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        """Implements Figure 2"""
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)

        # 1) Do all the linear projections in batch from d_model => h x d_k
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]

        # 2) Apply attention on all the projected vectors in batch.
        x, self.attn = attention(query, key, value, mask=mask,
                                 dropout=self.dropout)

        # print(f'x: {x.size()}')
        # print(f'attention score: {self.attn.size()}')
        # print(f'attention score: {self.attn}')

        # 3) "Concat" using a view and apply a final linear.
        x = x.transpose(1, 2).contiguous() \
            .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x), self.attn


class MyModel(nn.Module):
    def __init__(self, in_channel=1, time_len=1, kernel_size=1, head=1, d_model=1):
        super(MyModel, self).__init__()
        self.head = head
        self.a = None

        self.Conv1d01 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d01 = nn.BatchNorm1d(in_channel)
        self.ReLu01 = nn.ReLU(inplace=True)
        self.Conv1d02 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d02 = nn.BatchNorm1d(in_channel)
        self.ReLu02 = nn.ReLU(inplace=True)

        if self.head != 0:
            self.Encode = nn.Linear(time_len, d_model)
            self.Q = nn.Linear(d_model, d_model)
            self.K = nn.Linear(d_model, d_model)
            self.V = nn.Linear(d_model, d_model)
            self.Attentionlayer = MultiHeadedAttention(h=self.head, d_model=d_model)
            self.Decode = nn.Linear(d_model, time_len)

        self.Conv1d03 = nn.Conv1d(in_channels=in_channel, out_channels=in_channel, kernel_size=1, padding="same")

    def forward(self, x):
        x = self.ReLu01(self.BN1d01(self.Conv1d01(x)))
        h = self.ReLu02(self.BN1d02(self.Conv1d02(x)))
        if self.head != 0:
            h = self.Encode(h)
            q, k, v = self.Q(h), self.K(h), self.V(h)
            h, self.a = self.Attentionlayer(q, k, v)
            h = self.Decode(h)
        output = self.Conv1d03(h)
        return output, self.a

model = MyModel(in_channel=in_channel, kernel_size=kernel_size,
                head=head, d_model=d_model, time_len=time_len).to(device)

model.load_state_dict(torch.load(model_state_path, map_location=device))
model.eval()

MyModel(
  (Conv1d01): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d01): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu01): ReLU(inplace=True)
  (Conv1d02): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d02): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu02): ReLU(inplace=True)
  (Encode): Linear(in_features=100, out_features=512, bias=True)
  (Q): Linear(in_features=512, out_features=512, bias=True)
  (K): Linear(in_features=512, out_features=512, bias=True)
  (V): Linear(in_features=512, out_features=512, bias=True)
  (Attentionlayer): MultiHeadedAttention(
    (linears): ModuleList(
      (0-3): 4 x Linear(in_features=512, out_features=512, bias=True)
    )
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (Decode): Linear(in_features=512, out_features=100, bias=True)
  (Conv1d03): Conv1d(4, 4, kernel_size=(1,), stride=(1,), padding=same)
)

In [123]:
head = 0
Noattn_model_state_path = f'{path}Multitasking_4ch_head0_model_state_dict_best.pt'

class MyModel(nn.Module):
    def __init__(self, in_channel=1, time_len=1, kernel_size=1, head=1, d_model=1):
        super(MyModel, self).__init__()
        self.head = head
        self.a = None

        self.Conv1d01 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d01 = nn.BatchNorm1d(in_channel)
        self.ReLu01 = nn.ReLU(inplace=True)
        self.Conv1d02 = nn.Conv1d(in_channel, out_channels=in_channel, kernel_size=kernel_size, padding='same')
        self.BN1d02 = nn.BatchNorm1d(in_channel)
        self.ReLu02 = nn.ReLU(inplace=True)

        self.Conv1d03 = nn.Conv1d(in_channels=in_channel, out_channels=in_channel, kernel_size=1, padding="same")

    def forward(self, x):
        x = self.ReLu01(self.BN1d01(self.Conv1d01(x)))
        h = self.ReLu02(self.BN1d02(self.Conv1d02(x)))
        output = self.Conv1d03(h)
        return output, self.a

Noattn_model = MyModel(in_channel=in_channel, kernel_size=kernel_size,
                head=head, d_model=d_model, time_len=time_len).to(device)

Noattn_model.load_state_dict(torch.load(Noattn_model_state_path, map_location=device))
Noattn_model.eval()

MyModel(
  (Conv1d01): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d01): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu01): ReLU(inplace=True)
  (Conv1d02): Conv1d(4, 4, kernel_size=(15,), stride=(1,), padding=same)
  (BN1d02): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (ReLu02): ReLU(inplace=True)
  (Conv1d03): Conv1d(4, 4, kernel_size=(1,), stride=(1,), padding=same)
)

In [138]:
csv_name = "01_001.csv"
singal_csv_dict = path+csv_name

signal = pd.read_csv(singal_csv_dict).to_numpy().transpose()
signal = torch.tensor(signal, dtype=torch.float32).to(device)

signal = signal.unsqueeze(0)

shuffle_idx = torch.randperm(signal.size(2))
shuffle_signal = signal.clone()
shuffle_signal[:, 3, :] = shuffle_signal[:, 3, shuffle_idx]

res_a = np.zeros([15, head, 4, 4])
res_mask_a = np.zeros([15, head, 4, 4])
res_signal = np.zeros([4, 100, 15])
res_shuffle_signal = np.zeros([4, 100, 15])
res_predict = np.zeros([4, 100, 15])
res_mask_predict  = np.zeros([4, 100, 15])


for w in range(15):
       win_shuffle_signal = shuffle_signal[:, :, w*100:(w+1)*100]

       predict, a = model(win_shuffle_signal)
       mask_predict, mask_a = mask_model(win_shuffle_signal)

       res_a[w, :, :, :] = a.squeeze().detach().cpu().numpy()
       res_mask_a[w, :, :, :] = mask_a.squeeze().detach().cpu().numpy()
       res_signal[:, :, w] = signal[:, :, w*100:(w+1)*100].squeeze().detach().cpu().numpy()
       res_shuffle_signal[:, :, w] = shuffle_signal[:, :, w*100:(w+1)*100].squeeze().detach().cpu().numpy()
       res_predict[:, :, w] = predict.squeeze().detach().cpu().numpy()
       res_mask_predict[:, :, w]  = mask_predict.squeeze().detach().cpu().numpy()

savemat(f'{proj_name}_{csv_name[:-4]}.mat',
                mdict={'attention_weight': res_a,
                       'mask_attention_weight': res_mask_a,
                       'signal': res_signal,
                       'shuffle_signal': res_shuffle_signal,
                       'predict': res_predict,
                       'mask_predict': res_mask_predict
                       })

tensor([[[[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]]]])
attention size: tensor([[[[ 1.04e-01,  3.61e+00,  2.18e+00,  4.61e+00],
          [-5.2

In [127]:
csv_name = "01_001.csv"
singal_csv_dict = path+csv_name

signal = pd.read_csv(singal_csv_dict).to_numpy().transpose()[:, 0:100]
signal = torch.tensor(signal, dtype=torch.float32).to(device)

signal = signal.unsqueeze(0)

shuffle_idx = torch.randperm(signal.size(2))
shuffle_signal = signal.clone()
shuffle_signal[:, 3, :] = shuffle_signal[:, 3, shuffle_idx]

predict, a = model(shuffle_signal)

mask_predict, mask_a = mask_model(shuffle_signal)

Noattn_predict, _ = Noattn_model(shuffle_signal)

a = a.squeeze().detach().cpu().numpy()
mask_a = mask_a.squeeze().detach().cpu().numpy()
signal = signal.squeeze().detach().cpu().numpy()
shuffle_signal = shuffle_signal.squeeze().detach().cpu().numpy()
predict = predict.squeeze().detach().cpu().numpy()
mask_predict  = mask_predict.squeeze().detach().cpu().numpy()
Noattn_predict = Noattn_predict.squeeze().detach().cpu().numpy()

print(f'a: {a}')
print(f'mask a: {mask_a}')

savemat(f'{proj_name}_{csv_name[:-4]}.mat',
                mdict={'attention_weight': a,
                       'mask_attention_weight': mask_a,
                       'signal': signal,
                       'shuffle_signal': shuffle_signal,
                       'predict': predict,
                       'mask_predict': mask_predict,
                       'Noattn_predict': Noattn_predict
                       })

tensor([[[[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]],

         [[1., 0., 0., 0.],
          [0., 1., 0., 0.],
          [0., 0., 1., 0.],
          [0., 0., 0., 1.]]]])
attention size: tensor([[[[ 0.69,  0.35, -0.89, -0.22],
          [-1.48, -0.28,  1.72,