# Imports

In [2]:
import random

import torch
import torch.nn as nn
import torch.nn.functional as F


# Models

## Uni-directional LSTM (without attention)

### Encoder

In [3]:
class Encoder1(nn.Module):
    """Uni-directional LSTM encoder that exposes only its final states.

    Args:
        input_dim: number of rows in the source embedding table.
        emb_dim: size of each token embedding.
        enc_hid_dim: hidden size of the LSTM.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied to the embeddings and
            passed to ``nn.LSTM`` for use between stacked layers.
    """

    def __init__(self, input_dim, emb_dim, enc_hid_dim, n_layers, dropout):
        super().__init__()

        # plain hyper-parameters kept around for inspection
        self.emb_dim, self.enc_hid_dim, self.n_layers = emb_dim, enc_hid_dim, n_layers

        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=enc_hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Embed ``src`` and return the LSTM's final ``(hidden, cell)`` states.

        The per-step LSTM outputs are discarded; only the final states are
        handed to the decoder.
        """
        token_vectors = self.embedding(src)
        _, final_states = self.lstm(self.dropout(token_vectors))
        hidden, cell = final_states
        return hidden, cell



### Decoder

In [None]:
class Decoder1(nn.Module):
    """Uni-directional LSTM decoder (no attention) that emits one token per call.

    Args:
        output_dim: size of the target vocabulary (embedding rows and logits).
        emb_dim: size of each token embedding.
        dec_hid_dim: hidden size of the LSTM.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied to the embeddings and
            passed to ``nn.LSTM`` for use between stacked layers.
    """

    def __init__(self, output_dim, emb_dim, dec_hid_dim, n_layers, dropout):
        super().__init__()

        self.emb_dim = emb_dim
        self.output_dim = output_dim
        self.dec_hid_dim = dec_hid_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, dec_hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(dec_hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Run a single decoding step.

        Args:
            input: token indices for the current step, one per batch element.
            hidden: previous LSTM hidden state.
            cell: previous LSTM cell state.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` holds the
            vocabulary logits for each batch element.
        """
        # add the length-1 sequence dimension that nn.LSTM expects
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        # Project the top layer's output rather than `hidden`: `hidden` has one
        # slice per layer, so squeezing it only yields a rank-2 tensor when
        # n_layers == 1. `output` is always [1, batch, dec_hid_dim] and equals
        # `hidden` in the single-layer case.
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell



### Seq2Seq

