In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import fastai.layers as L

# Notebook-wide setting: turn on cuDNN's deterministic flag.
# Note that no random seed is set in this cell.
torch.backends.cudnn.deterministic = True

In [2]:
def embed_initializer(weight_matrix, non_trainable = True, device = None):
    """Build an ``nn.Embedding`` layer pre-loaded with ``weight_matrix``.

    Parameters
    ----------
    weight_matrix : 2-D NumPy array or tensor
        Pre-trained embedding table of shape ``(num_embeddings, embedding_dim)``.
    non_trainable : bool, default True
        When True the embedding weights are frozen (``requires_grad = False``).
    device : str or torch.device, optional
        Where to place the layer. Defaults to ``'cuda'`` when CUDA is
        available and to ``'cpu'`` otherwise.

    Returns
    -------
    nn.Embedding
        Layer with ``padding_idx = 1`` whose weights equal ``weight_matrix``.
    """
    if device is None:
        # Keep the original GPU placement, but do not crash on CPU-only hosts.
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    num_embeds, dim_embeds = weight_matrix.shape
    pretrained = torch.as_tensor(weight_matrix)

    embed_layer = nn.Embedding(num_embeds, dim_embeds, padding_idx = 1).to(device)
    # load_state_dict copies every row (including the padding row) into the
    # layer's own parameter, converting dtype/device as needed.
    embed_layer.load_state_dict({'weight': pretrained})

    embed_layer.weight.requires_grad = not non_trainable

    return embed_layer

In [3]:
def Residual_Block(input_channels, num_layers, self_attention = True, bottle = False, 
                   leaky = True):
    """Stack ``num_layers`` fastai residual units, each followed by dropout.

    Every unit is created with ``L.res_block(input_channels, dense=True, ...)``
    and is immediately followed by an in-place ``nn.Dropout`` using the
    default drop probability.

    Parameters
    ----------
    input_channels : int
        Channel count passed to every ``L.res_block`` call.
    num_layers : int
        Number of (res_block, dropout) pairs; ``0`` yields an empty container.
    self_attention, leaky :
        Forwarded unchanged to ``L.res_block`` together with ``is_1d=True``.
    bottle : bool
        Forwarded to ``L.res_block`` as ``bottle``.

    Returns
    -------
    nn.Sequential
        The units in creation order.

    NOTE(review): every unit receives the same ``input_channels``. If
    ``res_block(dense=True)`` widens its output, consecutive units would not
    line up -- confirm against the installed fastai version.
    NOTE(review): ``leaky`` is passed on as a bool; confirm fastai does not
    expect a numeric slope here.
    """
    shared_conv_args = dict(is_1d = True, self_attention = self_attention, leaky = leaky)

    stages = []
    for _ in range(num_layers):
        # Residual unit first, then its dropout -- same creation order as before.
        stages.extend([
            L.res_block(input_channels, dense = True, bottle = bottle, **shared_conv_args),
            nn.Dropout(inplace = True),
        ])

    return nn.Sequential(*stages)

In [4]:
class Bottle_Drop(nn.Module):
    """Add a dropped-out tensor to a strided 1-D projection of ``X.orig``.

    The projection (``self.BottleNeck``) maps ``input_channels`` to
    ``input_channels * 2`` channels with stride 2.

    The tensor passed to ``forward`` must carry an ``orig`` attribute.
    Presumably this is attached by fastai's ``SequentialEx`` container --
    confirm against the caller.
    """

    def __init__(self, input_channels):
        """Create the stride-2 projection that doubles ``input_channels``."""
        super().__init__()
        self.BottleNeck = L.conv1d(input_channels, input_channels * 2, stride = 2)

    def forward(self, X):
        """Return ``dropout(X) + BottleNeck(X.orig)``.

        Dropout uses the default probability and is only active while the
        module is in training mode.
        """
        regularised = F.dropout(X, training = self.training)
        shortcut = self.BottleNeck(X.orig)
        return regularised + shortcut
    
