In [1]:
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import ExponentialLR, StepLR
from torch.nn.utils import clip_grad_norm_
from torch.utils.data import Dataset, DataLoader

import numpy as np
import matplotlib.pyplot as plt

from time import sleep
from tqdm.notebook import tqdm, trange

# LSTM with Attention

In [25]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        
        self.input_embedder = nn.Linear(input_dim, hidden_dim)
        self.encoder = nn.LSTM(hidden_dim, hidden_dim)
            
    def forward(self, x):
        enc_input = self.input_embedder(x)
        out, (h, c) = self.encoder(enc_input)
        return out

In [970]:
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
        
        self.MLP_q = nn.Linear(hidden_dim, hidden_dim)
        self.MLP_k = nn.Linear(hidden_dim, hidden_dim)
        self.MLP_v = nn.Linear(hidden_dim, hidden_dim)
        
    def attention(self, h_enc, h_dec):
        q = self.MLP_q(h_dec).unsqueeze(1)
    
        score_list = list()
        for t in range(ht):
            k = self.MLP_k(h_enc[:,t,:]).unsqueeze(1)
            
            q_dot_k = torch.bmm(q, k.permute(0,2,1)).squeeze(2).squeeze(1)
            sqrt_H = torch.tensor(hidden_dim).float().sqrt()
            score = q_dot_k / sqrt_H 
            score_list.append(score)
            

        score = torch.stack(score_list, dim = 1)
        att_q_k = nn.Softmax(dim = 1)(score).unsqueeze(2)
        return att_q_k
    
    def context(self, att, h_enc):
        v = self.MLP_v(h_enc)
        context = (att * v).sum(dim = 1)
        return context
    
    def forward(self, h_enc, h_dec):
        attention = self.attention(h_enc, h_dec)
        context = self.context(attention, h_enc)
        return context

In [898]:
class AdditiveAttention(nn.Module):
    def __init__(self):
        super(AdditiveAttention, self).__init__()
        
        self.Wq_dot_ = nn.Linear(hidden_dim, hidden_dim)
        self.Wk_dot_ = nn.Linear(hidden_dim, hidden_dim)
        self.wa_dot_ = nn.Linear(hidden_dim, 1)

    def forward(self, h_enc, h_dec):
        
        # Calculate Attention
        q = h_dec
        score_list = list()
        for t in range(ht):
            k = h_enc[:,t,:]
            Wk_dot_k = self.Wk_dot_(k)
            
            q = h_dec.squeeze()
            Wq_dot_q = self.Wq_dot_(q)
            
            a = nn.Tanh()(Wq_dot_q + Wk_dot_k)
            wa_dot_a = self.wa_dot_(a)
            
            score_list.append(wa_dot_a)
        score = torch.stack(score_list, dim = 1)
            
        att_q_k = nn.Softmax(dim = 1)(score)
        
        # Calculate Context Vector
        v = h_enc
        context = (att_q_k * v).sum(dim = 1)
            
        return context

In [931]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        
        self.decoder_cell = nn.LSTMCell(hidden_dim, hidden_dim)
        #self.attention = AdditiveAttention()
        self.attention = ScaledDotProductAttention()
        self.output_embedder = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, h_enc):
        
        h = torch.zeros(h_enc.shape[0], hidden_dim)
        c = torch.zeros(h_enc.shape[0], hidden_dim)
        
        h_dec = list()
        for t in range(ft):
            x = self.attention(h_enc, h)
            h, c = self.decoder_cell(x, (h,c))
            h_dec.append(h)
            
        h_dec = torch.stack(h_dec, dim = 1)
        
        logits = self.output_embedder(h_dec)
        log_pis = nn.LogSoftmax(dim = -1)(logits)
        return log_pis

In [138]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        self.encoder = Encoder()
        self.decoder = Decoder()
        
    def forward(self, x):
        
        h_enc = self.encoder(x)
        y = self.decoder(h_enc)
        return y

In [902]:
def CrossEntropyLoss(log_p_y_x, y):
    
    y_OH = batch_to_one_hot(y, num_cats)
    E_i_t = (log_p_y_x * y_OH).sum(dim = 2) 
    E_i = E_i_t.sum(dim = 1) 
    E = E_i.mean(dim = 0)
    
    return -E

In [903]:
bs = 256

input_dim = 10
hidden_dim = 32
output_dim = 10

ht = 8
ft = 3

# Attention is All you need paper implementation

