In [None]:
"""
The model is designed to analyze the consensus sequences in DNA sequences.

The input of the model is a 2D matrix:
    dimension: n x 1000, where n is the number of input samples

The output of the model is a 3D matrix:
    dimension: n x 1000 x 4, where n is the number of input samples
               the output gives the position-wise occurrence of each letter of the alphabet (A, T, C, G)

"""

In [None]:
import math
import torch
import time
import os
import numpy as np
from collections import Counter
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer,\
    TransformerDecoder, TransformerDecoderLayer
from torch.nn.functional import softmax

In [None]:
# Global variables

# Compute device: use CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Nucleotide letter -> 1-based integer code.
alphabet_dict = {
    'A': 1,
    'T': 2,
    'C': 3,
    'G': 4,
}

In [None]:
# Path of the FASTA file to load.
# TODO: fill in the real path -- it is an empty placeholder here, and
# open('') raises FileNotFoundError.
fasta_path = ''

seq = {}      # record name -> concatenated sequence string
name = None   # name of the record currently being read

# `with` guarantees the handle is closed even when parsing raises.
with open(fasta_path, 'r') as f:
    for line in f:
        if line.startswith('>'):
            # Header line: the record name is the first whitespace-separated
            # token once every '>' character has been removed.
            fields = line.replace('>', '').split()
            if not fields:
                raise ValueError('FASTA header line without a record name')
            name = fields[0]
            seq[name] = ''
        else:
            chunk = line.strip()
            if not chunk:
                # Blank lines carry no sequence data.
                continue
            if name is None:
                # Sequence data before any '>' header cannot be attributed
                # to a record.
                raise ValueError('FASTA sequence data found before the first header line')
            seq[name] += chunk

# Sequences only, in file order.
seq_ls = list(seq.values())

In [None]:
class TransformerModel(nn.Module): #done
    '''
    Encoder-decoder transformer over integer token sequences.

    Tokens are embedded, given a positional encoding, passed through a
    TransformerEncoder / TransformerDecoder stack and projected to a
    probability distribution over `ntoken` classes for every target position.

    All tensors are batch-first: token inputs are (batch, seq_len) and the
    output is (batch, tgt_len, ntoken).
    '''

    def __init__(
        self,
        d_model,
        dropout,
        max_len,
        nhead,
        encoder_layer_nums,
        decoder_lyaer_nums,
        dim_ff,
        ntoken,
    ):
        '''
        Arguments:
        d_model: embedding / hidden size of the transformer
        dropout: dropout probability (positional encoding and transformer layers)
        max_len: maximum sequence length supported by the positional encoding
        nhead: number of attention heads
        encoder_layer_nums: number of stacked encoder layers
        decoder_lyaer_nums: number of stacked decoder layers
                            (the misspelled name is kept so that existing
                            keyword callers keep working)
        dim_ff: hidden size of the feed-forward sub-layers
        ntoken: vocabulary size (embedding rows and output classes)
        '''
        super(TransformerModel, self).__init__()

        # The body used to read `decoder_layer_nums`, which is not the
        # parameter's name, so constructing the model raised NameError.
        decoder_layer_nums = decoder_lyaer_nums

        self.d_model = d_model
        self.pos_encode = PositionalEmbedding(d_model, dropout, max_len)

        # batch_first=True: the positional encoding slices along x.size(1),
        # i.e. it treats dim 1 as the sequence axis; the transformer layers
        # must use the same (batch, seq, feature) layout.
        encoder_layer = TransformerEncoderLayer(
            d_model, nhead, dim_ff, dropout, batch_first = True
        )
        self.encoder = TransformerEncoder(encoder_layer, encoder_layer_nums)
        decoder_layer = TransformerDecoderLayer(
            d_model, nhead, dim_ff, dropout, batch_first = True
        )
        self.decoder = TransformerDecoder(decoder_layer, decoder_layer_nums)

        self.embedding = nn.Embedding(ntoken, d_model)
        # NOTE(review): out_embed is created but forward() never uses it
        # (the target is embedded with self.embedding) -- confirm intent.
        self.out_embed = nn.Embedding(ntoken, d_model)
        self.output_linear = nn.Linear(d_model, ntoken)
        self.output_softmax = nn.Softmax(dim = -1)

        self.init_weights()


    def init_weights(self):
        '''Uniform(-0.1, 0.1) init for the input embedding and output projection; zero output bias.'''
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.output_linear.bias.data.zero_()
        self.output_linear.weight.data.uniform_(-initrange, initrange)

    def forward(
        self,
        src,
        tgt,
        src_mask = None,
        tgt_mask = None,
        memory_mask = None,
        src_key_padding_mask = None,
        tgt_key_padding_mask = None,
        memory_key_padding_mask = None,
    ):
        '''
        Arguments:
        src: source token indices, size: batch x src_len
        tgt: target token indices, size: batch x tgt_len
        *_mask / *_key_padding_mask: forwarded unchanged to the encoder / decoder

        return:
        output: softmax probabilities, size: batch x tgt_len x ntoken
        '''

        # token ids -> vectors, then add the positional encoding
        src = self.pos_encode(self.embedding(src))
        tgt = self.pos_encode(self.embedding(tgt))

        memory = self.encoder(src, mask = src_mask, src_key_padding_mask = src_key_padding_mask)
        output = self.decoder(tgt, memory,
                              tgt_mask = tgt_mask,
                              memory_mask = memory_mask,
                              tgt_key_padding_mask = tgt_key_padding_mask,
                              memory_key_padding_mask = memory_key_padding_mask
                             )

        # project to the vocabulary and normalise over the last axis
        output = self.output_linear(output)
        output = self.output_softmax(output)

        return output