In [None]:
class Seq2Seq1(nn.Module):
    """Complete sequence-to-sequence network without attention.

    The encoder produces the context (its final hidden and cell states) and
    the decoder consumes it to predict the target sentence one token at a time.

    Args:
        encoder: an encoder instance; calling it with ``src`` must return
            ``(hidden, cell)``.
        decoder: a decoder instance exposing ``output_dim``; calling it with
            ``(input, hidden, cell)`` must return ``(prediction, hidden, cell)``.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step and return the per-step logits.

        Args:
            src: source batch passed straight to the encoder.
            trg: target token indices, indexed as ``[trg len, batch size]``.
            teacher_forcing_ratio: probability of feeding the ground-truth
                token (instead of the model's own prediction) as next input.

        Returns:
            Tensor of shape ``[trg len, batch size, output_dim]``; row 0 is
            left as zeros because decoding starts from ``trg[0]``.
        """
        max_len, batch_size = trg.shape[0], trg.shape[1]
        trg_vocab_size = self.decoder.output_dim

        # allocate directly on the target device instead of CPU-then-copy
        outputs = torch.zeros(max_len, batch_size, trg_vocab_size, device=self.device)

        hidden, cell = self.encoder(src)

        # the first decoder input is the first row of the target
        input = trg[0, :]

        for t in range(1, max_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[t] = output
            use_teacher_force = random.random() < teacher_forcing_ratio
            # next input: ground truth when teacher forcing, else the arg-max prediction
            input = trg[t] if use_teacher_force else output.argmax(1)
        return outputs

## Uni-directional LSTM (with attention)

### Encoder

In [None]:
class Encoder2(nn.Module):
    """Uni-directional LSTM encoder for the attention model.

    Unlike the attention-free encoder it also returns the per-step outputs,
    which the attention layer scores against the decoder state.

    Args:
        input_dim: number of rows in the source embedding table.
        emb_dim: size of each token embedding.
        enc_hid_dim: hidden size of the LSTM.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability, applied to the embeddings and
            passed to ``nn.LSTM`` for use between stacked layers.
    """

    def __init__(self, input_dim, emb_dim, enc_hid_dim, n_layers, dropout):
        super().__init__()

        # plain hyper-parameters kept around for inspection
        self.emb_dim, self.enc_hid_dim, self.n_layers = emb_dim, enc_hid_dim, n_layers

        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=enc_hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Return ``(outputs, (hidden, cell))`` for the source batch.

        ``hidden`` has its leading dimension squeezed away when that dimension
        has size 1; ``cell`` is returned exactly as the LSTM produced it.
        """
        token_vectors = self.dropout(self.embedding(src))
        outputs, final_states = self.lstm(token_vectors)
        hidden, cell = final_states
        return outputs, (hidden.squeeze(0), cell)



### Attention

In [None]:
class Attention2(nn.Module):
    """Additive attention scoring for the uni-directional model.

    Computes ``softmax(v_a . tanh(W_a [s; h]))`` over the source positions,
    where ``s`` is the decoder hidden state and ``h`` an encoder output.

    Args:
        enc_hid_dim: feature size of the encoder outputs.
        dec_hid_dim: feature size of the decoder hidden state.
    """

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()

        self.W_a = nn.Linear(enc_hid_dim + dec_hid_dim, dec_hid_dim)
        self.v_a = nn.Parameter(torch.rand(dec_hid_dim)) # same as doing nn.Linear(dec_hid_dim, 1, bias=False)
        # Score written into padded positions before the softmax. Kept as a
        # plain float so the module no longer depends on a global `device`
        # and works on whatever device the scores live on.
        self.neg_inf = -1e7

    def forward(self, hidden, encoder_outputs, attention_mask):
        """Return attention probabilities of shape ``[batch size, src len]``.

        Args:
            hidden: decoder state, ``[batch size, dec hid dim]``.
            encoder_outputs: ``[src len, batch size, enc hid dim]``.
            attention_mask: boolean ``[batch size, src len]``; ``True`` marks
                positions (pad tokens) that must receive no attention.
        """
        src_len = encoder_outputs.shape[0]

        #repeat the decoder hidden state once per source position
        hidden = hidden.unsqueeze(1).expand(-1, src_len, -1)

        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        #hidden = [batch size, src len, dec hid dim]
        #encoder_outputs = [batch size, src len, enc hid dim]

        # attention scoring function - part 1 - tanh(W_a[s;h])
        energy = torch.tanh(self.W_a(torch.cat((hidden, encoder_outputs), dim=2)))

        #energy = [batch size, src len, dec hid dim]

        # attention scoring function - part 2 - v_a(tanh(W_a[s;h]))
        # contracting the last dimension with v_a is the batched dot product
        attention = torch.matmul(energy, self.v_a)

        #attention = [batch size, src len]

        # before computing the softmax, push the scores of pad tokens towards
        # -infinity; masked_fill is out-of-place, so nothing is mutated
        attention = attention.masked_fill(attention_mask, self.neg_inf)

        # attention scoring function - part 3 - softmax(v_a(tanh(W_a[s;h])))
        return F.softmax(attention, dim=1)



### Decoder

In [None]:
class Decoder2(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Args:
        output_dim: size of the target vocabulary (embedding rows and logits).
        emb_dim: size of each token embedding.
        enc_hid_dim: feature size of the encoder outputs.
        dec_hid_dim: hidden size of the decoder LSTM.
        n_layers: number of LSTM layers (``forward`` asserts that the LSTM
            output equals its hidden state, which its comments tie to 1 layer).
        dropout: dropout probability for the embeddings / between LSTM layers.
        attention: callable invoked as
            ``attention(hidden, encoder_outputs, attention_mask)``.
    """

    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, n_layers, dropout, attention):
        super().__init__()

        self.output_dim = output_dim
        self.attention = attention

        self.embedding = nn.Embedding(output_dim, emb_dim)

        # the LSTM consumes the token embedding joined with the context vector
        self.rnn = nn.LSTM(
            input_size=enc_hid_dim + emb_dim,
            hidden_size=dec_hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )

        # logits are computed from [LSTM output ; context ; embedding]
        self.fc_out = nn.Linear(enc_hid_dim + dec_hid_dim + emb_dim, output_dim)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, cell, encoder_outputs, attention_mask):
        """Decode one step.

        Shapes, as used by the code below:
            input = [batch size]
            hidden = [batch size, dec hid dim]
            encoder_outputs = [src len, batch size, enc hid dim]
            attention_mask = [batch size, src len]

        Returns:
            ``(prediction, hidden, cell, attention_weights)`` with
            prediction = [batch size, output dim] and
            attention_weights = [batch size, 1, src len].
        """
        #embedded = [1, batch size, emb dim]
        embedded = self.dropout(self.embedding(input.unsqueeze(0)))

        # attention probabilities, with a singleton dim added for bmm
        #attention_weights = [batch size, 1, src len]
        scores = self.attention(hidden, encoder_outputs, attention_mask)
        attention_weights = scores.unsqueeze(1)

        # weighted sum of the encoder states gives the context vector
        #batch_major_outputs = [batch size, src len, enc hid dim]
        batch_major_outputs = encoder_outputs.transpose(0, 1)
        #weighted = [1, batch size, enc hid dim]
        weighted = torch.bmm(attention_weights, batch_major_outputs).transpose(0, 1)

        # feed [embedding ; context] through the LSTM for this single step
        #rnn_input = [1, batch size, enc hid dim + emb dim]
        rnn_input = torch.cat((embedded, weighted), dim=2)
        previous_state = (hidden.unsqueeze(0), cell)
        output, (hidden, cell) = self.rnn(rnn_input, previous_state)

        # with one step, one layer and one direction the two tensors coincide
        assert (output == hidden).all()

        # classification over the entire word vocabulary
        features = torch.cat(
            (output.squeeze(0), weighted.squeeze(0), embedded.squeeze(0)),
            dim=1,
        )
        prediction = self.fc_out(features)

        return prediction, hidden.squeeze(0), cell, attention_weights