In [286]:
def batch_to_one_hot(batch_cat_id, num_cats):
    """
    Arguments
    ---------
    batch_cat_id : torch.tensor [bs, seq_len, 1]
    
    Returns
    -------
    batch_cat_OH : torch.tensor [bs, seq_len, num_cats]
    
    """
    cat_samples = batch_cat_id.chunk(len(batch_cat_id), dim = 0)
    batch_cat_OH = list()
    for cat_sample in cat_samples:
        cat_id = cat_sample.squeeze()
        cat_OH = torch.zeros(len(cat_id), num_cats)
        cat_OH[torch.arange(len(cat_id)), cat_id] = 1
        batch_cat_OH.append(cat_OH)

    return torch.stack(batch_cat_OH, dim = 0)

In [285]:
class SingleHeadAttention(nn.Module):
    """Single Attention Head
    
    Arguments
    ---------
    Q : torch.tensor [bs, seq_len, d_model]
    K : torch.tensor [bs, seq_len, d_model]
    V : torch.tensor [bs, seq_len, d_model]
    
    Returns
    -------
    Y : torch.tensor [bs, seq_len, d_v]
    """
    
    def __init__(self):
        super(SingleHeadAttention, self).__init__()
        
        self.query_weights = nn.ModuleList([
            nn.Linear(d_model, d_q) 
            for ts in range(max_seq_len)
        ])
        
        self.key_weights = nn.ModuleList([
            nn.Linear(d_model, d_k) 
            for ts in range(max_seq_len)
        ])
        
        self.value_weights = nn.ModuleList([
            nn.Linear(d_model, d_v) 
            for ts in range(max_seq_len)
        ])
        
        
    def linear(self, Q, K, V):

        Q = torch.stack([self.query_weights[ts](Q[:,ts,:])
                        for ts in range(Q.shape[1])], 
                        dim = 1)
        
        K = torch.stack([self.key_weights[ts](K[:,ts,:])
                        for ts in range(K.shape[1])], 
                        dim = 1)
        
        V = torch.stack([self.value_weights[ts](V[:,ts,:])
                        for ts in range(V.shape[1])], 
                        dim = 1)
        return Q, K, V
    
    
    def attention(self, Q, K, V):
        S = torch.bmm(Q, K.permute(0,2,1)) / torch.sqrt(torch.tensor(d_k))
        W = nn.Softmax(dim = 2)(S)
        Y = torch.bmm(W, V)
        return Y

    
    def forward(self, Q, K, V):
        Q, K, V = self.linear(Q, K, V)
        Y = self.attention(Q, K, V)
        return Y

In [284]:
class MaskedSingleHeadAttention(SingleHeadAttention):
    """Single Attention Head
    
    Arguments
    ---------
    Q : torch.tensor [bs, seq_len, d_model]
    K : torch.tensor [bs, seq_len, d_model]
    V : torch.tensor [bs, seq_len, d_model]
    
    Returns
    -------
    Y : torch.tensor [bs, seq_len, d_v]
    
    
    Difference Compared to Regular SingleHeadAttention is the changed Attention weight Matrix.
    It ensures that every Prediction timestep has only access to itself and previous timesteps.
    
    [w 0 0 0 0]    |
    [w w w 0 0]   output
    [w w w w 0]    |
    [w w w w w]    |
    
    <--input-->
     
    """
    
    def __init__(self):
        super(MaskedSingleHeadAttention, self).__init__()
    
    def put_on_mask(self, S):
        
        actual_seq_len = S.shape[1]
        
        mask = torch.ones((actual_seq_len, actual_seq_len))
        neg_infs = torch.zeros((actual_seq_len, actual_seq_len))
        
        for t in range(actual_seq_len):
            for d in range(actual_seq_len):
                if d > t:
                    mask[t,d] = 0
                    neg_infs[t, d] = -np.inf
                    
        S_masked = S * mask + neg_infs
        return S_masked
    
    
    
    def attention(self, Q, K, V):
        S = torch.bmm(Q, K.permute(0,2,1)) / torch.sqrt(torch.tensor(d_k))
        S = self.put_on_mask(S)
        W = nn.Softmax(dim = 2)(S)
        Y = torch.bmm(W, V)
        return Y

