In [1]:
import sys
sys.path.append("D:\\WETrak")

import torch
from torch import nn
from typing import Callable, Optional
from torch import Tensor
import torch.nn.functional as F

### Encoder

In [2]:
class Encoder(nn.Module):
    """Convolutional down-sampler built from three conv/pool stages.

    Every stage is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU(inplace)
    -> Dropout(p=0.05) -> MaxPool2d((k, 1))`` with k = 5, 4 and 2. The pooling
    windows only span the height axis, so an input of shape ``[B, 1, H, W]``
    comes out as ``[B, 8 * channel, H // 40, W]`` -- the width is untouched.

    Args:
        channel: number of feature maps produced by the first stage; the
            second and third stages produce ``4 * channel`` and ``8 * channel``.
    """

    def __init__(self,channel):
        super().__init__()

        def make_stage(c_in, c_out, pool):
            # `pool` is both the window and the stride of the max-pool along
            # the height axis; the width axis uses a window of 1.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, 3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(True),
                nn.Dropout(p=0.05),
                nn.MaxPool2d((pool, 1), stride=(pool, 1)),
            )

        self.down_block1 = make_stage(1, channel, 5)
        self.down_block2 = make_stage(channel, channel * 4, 4)
        self.down_block3 = make_stage(channel * 4, channel * 8, 2)

        # Replace the PyTorch default initialisation.
        self._initialize_weights()

    def forward(self,x):
        """Apply the three stages in order.

        Args:
            x: tensor of shape ``[B, 1, H, W]``.

        Returns:
            Tensor of shape ``[B, 8 * channel, H // 40, W]``.
        """
        for stage in (self.down_block1, self.down_block2, self.down_block3):
            # height: H -> H//5 -> H//20 -> H//40; channels: c -> 4c -> 8c
            x = stage(x)
        return x

    def _initialize_weights(self):
        """Kaiming-normal conv weights, unit BatchNorm scale, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.zeros_(layer.bias)

### Attention

In [3]:
class BasicConv(nn.Module):
    """``Conv2d`` optionally followed by ``BatchNorm2d`` and ``ReLU``.

    Args:
        in_planes: input channels of the convolution.
        out_planes: output channels (also stored as ``self.out_channels``).
        kernel_size, stride, padding, dilation, groups, bias: forwarded to
            ``nn.Conv2d`` unchanged.
        relu: append a ``ReLU`` when true; otherwise ``self.relu`` is ``None``.
        bn: append a ``BatchNorm2d(eps=1e-5, momentum=0.01)`` when true;
            otherwise ``self.bn`` is ``None``.
    """

    def __init__(self, in_planes, out_planes, kernel_size, stride=1, padding=0, dilation=1, groups=1, relu=True, bn=True, bias=False):
        super().__init__()
        self.out_channels = out_planes
        self.conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            groups=groups,
            bias=bias,
        )
        if bn:
            self.bn = nn.BatchNorm2d(out_planes, eps=1e-5, momentum=0.01, affine=True)
        else:
            self.bn = None
        if relu:
            self.relu = nn.ReLU()
        else:
            self.relu = None

        # Replace the PyTorch default initialisation.
        self._initialize_weights()

    def forward(self, x):
        """Apply the convolution, then whichever of BatchNorm / ReLU are enabled."""
        out = self.conv(x)
        for optional_stage in (self.bn, self.relu):
            if optional_stage is not None:
                out = optional_stage(out)
        return out

    def _initialize_weights(self):
        """Kaiming-normal conv weights, unit BatchNorm scale, zero biases."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.zeros_(layer.bias)
                

class ChannelPool(nn.Module):
    """Collapse the channel axis into two maps: channel-wise max and mean.

    ``[B, C, H, W] -> [B, 2, H, W]`` where output channel 0 is the maximum
    over the C input channels and output channel 1 is their mean.
    """

    def forward(self, x):
        peak_map = x.max(dim=1, keepdim=True)[0]   # [B, 1, H, W]
        mean_map = x.mean(dim=1, keepdim=True)     # [B, 1, H, W]
        return torch.cat((peak_map, mean_map), dim=1)
    