### Seq2Seq

In [None]:
class Seq2Seq2(nn.Module):
    """Sequence-to-sequence wrapper for the uni-directional attention model.

    Args:
        encoder: callable returning ``(encoder_outputs, (hidden, cell))``.
        decoder: callable exposing ``output_dim`` and invoked as
            ``decoder(input, hidden, cell, encoder_outputs, attention_mask)``,
            returning ``(prediction, hidden, cell, attention_weights)``.
        device: device on which the prediction buffer is allocated.
        pad_idx: index of the source padding token. When omitted, it is looked
            up as ``src_vocab.get_stoi()["<pad>"]`` on every forward pass,
            which requires a module-level ``src_vocab`` to exist.
    """

    def __init__(self, encoder, decoder, device, pad_idx=None):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.pad_idx = pad_idx

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step.

        Args:
            src: ``[src len, batch size]`` source token indices.
            trg: ``[trg len, batch size]`` target token indices.
            teacher_forcing_ratio: probability to use teacher forcing,
                e.g. 0.75 feeds the ground-truth token 75% of the time.

        Returns:
            ``(outputs, all_attention_weights)`` with
            outputs = [trg len, batch size, output dim] (row 0 stays zero) and
            all_attention_weights = [batch size, trg len - 1, src len].
        """
        src_len, batch_size = src.shape[0], src.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # create attention mask: True wherever the source holds a pad token
        pad_idx = self.pad_idx
        if pad_idx is None:
            # legacy path: fall back to the notebook-level vocabulary
            pad_idx = src_vocab.get_stoi()["<pad>"]
        attention_mask = (src == pad_idx).transpose(0, 1)

        #tensor to store decoder outputs, allocated directly on the device
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=self.device)

        # save the encoder-decoder attention weights (default device, as before)
        all_attention_weights = torch.zeros(trg.shape[1], trg_len - 1, src_len)

        #encoder_outputs holds every encoder step; hidden/cell are its final states
        encoder_outputs, (hidden, cell) = self.encoder(src)

        #first input to the decoder is the first row of trg (the <sos> tokens)
        input = trg[0, :]

        for t in range(1, trg_len):
            #one decoding step: predictions, new states and attention weights
            output, hidden, cell, attention_weights = self.decoder(
                input, hidden, cell, encoder_outputs, attention_mask
            )

            #attention_weights = [batch size, 1, src len]
            all_attention_weights[:, t - 1, :] = attention_weights.squeeze(1)

            #place predictions in a tensor holding predictions for each token
            outputs[t] = output

            #decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio

            #if teacher forcing, use actual next token as next input
            #if not, use the highest-scoring predicted token
            input = trg[t] if teacher_force else output.argmax(1)

        return outputs, all_attention_weights

## Bi-directional LSTM (with attention)

### Encoder

In [4]:
class Encoder(nn.Module):
    """(Bi-)directional LSTM encoder that condenses its final states.

    The final hidden and cell states of the top layer, one per direction, are
    concatenated and projected down to ``enc_hid_dim`` so that they can
    initialise a uni-directional decoder.

    Args:
        input_dim: number of rows in the source embedding table.
        emb_dim: size of each token embedding.
        enc_hid_dim: hidden size of the LSTM (per direction).
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability for the embeddings / between LSTM layers.
        bidirectional: whether the LSTM runs in both directions.
    """

    def __init__(self, input_dim, emb_dim, enc_hid_dim, n_layers, dropout, bidirectional):
        super().__init__()

        self.emb_dim = emb_dim
        self.enc_hid_dim = enc_hid_dim
        self.n_layers = n_layers
        # size the projections from the flag instead of hard-coding "* 2"
        self.num_directions = 2 if bidirectional else 1

        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, enc_hid_dim, n_layers, dropout=dropout, bidirectional=bidirectional)
        self.dropout = nn.Dropout(dropout)

        # project the concatenated per-direction states back to enc_hid_dim
        self.fc_hidden = nn.Linear(enc_hid_dim * self.num_directions, enc_hid_dim)
        self.fc_cell = nn.Linear(enc_hid_dim * self.num_directions, enc_hid_dim)

    def forward(self, src):
        """Encode ``src`` (``[src len, batch size]``).

        Returns:
            ``(encoder_outputs, hidden, cell)`` with
            encoder_outputs = [src len, batch size, enc hid dim * num directions],
            hidden = [batch size, enc hid dim] and
            cell = [1, batch size, enc hid dim].
        """
        #embedded = [src len, batch size, emb dim]
        embedded = self.dropout(self.embedding(src))

        encoder_outputs, (hidden, cell) = self.lstm(embedded)
        # encoder_outputs are always from the top hidden layer, if bidirectional outputs are concatenated.
        # hidden / cell = [num_layers * num_directions, batch_size, hidden_size],
        # laid out layer by layer, so the last `num_directions` slices belong
        # to the top layer. Indexing from the end (rather than [0:1] / [1:2])
        # keeps the summary consistent with encoder_outputs when n_layers > 1.
        top_hidden = hidden[-self.num_directions:]
        top_cell = cell[-self.num_directions:]

        # concatenate the directions along the feature axis, then project
        #hidden = [batch size, enc hid dim]
        hidden = self.fc_hidden(torch.cat(top_hidden.unbind(0), dim=1))
        #cell = [1, batch size, enc hid dim]
        cell = self.fc_cell(torch.cat(top_cell.unbind(0), dim=1)).unsqueeze(0)

        return encoder_outputs, hidden, cell



### Attention

In [None]:
class Attention(nn.Module):
    """Additive attention scoring over bi-directional encoder outputs.

    Computes ``softmax(v_a . tanh(W_a [s; h]))`` over the source positions,
    where ``s`` is the decoder hidden state and ``h`` an encoder output of
    width ``enc_hid_dim * 2``.

    Args:
        enc_hid_dim: per-direction hidden size of the encoder.
        dec_hid_dim: feature size of the decoder hidden state.
    """

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.W_a = nn.Linear(enc_hid_dim * 2 + dec_hid_dim, dec_hid_dim)
        self.v_a = nn.Parameter(torch.rand(dec_hid_dim)) # same as doing nn.Linear(dec_hid_dim, 1, bias=False)
        # Score written into padded positions before the softmax. Kept as a
        # plain float so the module no longer depends on a global `device`
        # and works on whatever device the scores live on.
        self.neg_inf = -1e7

    def forward(self, encoder_outputs, hidden, cell, attention_mask):
        """Return attention probabilities of shape ``[batch size, src len]``.

        Args:
            encoder_outputs: ``[src len, batch size, enc hid dim * 2]``.
            hidden: decoder state, ``[batch size, dec hid dim]``.
            cell: accepted for signature compatibility; not used here.
            attention_mask: boolean ``[batch size, src len]``; ``True`` marks
                positions (pad tokens) that must receive no attention.
        """
        src_len = encoder_outputs.shape[0]

        #repeat the decoder hidden state once per source position
        repeat_dec_hidden = hidden.unsqueeze(0).expand(src_len, -1, -1)

        #repeat_dec_hidden = [src len, batch size, dec hid dim]
        #encoder_outputs = [src len, batch size, enc hid dim * num directions]

        # attention scoring function - part 1 - tanh(W_a[s;h])
        energy = torch.tanh(self.W_a(torch.cat((repeat_dec_hidden, encoder_outputs), dim=2)))

        #energy = [src len, batch size, dec hid dim]

        # attention scoring function - part 2 - v_a(tanh(W_a[s;h]))
        # contract the feature dimension with v_a, then put batch first
        attention = torch.matmul(energy, self.v_a).transpose(0, 1)

        #attention = [batch size, src len]

        # before computing the softmax, push the scores of pad tokens towards
        # -infinity; masked_fill is out-of-place, so nothing is mutated
        attention = attention.masked_fill(attention_mask, self.neg_inf)

        # attention scoring function - part 3 - softmax(v_a(tanh(W_a[s;h])))
        return F.softmax(attention, dim=1)

### Decoder

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder attending over bi-directional encoder outputs.

    Args:
        output_dim: size of the target vocabulary (embedding rows and logits).
        emb_dim: size of each token embedding.
        enc_hid_dim: per-direction hidden size of the encoder; the context
            vector therefore has ``enc_hid_dim * 2`` features.
        dec_hid_dim: hidden size of the decoder LSTM.
        n_layers: number of LSTM layers (``forward`` asserts that the LSTM
            output equals its hidden state, which its comments tie to 1 layer).
        dropout: dropout probability for the embeddings / between LSTM layers.
        attention: callable invoked as
            ``attention(encoder_outputs, hidden, cell, attention_mask)``.
    """

    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, n_layers, dropout, attention):
        super().__init__()

        self.output_dim = output_dim
        self.attention = attention

        self.embedding = nn.Embedding(output_dim, emb_dim)

        # the LSTM consumes the token embedding joined with the context vector
        self.rnn = nn.LSTM(
            input_size=emb_dim + enc_hid_dim * 2,
            hidden_size=dec_hid_dim,
            num_layers=n_layers,
            dropout=dropout,
        )

        # logits are computed from [LSTM output ; context ; embedding]
        self.fc_out = nn.Linear(enc_hid_dim * 2 + dec_hid_dim + emb_dim, output_dim)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, encoder_outputs, hidden, cell, attention_mask):
        """Decode one step.

        Shapes, as used by the code below:
            input = [batch size]
            encoder_outputs = [src len, batch size, enc hid dim * 2]
            hidden = [batch size, dec hid dim]
            cell = [num layers, batch size, dec hid dim]
            attention_mask = [batch size, src len]

        Returns:
            ``(prediction, hidden, attention_weights)`` with
            prediction = [batch size, output dim] and
            attention_weights = [batch size, 1, src len].
        """
        #embedded = [1, batch size, emb dim]
        embedded = self.dropout(self.embedding(input.unsqueeze(0)))

        # attention probabilities, with a singleton dim added for bmm
        #attention_weights = [batch size, 1, src len]
        scores = self.attention(encoder_outputs, hidden, cell, attention_mask)
        attention_weights = scores.unsqueeze(1)

        # weighted sum of the encoder states gives the context vectors
        #batch_major_outputs = [batch size, src len, enc hid dim * 2]
        batch_major_outputs = encoder_outputs.transpose(0, 1)
        #context_vectors = [1, batch size, enc hid dim * 2]
        context_vectors = torch.bmm(attention_weights, batch_major_outputs).transpose(0, 1)

        # feed [embedding ; context] through the LSTM for this single step
        #rnn_input = [1, batch size, (enc hid dim * 2) + emb dim]
        rnn_input = torch.cat((embedded, context_vectors), dim=2)
        previous_state = (hidden.unsqueeze(0), cell)
        # NOTE(review): the updated cell state is dropped here and is not part
        # of the return value, so a caller cannot carry it to the next step.
        output, (hidden, _) = self.rnn(rnn_input, previous_state)

        # with one step, one layer and one direction the two tensors coincide
        assert (output == hidden).all()

        # classification over the entire word vocabulary
        features = torch.cat(
            (output.squeeze(0), context_vectors.squeeze(0), embedded.squeeze(0)),
            dim=1,
        )
        prediction = self.fc_out(features)

        return prediction, hidden.squeeze(0), attention_weights