In [None]:
class PositionalEmbedding(nn.Module): #done
    '''
    Fixed sinusoidal positional encoding followed by dropout.

    Even feature columns hold sin(position * div_term), odd columns hold
    cos(position * div_term).  The table is stored as a non-trainable buffer
    of size 1 x max_len x d_model and is added to batch-first inputs.
    '''

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 1000):
        '''
        Arguments:
        d_model: feature size of the inputs (odd values are supported)
        dropout: dropout probability applied after adding the encoding
        max_len: longest sequence length that can be encoded
        '''
        super(PositionalEmbedding, self).__init__()
        self.dropout = nn.Dropout(p = dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp( torch.arange(0, d_model, 2) * (-math.log(10000) / d_model) )
        angles = position * div_term    # max_len x ceil(d_model / 2)

        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns;
        # without the slice the assignment fails with a shape mismatch.
        pe[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor):
        '''
        Arguments:
        x: embedded input, size: batch x seq_len x d_model

        return:
        x plus the positional encoding, after dropout (same size as x)

        Raises ValueError when seq_len exceeds max_len.
        '''
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                'sequence length %d exceeds max_len %d of the positional encoding'
                % (seq_len, self.pe.size(1))
            )
        # Buffers never require grad, so no requires_grad_ call is needed;
        # the leading size-1 axis broadcasts over the batch.
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)
    