In [5]:
class MultiHeadAttention(nn.Module):
    
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        
        self.h = h
        
        self.heads = nn.ModuleList([
            SingleHeadAttention()
            for _ in range(h)
        ])
        
        self.dense = nn.Linear(h * d_v, d_model)
        
    def forward(self, Q, K, V):
        y_SH_cat = torch.cat([self.heads[i](Q, K, V) 
                           for i in range(self.h)], 
                          dim = 2)
        y_MH = self.dense(y_SH_cat)
        y_MH = nn.Softmax(dim = 2)(y_MH)
        return y_MH

In [6]:
class MaskedMultiHeadAttention(MultiHeadAttention):
    
    def __init__(self):
        super(MaskedMultiHeadAttention, self).__init__()
                
        self.heads = nn.ModuleList([
            MaskedSingleHeadAttention()
            for _ in range(h)
        ])

In [8]:
class FeedForward(nn.Module):
    
    def __init__(self):
        super(FeedForward, self).__init__()
        
        self.linear_in = nn.Linear(d_model, d_ff)
        self.linear_out = nn.Linear(d_ff, d_model)
        
    def forward(self, x):
        h = nn.ReLU()(self.linear_in(x))
        y = self.linear_out(h)
        return y

In [283]:
class EncoderLayer(nn.Module):
    """Single Encoder Layer 
    
    Attributes
    ----------
    multi_head_attention
    add_and_norm
    
    feed_forward 
    add_and_norm
    
    """
    
    def __init__(self):
        super(EncoderLayer, self).__init__()
        
        self.multi_head_attention = MultiHeadAttention()
        self.add_and_norm_MHA = AddAndNorm()
        
        self.feed_forward = FeedForward()
        self.add_and_norm_ff = AddAndNorm()
        
    def forward(self, x_emb):
        MHA = self.multi_head_attention(Q = x_emb, K = x_emb, V = x_emb)
        MHA_norm = self.add_and_norm_MHA(MHA, x_emb)
        
        ff = self.feed_forward(MHA_norm)
        z = self.add_and_norm_ff(ff, MHA_norm)
        
        return z

In [281]:
class DecoderLayer(nn.Module):
    """Single Decoder Layer 
    
    Attributes
    ----------
    masked_multi_head_attention 
    add_and_norm
    
    multi_head_attention
    add_and_norm
    
    feed_forward 
    add_and_norm
    
    """
    def __init__(self):
        super(DecoderLayer, self).__init__()
        
        self.masked_multi_head_attention = MaskedMultiHeadAttention()
        self.add_and_norm_MMHA = AddAndNorm()
        
        self.multi_head_attention = MultiHeadAttention()
        self.add_and_norm_MHA = AddAndNorm()
        
        self.feed_forward = FeedForward()
        self.add_and_norm_ff = AddAndNorm()
    
    def forward(self, output, latent):
        MMHA = self.masked_multi_head_attention(Q = output, K = output, V = output)
        MMHA_norm = self.add_and_norm_MMHA(output, MMHA)

        MHA = self.multi_head_attention(Q = MMHA_norm, K = latent, V = latent)
        MHA_norm = self.add_and_norm_MHA(MMHA_norm, MHA)
        
        ff = self.feed_forward(MHA_norm)
        ff_norm = self.add_and_norm_ff(ff, MHA_norm)
        
        return ff_norm

In [280]:
class AddAndNorm(nn.Module):
    """Add State and it's copy that has passed through Attention and use Layer Normalization
    """
    def __init__(self):
        super(AddAndNorm, self).__init__()
        self.normalize = nn.LayerNorm(d_model)
        
    def forward(self, x, y):
        return self.normalize(x + y)

In [279]:
class Encoder(nn.Module):
    """Sequence of Encoder Layers
    
    Attributes
    ----------
    N : int
        Number Encoder Layers
        
    """
    
    def __init__(self):
        super(Encoder, self).__init__()
        
        self.layers = nn.ModuleList([
            EncoderLayer()
            for _ in range(N)
        ])
    
    def forward(self, x_emb):
        z = x_emb
        N_z = list()
        for layer in self.layers:
            z = layer(z)
            N_z.append(z)
            
        return N_z

In [277]:
class Decoder(nn.Module):
    
    """Sequence of Decoder Layers
    
    Attributes
    ----------
    N : int
        Number Decoder Layers
        
    """
    def __init__(self):
        super(Decoder, self).__init__()
        
        self.layers = nn.ModuleList([
            DecoderLayer()
            for _ in range(N)
        ])
        
    def forward(self, y, N_z):    
        for n, layer in enumerate(self.layers):
            z = N_z[n]
            y = layer(y,z) 
        return y