### Seq2Seq

In [None]:
class Seq2Seq(nn.Module):
    """Sequence-to-sequence wrapper for the bi-directional attention model.

    Args:
        encoder: callable returning ``(encoder_outputs, hidden, cell)``.
        decoder: callable exposing ``output_dim`` and invoked as
            ``decoder(input, encoder_outputs, hidden, cell, attention_mask)``,
            returning ``(prediction, hidden, attention_weights)``.
        device: device on which the prediction buffer is allocated.
        pad_idx: index of the source padding token. When omitted, it is looked
            up as ``src_vocab.get_stoi()["<pad>"]`` on every forward pass,
            which requires a module-level ``src_vocab`` to exist.
    """

    def __init__(self, encoder, decoder, device, pad_idx=None):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.pad_idx = pad_idx

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step.

        Args:
            src: ``[src len, batch size]`` source token indices.
            trg: ``[trg len, batch size]`` target token indices.
            teacher_forcing_ratio: probability to use teacher forcing,
                e.g. 0.75 feeds the ground-truth token 75% of the time.

        Returns:
            ``(outputs, all_attention_weights)`` with
            outputs = [trg len, batch size, output dim] (row 0 stays zero) and
            all_attention_weights = [batch size, trg len - 1, src len].
        """
        src_len, batch_size = src.shape[0], src.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # create attention mask: True wherever the source holds a pad token
        pad_idx = self.pad_idx
        if pad_idx is None:
            # legacy path: fall back to the notebook-level vocabulary
            pad_idx = src_vocab.get_stoi()["<pad>"]
        attention_mask = (src == pad_idx).transpose(0, 1)

        #tensor to store decoder outputs, allocated directly on the device
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=self.device)

        # save the encoder-decoder attention weights (default device, as before)
        all_attention_weights = torch.zeros(trg.shape[1], trg_len - 1, src_len)

        # encoder_outputs is all hidden states of the input sequence
        # hidden and cell are the encoder's condensed final states
        encoder_outputs, hidden, cell = self.encoder(src)

        #first input to the decoder is the first row of trg (the <sos> tokens)
        input = trg[0, :]

        for t in range(1, trg_len):
            #one decoding step: predictions, new hidden state and attention weights
            # NOTE(review): the decoder does not return a cell state, so the
            # encoder's `cell` is passed unchanged at every step.
            output, hidden, attention_weights = self.decoder(
                input, encoder_outputs, hidden, cell, attention_mask
            )

            #attention_weights = [batch size, 1, src len]
            all_attention_weights[:, t - 1, :] = attention_weights.squeeze(1)

            #place predictions in a tensor holding predictions for each token
            outputs[t] = output

            #decide if we are going to use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio

            #if teacher forcing, use actual next token as next input
            #if not, use the highest-scoring predicted token
            input = trg[t] if teacher_force else output.argmax(1)

        return outputs, all_attention_weights