In [None]:
class EM_algorithm(object):
    '''
    Bookkeeping and E/M steps for a two-component (motif / background)
    mixture over all W-mers of the input sequences, following the equation
    numbers of the MEME article quoted in the method docstrings.

    Attributes set by init_variables():
    N: number of input sequences
    l: list with the length of every input sequence
    z: erasing factors, size: N x max(l); 1 at valid W-mer start positions, 0 elsewhere
    X: list of all W-mers (strings), in sequence order
    n: number of W-mers in X
    I: integer letter codes of X (via alphabet_dict), size: n x W
    '''

    def __init__(self, source_data, W):
        '''
        Arguments:
        source_data: list of sequences (strings over the keys of alphabet_dict)
        W: length of the W-mers (motif width)
        '''

        self.source_data = source_data
        self.data_info = self.info_get()
        self.W = W
        self.N = None
        self.l = None
        self.X = None
        self.n = None
        self.z = None
        self.I = None

        self.init_variables()


    def info_get(self):
        '''
        Summarise the input data.

        NOTE(review): __init__ called this method but the class did not
        define it, so construction always failed with AttributeError.  The
        returned schema is a minimal assumption -- TODO confirm what
        data_info is supposed to contain.

        return:
        dict with 'amount' (number of sequences) and 'lengths' (list of lengths)
        '''
        lengths = [len(seq) for seq in self.source_data]
        return {'amount': len(lengths), 'lengths': lengths}


    def init_variables(self):
        '''
        initialize the followings: N, l, z, X, n, I
        '''

        # record N and small L
        self.N = len(self.source_data)
        if self.N == 0:
            raise ValueError('source_data must contain at least one sequence')
        self.l = [len(seq) for seq in self.source_data]

        # number of valid W-mer start positions per sequence; clamped at 0 so
        # that a sequence shorter than W has none (a negative value would be
        # read as a from-the-end slice index below)
        starts = [max(length - self.W + 1, 0) for length in self.l]

        # init small z (was built from undefined names N / l and never stored)
        z = np.ones((self.N, max(self.l)))
        for idx, n_start in enumerate(starts):
            z[idx][n_start:] = 0
        self.z = z

        # init W-mer set X
        X_ls = list()
        for seq, n_start in zip(self.source_data, starts):
            X_ls += [ seq[j:j+self.W] for j in range(n_start) ]
        self.X = X_ls
        self.n = len(self.X)

        # init indicator
        self.I = self.indicator_function()


    def indicator_function(self): #done
        '''
        In article, it is the I(k,a) function for eq(7), (8)
        Transfers the letters of every W-mer to their alphabet_dict codes

        return
        indicator: integer array of letter codes, size is n x W
        '''
        assert isinstance(self.X, list), 'Type of X is not list'
        indicator = [ [alphabet_dict[letter] for letter in seq] for seq in self.X ]

        return np.array(indicator, dtype = 'int')


    def condi_distribution(self, freq_letter):
        '''
        Calculate the conditional distribution p(Xi | theta_j)
        eq(7),(8) in the MEME article
        To avoid numerical underflow the products are computed as sums of ln(),
        so the returned values are LOG-probabilities.

        Arguments:

        freq_letter: the frequencies for each letter in each position, size: (W+1) x 4
                     row 0 is the background, rows 1..W are the motif positions;
                     columns follow the alphabet_dict codes minus one (A, T, C, G)
        ====================================================
        return:

        [p_Xi_1, p_Xi_2]
        p_Xi_1: log conditional distribution of motif sequence, size: n
        p_Xi_2: log conditional distribution of background, size: n

        NOTE(review): E_step multiplies its condi_dis argument by lambda, i.e.
        it expects plain probabilities; exponentiate these values before
        passing them on -- TODO confirm intended pipeline.
        '''

        freq_letter = np.asarray(freq_letter, dtype = float)
        log_f_0 = np.log(freq_letter[0])
        log_f_j = np.log(freq_letter[1:self.W + 1])

        # 0-based letter columns, size: n x W
        letters = np.asarray(self.I).reshape(-1, self.W) - 1
        positions = np.arange(self.W)

        # motif: the frequency depends on BOTH the position and the letter
        # (the old code indexed the motif matrix by letter only)
        p_Xi_1 = log_f_j[positions, letters].sum(axis = 1)
        # background: the frequency depends on the letter only
        p_Xi_2 = log_f_0[letters].sum(axis = 1)

        # the old return value accidentally contained the numpy module itself
        return [p_Xi_1, p_Xi_2]

    def count_letter(self): #done
        '''
        count the total appearance times for each letter in every W-mer of X
        return:
        count: counting results, columns ordered A, T, C, G, size: n x 4
        '''

        count = np.zeros( (self.n, 4) , dtype = 'int')

        for i in range(self.n):
            C = Counter(self.X[i])
            count[i][0] = C['A']
            count[i][1] = C['T']
            count[i][2] = C['C']
            count[i][3] = C['G']

        return count

    def update_erase(self):
        '''
        update the erasing values
        in article, in the MM implementation section

        Not implemented yet: currently does nothing and returns None.
        '''

        return

    def E_step(self, condi_dis, lamb): #done
        '''
        calculate the Z_ij, in article's eq(4)

        Arguments:
        condi_dis: conditional distribution (plain probabilities), size: n x 2
        lamb: probability for using models, size: 1 x 2
        return:
        Z: membership probability, every row sums to 1, size: n x 2
        '''

        multi_results = condi_dis * np.tile(lamb, (self.n, 1))
        summation = np.sum(multi_results, axis = 1, keepdims = True)
        Z = multi_results / summation

        return Z

    def M_step(self, Z, I, count):
        '''
        Arguments:
        Z: membership from E-step, size: n x 2 (column 0: motif, column 1: background)
        I: indicator function (letter codes 1..4), size: n x W
        count: count the appearance time for each letter in every W-mer, size: n x 4

        return:
        lamb: updated mixing parameters, eq(5), size: 2
        c_0k: expected background letter counts, size: 1 x 4
        c_jk: expected motif letter counts per position, size: W x 4

        NOTE(review): the original implementation stopped after filling c_0k
        and returned nothing.  Turning the counts into frequencies (including
        any pseudo-counts) is still TODO.
        '''

        I = np.asarray(I)
        Z = np.asarray(Z, dtype = float).transpose()    # 2 x n
        count = np.asarray(count).transpose()           # 4 x n

        # update lambda, eq(5): average membership over the n W-mers
        # (axis = 0 on the transposed matrix averaged over the two models)
        lamb = np.mean(Z, axis = 1)

        # update f_ij
        # calculate the c_0k and c_jk
        c_0k = np.zeros((1, 4))
        for k in range(4):
            c_0k[0][k] = np.sum( Z[1] * count[k] )

        # one_hot[i, j, k] is 1 when W-mer i has letter code k+1 at position j
        one_hot = (I[:, :, None] == np.arange(1, 5)).astype(float)
        c_jk = (Z[0][:, None, None] * one_hot).sum(axis = 0)

        return lamb, c_0k, c_jk