def Trans_Block(input_channels):
    """Build the transition stage placed between residual stages.

    The stage is an ``L.SequentialEx`` holding:

    1. a 1-D conv (kernel 3, stride 2, padding 1) mapping ``input_channels``
       to ``input_channels * 2``;
    2. a ``Bottle_Drop`` that adds a stride-2 projection of the stage's
       original input to the dropped-out conv output.

    Parameters
    ----------
    input_channels : int
        Channel count of the incoming tensor.

    Returns
    -------
    L.SequentialEx
        An *instance* of the container. Previously the class object itself
        was returned, because the opening parenthesis sat on its own line and
        the layer tuple was evaluated and then discarded.
    """
    return L.SequentialEx(
        L.conv1d(input_channels, input_channels * 2, ks = 3, stride = 2, padding = 1),
        Bottle_Drop(input_channels),
    )

In [5]:
class ResDongNet(nn.Module):
    """Text classifier: embedding -> conv stem -> residual stages -> 2 linear layers.

    Parameters
    ----------
    weights_matrix : 2-D array
        Pre-trained embedding table handed to ``embed_initializer``.
    layers : sequence of int, default (3, 4, 5, 3)
        Number of residual units in each stage. A ``Trans_Block`` is inserted
        between consecutive stages (not after the last one).
    input_channel : int, default 16
        Channel count produced by the initial convolution. The head maps
        features to ``input_channel * 2`` and then ``input_channel * 4`` outputs.
    bottle : bool, default False
        Forwarded to ``Residual_Block``.
    embed_dim : int, default 50
        Second dimension of the initial convolution kernel.
    static : bool, default True
        When True the embedding weights are frozen.
    """

    def __init__(self, weights_matrix, layers = (3, 4, 5, 3), 
                 input_channel = 16, bottle=False, embed_dim = 50, 
                 static = True):

        super().__init__()

        self.embedding = embed_initializer(weights_matrix, static)

        # NOTE(review): a 2-tuple kernel/padding is passed to the 1-D conv
        # helper and forward() later squeezes axis 3, so this layer is
        # presumably meant to act as a 2-D convolution spanning the whole
        # embedding axis -- confirm it does so with the installed torch/fastai.
        self.init_conv = L.conv1d(1, input_channel, ks = (3, embed_dim),
                                  stride = 1, padding = (1, 0), bias = False)

        # nn.Sequential does not accept (name, module) tuples; register the
        # named stem layers one by one instead.
        self.features = nn.Sequential()
        self.features.add_module('Conv1d', self.init_conv)
        self.features.add_module('BatchNorm', nn.BatchNorm1d(input_channel))
        self.features.add_module('LeakyReLU', nn.LeakyReLU(inplace=True))

        # Running channel count: each Trans_Block doubles it.
        # NOTE(review): assumes Residual_Block keeps the channel count
        # unchanged -- confirm against fastai's res_block(dense=True).
        channels = input_channel
        last_stage = len(layers) - 1

        for i, num_layers in enumerate(layers):
            self.features.add_module('Residual_Block %d' % (i + 1),
                                     Residual_Block(channels, num_layers, bottle = bottle))

            # Transition only *between* stages, never after the final one.
            if i < last_stage:
                self.features.add_module('Trans_Block %d' % (i + 1),
                                         Trans_Block(channels))
                channels *= 2

        self.LeakyReLU = nn.LeakyReLU(inplace=True)
        self.Avgpool = nn.AdaptiveAvgPool1d(1)
        # FC1 consumes the pooled feature width; the output sizes are kept
        # relative to `input_channel` as originally written.
        self.FC1 = nn.Linear(channels, input_channel * 2)
        self.classifier = nn.Linear(input_channel * 2, input_channel * 4)

        for mod in self.modules():
            if isinstance(mod, nn.BatchNorm1d):
                nn.init.constant_(mod.weight, 1)
                nn.init.constant_(mod.bias, 0)

            elif isinstance(mod, nn.Linear) and mod.bias is not None:
                nn.init.constant_(mod.bias, 0)

    def forward(self, X):
        """Run the network on ``X`` and return the classifier output.

        ``X`` is fed to the embedding layer, so it is presumably a batch of
        integer token ids -- confirm against the data pipeline.
        """
        # Insert a singleton channel axis in front of the embedded sequence.
        X = self.embedding(X).unsqueeze(1)

        for idx, layer in enumerate(self.features):
            if idx == 1:
                # Drop axis 3 after the stem conv so the remaining 1-D layers
                # receive a 3-D tensor.
                X = X.squeeze(3)
            X = layer(X)

        # Global average pool over the sequence axis, then flatten per sample.
        X = self.Avgpool(X).view(X.size(0), -1)
        X = self.FC1(X)
        X = self.LeakyReLU(X)
        X = self.classifier(X)

        return X