class Attention(nn.Module):
    """Re-weights the width axis of a feature map with a learned softmax.

    Pipeline (see ``forward``): channel-pool the input to two maps, mix them
    with a 7x7 convolution + BatchNorm, flatten, and map the result to 5
    scores with a linear layer. The softmax of those scores multiplies the
    input along its last axis.

    The scoring layer is hard-wired to 5 columns (``nn.Linear(time_step*5, 5)``),
    so the input must have shape ``[B, C, time_step, 5]``.

    Args:
        time_step: height of the feature map fed to ``forward``.
        finger_type: sequence of finger names. It selects the initial bias of
            the scoring layer (see ``_BIAS_PRIORS``); lists and tuples are both
            accepted. Any other value starts from a zero bias, i.e. uniform
            attention over the 5 columns.
    """

    # (finger group, initial bias of the 5-way scoring layer).
    # NOTE(review): presumably these mark which of the 5 input columns are
    # relevant to each finger group -- confirm against the sensor layout.
    _BIAS_PRIORS = (
        (('Pinky',), (1., 0., 0., 0., 1.)),
        (('Thumb',), (0., 0., 1., 1., 0.)),
        (('Index', 'Middle', 'Ring'), (0., 1., 0., 0., 1.)),
    )

    def __init__(self,time_step,finger_type):
        super(Attention, self).__init__()
        self.finger_type = finger_type

        kernel_size = 7
        self.compress = ChannelPool()
        # "same" padding so the attention map keeps the input's H x W.
        self.spatial = BasicConv(2, 1, kernel_size, stride=1, padding=(kernel_size-1) // 2, relu=False)

        self.fc = nn.Sequential(nn.Flatten(),
                                nn.Linear(time_step*5,5))

        # initialize weights
        self._initialize_weights()

    def forward(self, x):
        """Scale ``x`` along its last axis by the softmax attention weights.

        Args:
            x: tensor of shape ``[B, C, time_step, 5]``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        x_compress = self.compress(x) # [B,2,H,W]
        x_out = self.spatial(x_compress) # [B,1,H,W]

        x_out = self.fc(x_out) # [B,5]
        scale = F.softmax(x_out,dim=1).unsqueeze(1).unsqueeze(2) # [B,1,1,5]

        return x * scale

    def _bias_prior(self):
        """Return the bias prior matching ``self.finger_type``, or ``None``."""
        try:
            fingers = tuple(self.finger_type)
        except TypeError:
            # Not a sequence (e.g. None): no prior applies.
            return None
        for names, prior in self._BIAS_PRIORS:
            if fingers == names:
                return prior
        return None

    def _initialize_weights(self):
        """Initialise the conv/BatchNorm layers and the scoring layer.

        The linear layer starts with zero weights, so the initial attention is
        input-independent and equals ``softmax(bias)``.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.weight, 0)

                prior = self._bias_prior()
                # Write into the existing parameter instead of replacing it,
                # so its dtype and device are kept.
                with torch.no_grad():
                    if prior is None:
                        # Unknown finger group: use a deterministic, uniform
                        # starting point rather than the layer's random
                        # default bias.
                        m.bias.zero_()
                    else:
                        m.bias.copy_(torch.tensor(prior, dtype=m.bias.dtype))


### Embedding

In [4]:
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a bias-free 3x3 ``nn.Conv2d`` whose padding equals its dilation.

    With ``stride=1`` this padding keeps the spatial size of the input.
    """
    options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return nn.Conv2d(in_planes, out_planes, **options)

def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) ``nn.Conv2d`` with the given stride."""
    pointwise = nn.Conv2d(in_planes, out_planes, 1, stride, bias=False)
    return pointwise