In [None]:
def condi_distribution(freq_letter, indicator, n, W):
    '''
    Calculate the conditional distribution p(Xi | theta_j)
    eq(7),(8) in the MEME article
    To avoid numerical underflow the products are computed as sums of ln(),
    so the returned values are LOG-probabilities.

    Arguments:

    freq_letter: the frequencies for each letter in each position, size: (W+1) x 4
                 row 0 is the background, rows 1..W are the motif positions;
                 columns follow the letter codes minus one
    indicator: letter codes (1..4) of every W-mer, size: n x W
    n: number of W-mers
    W: length of the W-mers
    ====================================================
    return:

    p_Xi_1: log conditional distribution of motif sequence, size: n
    p_Xi_2: log conditional distribution of background, size: n

    Raises ValueError when indicator is smaller than n x W.
    '''

    if n == 0:
        return np.zeros(0), np.zeros(0)

    # 0-based letter columns for the first n W-mers and W positions
    letters = np.asarray(indicator)[:n, :W] - 1
    if letters.shape != (n, W):
        raise ValueError('indicator must have at least n rows and W columns')

    freq_letter = np.asarray(freq_letter, dtype = float)
    log_f_0 = np.log(freq_letter[0])
    log_f_j = np.log(freq_letter[1:W + 1])

    # motif: the frequency depends on BOTH the position and the letter
    # (the old code indexed the motif matrix by letter only)
    p_Xi_1 = log_f_j[np.arange(W), letters].sum(axis = 1)
    # background: the frequency depends on the letter only
    p_Xi_2 = log_f_0[letters].sum(axis = 1)

    # the old return value accidentally contained the numpy module itself
    return p_Xi_1, p_Xi_2

In [None]:
def indicator_function(data): #done
    '''
    In article, it is the I(k,a) function for eq(7), (8)
    Transfers the letters of every W-mer to their alphabet_dict codes

    Arguments
    data: input W-mer data, dtype is list of string, size: n x W

    return
    indicator: integer array of letter codes, size is n x W

    Raises ValueError for empty input and AssertionError when the W-mers do
    not all have the length of the first one.
    '''

    # Empty input used to print a message and then crash with IndexError on
    # data[0]; fail explicitly instead.
    if len(data) == 0:
        raise ValueError('Error: the input data has less than 1 sequence')

    W = len(data[0])
    if len(data) <= 1 or W <= 1:
        # kept as a non-fatal notice, as before
        print('Error: the input data has less than 1 sequence')

    indicator = list()
    for seq in data:
        assert len(seq) == W, "There is a sequence has length error(>W or <W, ...>W<?)"
        indicator.append([alphabet_dict[letter] for letter in seq])

    return np.array(indicator, dtype = 'int')

In [None]:
def group_membership(condi_dis, lamb): #done
    '''
    Membership probabilities Z_ij, eq(4) of the article.

    Arguments:
    condi_dis: conditional distribution of every sample under each model, size: n x 2
    lamb: probability for using each model, size: 1 x 2
    return:
    Z: membership probability, size: n x 2
    '''

    # repeat lambda once per sample so it lines up with condi_dis row by row
    lamb_rows = np.tile(lamb, (len(condi_dis), 1))
    weighted = condi_dis * lamb_rows

    # normalise every row by its own total
    row_totals = np.sum(weighted, axis = 1, keepdims = True)
    return weighted / row_totals