In [275]:
class Transformer(nn.Module):
    """Transformer - Timeseries Forecasting
    
    Attributes
    ----------
    task_type : str 
    
        'regression' : input / output OHE [bs, time, cat]   
        'classification' : input / output [bs, time, dim]
    
    Methods
    -------
    run_encoder()
        Run all parts of the Encoding process
        
    run_decoder_inference()
        Run all parts of the Decoding process in the Prediction Mode
        
    run_decoder_train()
        Run all parts of the Decoding process in the Training Mode, when labels are known in advance
        
    predict()
        Run full Trainsformer in the Prediction Mode, returning a prediction
        
    train_loss()
        Run full Trainsformer in the Training Mode, returning the training loss
        
    """
    def __init__(self):
        super(Transformer, self).__init__()
        
        self.task_type = 'regression'# 'classification' 
        
        self.enc_input_embedder = nn.Linear(d_input, d_model)
        self.encoder = Encoder()
        
        self.dec_ouput_embedder = nn.Linear(d_input, d_model)
        self.decoder = Decoder()
        
        self.dec_linear = nn.Linear(d_model, d_output)
                
    def run_encoder(self, x):
        """Run All parts of the Encoding process
        
        Arguments
        ---------
        x : torch.tensor [bs, ht, d_input]
        
        Returns
        -------
        N_z : list of torch.tensor [bs, ht, d_model]
        
        """
        
        x_emb = self.enc_input_embedder(x)
        x_PE = positional_encoding(seq_len, d_model)
        x_enc_in = x_emb + x_PE
        
        N_z = self.encoder(x_enc_in)
        return N_z
    
    def run_decoder_inference(self, y_previous, N_z):
        """ Decoder Inference Mode: Iteratively
        
        Arguments
        ---------
        N_z : list of torch.tensor [bs, ht, d_model]
            Encoder outputs of history sequence of each of the N layers.
            
        y_previous : torch.tensor [bs, 1, d_input] 
            Initial decoder input. 
            We just copy the last timestep of the history sequence here.
            
        Procedure
        ---------
        First input to the decoder is just like start token.
        The last output timestep of the decoder gets apppended to the input for the next iteration.
        With every Iteration, both the in and output sequence grow, until the desired sequence length is reached.
        The last output sequence is the final output.
        
        Iteration 1: in:[y0]       > out:[y1] 
        Iteration 2: in:[y0,y1]    > out:[y1,y2]
        Iteration 3: in:[y0,y1,y2] > out:[y1,y2,y3]
        
        y_pred = [y1, y2, ..., y_ft]
        
        Returns
        -------
        y_pred : torch.tensor [bs, ft, d_output]
        
        """
        for t in range(ft):
            
            # embedding
            y_emb = self.dec_ouput_embedder(y_previous)
            y_PE = positional_encoding(y_emb.shape[1], d_model)
            y_dec_in = y_emb + y_PE
            
            # decoder 
            y_next = self.decoder(y_dec_in, N_z)
            y_next = y_next[:,-1,:].unsqueeze(1)
            y_next = self.dec_linear(y_next)   
            
            # Ouput Distribution
            if self.task_type == 'classification':
                p_y_x = nn.Softmax(dim = -1)(y_next)
                y_next = p_y_x
                
            if self.task_type == 'regression':
                # hier gegebenfalls Normal distribution aufstellen
                mu = y_next
                y_next = mu
                
            y_previous = torch.cat([y_previous, y_next], dim = 1)
                
        y_pred = y_previous[:,1:,:]
        
        if self.task_type == 'classification':
            cat_id = y_pred.argmax(dim = -1).unsqueeze(-1)
            cat_OH = batch_to_one_hot(cat_id, d)
            return cat_OH
                
        if self.task_type == 'regression':
            return y_pred
        
    
    def run_decoder_train(self, y_0, y_gt, N_z):
        
        """ Decoder Training Mode: Simultaniously all timesteps
        
        Arguments
        ---------
        N_z : list of torch.tensor [bs, ht, d_model]
            Encoder outputs of history sequence of each of the N layers.
            
        y_0 : torch.tensor [bs, 1, d_input] 
            Initial decoder input. 
            We just copy the last timestep of the history sequence here.
            
        y_gt : torch.tensor [bs, ft, d_output]
            
        Procedure
        ---------
        All timesteps are calculated simultaneously.
        
        
        in: [y0, y1, ..., y_ft-1] -> out: [y1, y2, ... , y_ft]
        
          cat[y0 | y_gt[:-1]]     ->           y_gt
                
        Returns
        -------
        y_pred : torch.tensor [bs, ft, d_output]
        
        """
        # concat first token to ground truth
        y_shifted_right = torch.cat([y_0, y_gt[:,:-1,:]], dim = 1)
        
        # embedder
        y_emb = self.dec_ouput_embedder(y_shifted_right)
        y_PE = positional_encoding(y_emb.shape[1], d_model)
        y_dec_in = y_emb + y_PE
        
        # decoder
        y_pred = self.decoder(y_dec_in, N_z)
        y_pred = self.dec_linear(y_pred)
                
        # Ouput Distribution + Loss
        if self.task_type == 'classification':
            p_y_x = nn.Softmax(dim = -1)(y_pred)
            loss = CESequenceLoss(p_y_x, y_gt)
            
        if self.task_type == 'regression':
            # Hier gegebnfalls Normal Distribution
            mu = y_pred
            y_pred = mu
            loss = MSESequenceLoss(y_pred, y_gt)

        return loss
    
    def predict(self, x):
        N_z = self.run_encoder(x)
        first_dec_out = x[:,-1,:].unsqueeze(1)
        y_pred = self.run_decoder_inference(first_dec_out, N_z)
        return y_pred
    
    def train_loss(self, x, y_gt):
        N_z = self.run_encoder(x)
        y_0 = x[:,-1,:].unsqueeze(1)
        loss = self.run_decoder_train(y_0, y_gt, N_z)
        return loss