class Bottleneck(nn.Module):
    """Residual bottleneck: 1x1 -> 3x3 -> 1x1 convolutions plus a skip path.

    Each convolution is followed by a normalisation layer; ReLU is applied
    after the first two and again after the skip connection is added.
    ``expansion`` is 1 here, so the block outputs ``planes`` channels.

    Args:
        inplanes: input channels.
        planes: output channels (times ``expansion``).
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the input on the skip path.
            When it is ``None`` the input is added as-is, so its shape must
            already match the output of the convolutional path.
        groups: groups of the 3x3 convolution.
        base_width: scales the inner width, ``int(planes * base_width / 64) * groups``.
        dilation: dilation of the 3x3 convolution.
        norm_layer: factory for the normalisation layers; ``nn.BatchNorm2d``
            when ``None``.
    """

    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        inner_width = int(planes * (base_width / 64.0)) * groups
        out_planes = planes * self.expansion

        # The 3x3 convolution (and `downsample`, if given) carry the stride.
        self.conv1 = conv1x1(inplanes, inner_width)
        self.bn1 = make_norm(inner_width)
        self.conv2 = conv3x3(inner_width, inner_width, stride, groups, dilation)
        self.bn2 = make_norm(inner_width)
        self.conv3 = conv1x1(inner_width, out_planes)
        self.bn3 = make_norm(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        """Return ``relu(conv_path(x) + skip(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)
    

### Decoder

In [5]:
class Decoder(nn.Module):
    """Convolutional up-sampler built from three conv/upsample blocks.

    Every block is ``Conv2d(3x3, padding=1) -> BatchNorm2d -> activation ->
    Dropout(p=0.05) -> Upsample``. For an input of shape ``[B, 8*channel, H, W]``
    the blocks produce::

        up_blcok1: [B, 4*channel,  5*H, 2*W]
        up_blcok2: [B,   channel, 20*H, 4*W]
        up_blcok3: [B,         1, 40*H, 4*W*len(finger_type)]

    Args:
        finger_type: sequence of finger names; only its length is used, as the
            width scale factor of the last up-sampling step.
        is_norm: truthy -> the last block uses ``Sigmoid`` (outputs in [0, 1]);
            falsy -> it uses ``ReLU``. Any value is interpreted by truthiness,
            so the last block is always created.
        channel: base channel count; the decoder expects ``8 * channel`` input
            channels.
    """

    def __init__(self,finger_type,is_norm,channel=32):
        super(Decoder,self).__init__()

        # NOTE: the attribute names are misspelled ("blcok"). They are kept
        # because renaming them would change the state_dict keys and break
        # loading of existing checkpoints.
        self.up_blcok1 = self._make_block(channel*8, channel*4, nn.ReLU(True), (5,2))
        self.up_blcok2 = self._make_block(channel*4, channel, nn.ReLU(True), (4,2))

        # Only the output activation depends on `is_norm`.
        output_activation = nn.Sigmoid() if is_norm else nn.ReLU(True)
        self.up_blcok3 = self._make_block(channel, 1, output_activation, (2,len(finger_type)))

        # initialize weights
        self._initialize_weights()

    @staticmethod
    def _make_block(in_channels, out_channels, activation, scale):
        """Build one conv -> BN -> activation -> dropout -> upsample block."""
        return nn.Sequential(nn.Conv2d(in_channels, out_channels, 3, padding=1),
                             nn.BatchNorm2d(out_channels),
                             activation,
                             nn.Dropout(p=0.05),
                             nn.Upsample(scale_factor=scale))

    def forward(self,x):
        """Up-sample ``x`` through the three blocks (shapes in the class docstring)."""
        # upsample
        x = self.up_blcok1(x) # [batch, 4*channel, 5*H, 2*W]
        x = self.up_blcok2(x) # [batch, channel, 20*H, 4*W]
        x = self.up_blcok3(x) # [batch, 1, 40*H, 4*W*len(finger_type)]

        return x

    def _initialize_weights(self):
        """Kaiming-normal conv weights, unit BatchNorm scale, zero biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

### Tracker Network

In [6]:
class Autoencoder(nn.Module):
    """Encoder -> (attention) -> width pooling -> residual block -> decoder.

    Args:
        finger_type: sequence of finger names, forwarded to ``Attention`` and
            ``Decoder``.
        is_norm: forwarded to ``Decoder`` (selects its output activation).
        emg_channel: width of the input; the width axis is max-pooled with a
            window of this size after the encoder.
        time_step: height of the encoder output, forwarded to ``Attention``.
        channel: base channel count shared by encoder, residual block and
            decoder.
        is_att: when truthy, ``Attention`` is applied before the width pooling.

    NOTE(review): ``Attention`` is built with a fixed 5-column scoring layer,
    so ``is_att=True`` only works when the encoder output is
    ``[B, 8*channel, time_step, 5]``, i.e. ``emg_channel == 5``.
    """

    # The same residual block (shared weights) is applied this many times.
    _RESNET_PASSES = 5

    def __init__(self,finger_type,is_norm,emg_channel=5,time_step=25,channel=32,is_att=True):
        super(Autoencoder,self).__init__()
        self.emg_channel = emg_channel
        self.is_att = is_att

        # downsample
        self.encoder = Encoder(channel)

        # collapses the width axis (used with and without attention)
        self.max_pool = nn.MaxPool2d((1,emg_channel), stride=(1,emg_channel))

        # Resnet
        self.resnet = Bottleneck(inplanes=channel*8,planes=channel*8)
        # upsample; `channel` must be forwarded, otherwise the decoder always
        # expects 8*32 input channels regardless of the encoder width.
        self.decoder = Decoder(finger_type,is_norm,channel=channel)

        # initialize weights
        self._initialize_weights()

        # attention -- created AFTER _initialize_weights() so that the generic
        # Linear initialisation above does not overwrite Attention's own
        # (zero weight / preset bias) initialisation.
        self.attention = Attention(time_step,finger_type)

    def forward(self,x):# x:[1,1,1000,5or2]
        """Run the full network.

        Args:
            x: tensor of shape ``[B, 1, H, emg_channel]``.

        Returns:
            The decoder output for the pooled, residual-refined encoding.
        """
        # downsample
        x = self.encoder(x)

        # optional attention over the width axis, then collapse that axis
        if self.is_att:
            x = self.attention(x)
        x = self.max_pool(x)

        # resnet: one block applied repeatedly with shared weights
        for _ in range(self._RESNET_PASSES):
            x = self.resnet(x)

        # upsample
        x = self.decoder(x)

        return x

    def _initialize_weights(self):
        """Kaiming-normal conv weights, unit BatchNorm scale, small-normal Linear weights, zero biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)