In [None]:
def count_letter(data): #done
    '''
    count the total appearance times for each letter in every W-mer
    Arguments:
    data: input source W-mer data, size: n x W
    return:
    count: counting results, columns ordered A, T, C, G, size: n x 4
           (an empty input gives a 0 x 4 array)
    '''

    letters = ('A', 'T', 'C', 'G')

    # Only the number of W-mers is needed; reading len(data[0]) for an unused
    # W made empty input crash with IndexError.
    count = np.zeros((len(data), 4), dtype = 'int')

    for row, seq in enumerate(data):
        tally = Counter(seq)
        count[row] = [tally[letter] for letter in letters]

    return count

In [None]:
def erase(data, W):
    '''
    Set up the erasing factor at first

    Arguments:
    data: the original data, denoted as Y, size: N x l_i
    W: specify been W-mer

    return:
    z: float array, size: N x max(l_i); 1 at every position where a W-mer
       can start inside the sequence, 0 elsewhere
       (an empty input gives a 0 x 0 array)
    '''

    # small L: the length of every input sequence
    l = [len(seq) for seq in data]
    if not l:
        # max() of an empty list would raise
        return np.ones((0, 0))

    # Number of valid start positions per sequence, clamped at 0: for a
    # sequence shorter than W the value l_i - W + 1 is negative and would be
    # read as a from-the-end slice index, leaving wrong ones in the row.
    starts = np.array([max(length - W + 1, 0) for length in l])

    columns = np.arange(max(l))
    z = (columns[None, :] < starts[:, None]).astype(float)

    return z

In [None]:
def M_step(Z, I, count):
    '''
    Arguments:
    Z: membership from E-step, size: n x 2 (column 0: motif, column 1: background)
    I: indicator function (letter codes 1..4), size: n x W
    count: count the appearance time for each letter in every W-mer, size: n x 4

    return:
    lamb: updated mixing parameters, eq(5), size: 2
    c_0k: expected background letter counts, size: 1 x 4
    c_jk: expected motif letter counts per position, size: W x 4

    NOTE(review): the original implementation stopped after filling c_0k and
    returned nothing.  Turning the counts into frequencies (including any
    pseudo-counts) is still TODO.
    '''

    I = np.asarray(I)
    Z = np.asarray(Z, dtype = float).transpose()    # 2 x n
    count = np.asarray(count).transpose()           # 4 x n

    # update lambda, eq(5): average membership over the n W-mers
    # (axis = 0 on the transposed matrix averaged over the two models)
    lamb = np.mean(Z, axis = 1)

    # update f_ij
    # calculate the c_0k and c_jk
    c_0k = np.zeros((1, 4))
    for k in range(4):
        c_0k[0][k] = np.sum( Z[1] * count[k] )

    # one_hot[i, j, k] is 1 when W-mer i has letter code k+1 at position j
    one_hot = (I[:, :, None] == np.arange(1, 5)).astype(float)
    c_jk = (Z[0][:, None, None] * one_hot).sum(axis = 0)

    return lamb, c_0k, c_jk
    

In [None]:
class trainer(object):
    '''
    Training driver wrapping a model.

    Only the constructor is functional; the other methods are placeholders
    that raise NotImplementedError.  (Their bodies were empty, which made the
    whole cell fail to parse with IndentationError.)
    '''

    def __init__(self, model):
        '''
        Arguments:
        model: the model to be trained; stored as self.model
        '''

        self.model = model


    def posterior_computation(self):
        '''Placeholder -- not implemented yet.'''
        raise NotImplementedError('trainer.posterior_computation is not implemented yet')


    def train(self):
        '''Placeholder -- not implemented yet.'''
        raise NotImplementedError('trainer.train is not implemented yet')

In [None]:
if __name__ == '__main__':
    # Planned driver, not implemented yet.
    #
    # data_info = {amount of sequence, }
    #
    # TODO: read data and build data_info
    # TODO: build model and train
    #
    # The bare expression `data_info` that stood here referenced a name that
    # is never defined; in a notebook __name__ is '__main__', so the cell
    # raised NameError when run.
    pass