In [172]:
def CESequenceLoss(p_y_x, y):
    log_p_y_x = torch.log(p_y_x).clamp(min = -100)
    E_i_t = - (y * log_p_y_x).sum(dim = 2)
    E_i = E_i_t.sum(dim = 1)
    E = E_i.mean(dim = 0)
    return E

In [178]:
def MSESequenceLoss(y_pred, y):
    SE_dim_total = ((y_pred - y) ** 2).sum(2)
    SE_seq_total = SE_dim_total.sum(1)
    MSE = SE_seq_total.mean(dim = 0)
    return MSE

In [15]:
import math


def positional_encoding(seq_len, d_model):
    def p(t,k):
        def is_even(x):
            return x % 2 == 0
        def w(k):
            return torch.tensor(1/math.pow(10000, 2*k / d_model))

        if is_even(k):
            return torch.sin(w(k) * t)
        if not is_even(k):
            return torch.cos(w(k) * t)
        
    P = torch.zeros((seq_len, d_model))
    for t in range(seq_len):
        for k in range(d_model):
            P[t,k] = p(t,k)
    return P

def binary_to_float(x):
    d = len(x)
    return torch.tensor([math.pow(2, idx) * x[idx] for idx in range(d)]).sum()

In [267]:
# Model Parameters
ht = 10
ft = 10

seq_len = ht
max_seq_len = 10 # 512
N = 1
h = 1

d_input = 1
d_output = 1
d_model = 512
d_ff = 4 * d_model

d_k = int(d_model / h)
d_v = int(d_model / h)
d_q = int(d_model / h)

In [226]:
# Data for Classification
x = torch.randint(0, d-1, (1,ht,1))
x_OH = batch_to_one_hot(x, d)

y = torch.randint(0, d-1, (1,ft,1))
y_OH = batch_to_one_hot(y, d)

In [247]:
# Data for Regression
t = torch.arange(0, 20, 1)
y = torch.sin(t)
hist = torch.stack([y[:10]], dim = 1).unsqueeze(0)
fut = torch.stack([y[10:]], dim = 1).unsqueeze(0)

In [264]:
# Model
model = Transformer()
optimizer = Adam(model.parameters(), lr = 0.001)
scheduler = StepLR(optimizer, 100, 0.9)

#Training
num_epochs = 300
for epoch in range(num_epochs):
    loss = model.train_loss(hist, fut)
    loss.backward()
    clip_grad_norm_(model.parameters(), 10)
    optimizer.step()
    optimizer.zero_grad()
    scheduler.step()
    if epoch % 5 == 0:
        print(f'loss = {loss.detach()}')

        scheduler.step()

loss = 4.505513668060303
loss = 5.025459289550781
loss = 3.1072521209716797
loss = 1.6260898113250732
loss = 1.6819764375686646
loss = 1.0738043785095215
loss = 0.44290584325790405
loss = 0.4375051259994507
loss = 0.049974408000707626


KeyboardInterrupt: 