<a href="https://colab.research.google.com/github/Karthik-Ragunath/Deep_Learning_Notebooks/blob/master/4-2.Seq2Seq(Attention)/Seq2Seq(Attention).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# code by Tae Hwan Jung @graykode
# Reference : https://github.com/hunkim/PyTorchZeroToAll/blob/master/14_2_seq2seq_att.py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output (target)
# P: Padding symbol that fills the remaining positions when a sequence is shorter than the number of time steps

def make_batch():
    """Build the single-sample training batch from the module-level ``sentences``.

    Reads the module-level names ``sentences`` (encoder input, decoder input,
    decoder target -- in that order), ``word_dict`` (token -> index) and
    ``n_class`` (vocabulary size).

    Returns:
        A tuple ``(input_batch, output_batch, target_batch)``:
        * ``input_batch``  -- float32 one-hot encoder input,  shape [1, n_tokens, n_class]
        * ``output_batch`` -- float32 one-hot decoder input,  shape [1, n_tokens, n_class]
        * ``target_batch`` -- int64 target token indices,     shape [1, n_tokens]

    Raises:
        KeyError: if a token in ``sentences`` is missing from ``word_dict``.
    """
    def token_ids(sentence):
        # Whitespace-tokenise and map every token to its vocabulary index.
        return [word_dict[token] for token in sentence.split()]

    def one_hot_batch(sentence):
        # Rows of the identity matrix are the one-hot vectors. Stack into a
        # single ndarray *before* handing it to torch: building a tensor from
        # a Python list of ndarrays takes PyTorch's slow per-element path and
        # emits a UserWarning.
        encoded = np.stack([np.eye(n_class)[token_ids(sentence)]])
        return torch.from_numpy(encoded).float()

    input_batch = one_hot_batch(sentences[0])
    output_batch = one_hot_batch(sentences[1])
    target_batch = torch.tensor([token_ids(sentences[2])], dtype=torch.long)

    return input_batch, output_batch, target_batch

class Attention(nn.Module):
    """RNN encoder-decoder with multiplicative attention over the encoder outputs.

    The attention score of a decoder state against an encoder state is the
    inner product of the decoder state with a learned linear projection of the
    encoder state. Layer sizes come from the module-level ``n_class``
    (vocabulary size) and ``n_hidden`` (RNN hidden size).

    The scoring code flattens its operands and the attention weights are
    reshaped to ``[1, 1, n_step]``, so the model is written for batch_size == 1.
    """

    def __init__(self):
        super(Attention, self).__init__()
        # NOTE(review): nn.RNN applies dropout only *between* stacked layers, so
        # with the default num_layers=1 ``dropout=0.5`` has no effect and merely
        # triggers a UserWarning. Left in place so constructed modules are
        # unchanged; drop the argument or stack layers if dropout is wanted.
        self.enc_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
        self.dec_cell = nn.RNN(input_size=n_class, hidden_size=n_hidden, dropout=0.5)
        print("Hidden Layer Size:", n_hidden)
        # Linear projection applied to each encoder output before scoring.
        self.attn = nn.Linear(n_hidden, n_hidden)
        # Maps [decoder output ; context vector] (2 * n_hidden) to vocabulary logits.
        self.out = nn.Linear(n_hidden * 2, n_class)

    def forward(self, enc_inputs, hidden, dec_inputs):
        """Run the encoder, then decode step by step with attention.

        Args:
            enc_inputs: one-hot encoder input, [batch_size(=1), n_step, n_class].
            hidden: initial encoder hidden state, [1, batch_size(=1), n_hidden].
            dec_inputs: one-hot decoder input, [batch_size(=1), n_step, n_class].

        Returns:
            A tuple ``(logits, trained_attn)``: ``logits`` has shape
            [n_step, n_class]; ``trained_attn`` is a list with one NumPy array
            of attention weights (over encoder steps) per decoder step.
        """
        # nn.RNN is time-major by default: [n_step, batch_size, n_class].
        enc_inputs = enc_inputs.transpose(0, 1)
        dec_inputs = dec_inputs.transpose(0, 1)

        # enc_outputs : [n_step, batch_size, n_hidden]
        # enc_hidden  : [num_layers(=1), batch_size, n_hidden]
        enc_outputs, enc_hidden = self.enc_cell(enc_inputs, hidden)
        # Batch-first view of the encoder outputs, reused by bmm at every step.
        enc_outputs_bf = enc_outputs.transpose(0, 1)  # [1, n_step, n_hidden]

        trained_attn = []
        step_logits = []
        hidden = enc_hidden  # the decoder starts from the encoder's final state

        for dec_input in dec_inputs:  # one decoder time step at a time
            # unsqueeze(0) restores the length-1 time axis that nn.RNN expects.
            # dec_output : [1, batch_size(=1), n_hidden]
            dec_output, hidden = self.dec_cell(dec_input.unsqueeze(0), hidden)
            attn_weights = self.get_att_weight(dec_output, enc_outputs)  # [1, 1, n_step]
            # detach(): the stored weights are for plotting only, not autograd.
            trained_attn.append(attn_weights.squeeze().detach().cpu().numpy())

            # [1, 1, n_step] x [1, n_step, n_hidden] = [1, 1, n_hidden]
            context = attn_weights.bmm(enc_outputs_bf)
            dec_output = dec_output.squeeze(0)  # [batch_size(=1), n_hidden]
            context = context.squeeze(1)        # [1, n_hidden]
            step_logits.append(self.out(torch.cat((dec_output, context), 1)))

        if step_logits:
            # Stacking the per-step results (rather than writing into a
            # preallocated torch.empty buffer) keeps the device and dtype of
            # the model and avoids in-place writes into an autograd tensor.
            logits = torch.stack(step_logits)  # [n_step, 1, n_class]
        else:
            # No decoder steps: keep the historical empty [0, 1, n_class] result.
            logits = enc_outputs.new_empty((0, 1, self.out.out_features))

        # make logits shape [n_step, n_class]
        return logits.transpose(0, 1).squeeze(0), trained_attn

    def get_att_weight(self, dec_output, enc_outputs):
        """Return softmax-normalised attention weights of one decoder output.

        Args:
            dec_output: decoder output for the current step, [1, 1, n_hidden].
            enc_outputs: all encoder outputs, [n_step, 1, n_hidden].

        Returns:
            Tensor of shape [1, 1, n_step] whose entries sum to 1.
        """
        # One scalar score per encoder time step -> [n_step]
        attn_scores = torch.stack(
            [self.get_att_score(dec_output, enc_output) for enc_output in enc_outputs]
        )
        # Explicit dim: relying on softmax's implicit dimension is deprecated.
        return F.softmax(attn_scores, dim=0).view(1, 1, -1)

    def get_att_score(self, dec_output, enc_output):
        """Score one encoder output against the decoder output.

        Args:
            dec_output: decoder output, flattened to n_hidden elements.
            enc_output: a single encoder output, [batch_size(=1), n_hidden].

        Returns:
            0-dim tensor: dot(dec_output, attn(enc_output)).
        """
        score = self.attn(enc_output)  # [batch_size(=1), n_hidden]
        # torch.dot needs 1-D operands of equal length, hence the flattening.
        return torch.dot(dec_output.view(-1), score.view(-1))

if __name__ == '__main__':
    # Script entry point: defines the toy translation data, the vocabulary and
    # the hyper-parameters that make_batch() and Attention read as module-level
    # names, then builds the model, loss and optimizer. The training, test and
    # plotting steps are currently commented out.
    n_step = 5 # number of cells(= number of Step)
    n_hidden = 128 # number of hidden units in one cell

    # [encoder input, decoder input, decoder target]
    sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']

    # Sort the vocabulary: iteration order of a set of strings changes between
    # interpreter runs (hash randomisation), which made the token -> index
    # mapping -- and therefore every one-hot vector -- non-reproducible.
    word_list = sorted(set(" ".join(sentences).split()))
    word_dict = {w: i for i, w in enumerate(word_list)}
    number_dict = {i: w for i, w in enumerate(word_list)}

    # print(word_list, word_dict, number_dict)
    n_class = len(word_dict)  # vocab list

    # hidden : [num_layers(=1) * num_directions(=1), batch_size, n_hidden]
    hidden = torch.zeros(1, 1, n_hidden)
    # print('n_class:', n_class)
    # print('*****************')
    # print('hidden:', hidden)
    # print('len hidden:', len(hidden[0][0]))

    model = Attention()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # input_batch, output_batch, target_batch = make_batch()

    # # Train
    # for epoch in range(2000):
    #     optimizer.zero_grad()
    #     output, _ = model(input_batch, hidden, output_batch)

    #     loss = criterion(output, target_batch.squeeze(0))
    #     if (epoch + 1) % 400 == 0:
    #         print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))

    #     loss.backward()
    #     optimizer.step()

    # # Test
    # test_batch = [np.eye(n_class)[[word_dict[n] for n in 'SPPPP']]]
    # test_batch = torch.FloatTensor(test_batch)
    # predict, trained_attn = model(input_batch, hidden, test_batch)
    # predict = predict.data.max(1, keepdim=True)[1]
    # print(sentences[0], '->', [number_dict[n.item()] for n in predict.squeeze()])

    # # Show Attention
    # fig = plt.figure(figsize=(5, 5))
    # ax = fig.add_subplot(1, 1, 1)
    # ax.matshow(trained_attn, cmap='viridis')
    # ax.set_xticklabels([''] + sentences[0].split(), fontdict={'fontsize': 14})
    # ax.set_yticklabels([''] + sentences[2].split(), fontdict={'fontsize': 14})
    # plt.show()

n_class: 11
*****************
hidden: tensor([[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]])
len hidden: 128


In [9]:
# Sample RNN
# nn.RNN(input_size, hidden_size, num_layers):
#   input_size  = 10 -> features per time step (e.g. a one-hot vocabulary of size 10)
#   hidden_size = 20 -> size of each hidden state / output vector
#   num_layers  = 2  -> two stacked recurrent layers
rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=2)

# input: (L = sequence length 5, N = batch size 3, H_in = 10).
# The last dimension must equal the RNN's input_size.
input = torch.randn(5, 3, 10)

# h0: (D * num_layers = 1 * 2, N = 3, H_out = 20), D = number of directions.
h0 = torch.randn(2, 3, 20)

# output: (L, N, D * H_out) = (5, 3, 1 * 20)
# hn:     (D * num_layers, N, H_out) = (1 * 2, 3, 20)
output, hn = rnn(input, h0)

In [10]:
# Echo the module to display its repr (the constructor arguments).
rnn

RNN(10, 20, num_layers=2)

In [11]:
# Echo the random RNN input tensor created with torch.randn(5, 3, 10).
input

tensor([[[-1.0831e+00,  1.1682e+00,  2.0543e+00, -1.1602e+00,  1.0265e+00,
           7.3769e-01,  4.0249e-01, -7.4208e-01,  2.1492e-01,  7.1503e-01],
         [ 1.0265e+00,  2.8302e+00,  9.5309e-01,  8.8380e-01, -8.7740e-01,
           1.3232e+00,  9.6722e-02, -1.0835e+00, -1.4574e+00, -9.0026e-01],
         [ 2.1591e-01,  1.3157e-01, -6.3713e-01,  9.0262e-01, -5.4343e-01,
           4.0305e-01, -1.2223e-02, -6.7788e-01, -8.7928e-01,  6.3687e-01]],

        [[-8.1321e-02, -4.0574e-01,  2.0943e-01, -2.0276e-01,  6.2889e-01,
          -1.9671e+00,  1.1834e-01, -7.1259e-01, -1.7655e+00,  2.6436e-01],
         [ 7.2928e-01,  2.1325e+00,  1.2982e+00, -3.8353e-01, -5.0894e-01,
          -3.8734e-01,  1.5055e+00, -2.0847e-01, -1.4812e-01,  7.0768e-01],
         [-1.0223e+00, -2.0602e+00,  2.0781e+00, -9.0671e-01, -4.0895e-02,
          -2.2190e-01,  3.0300e-01,  9.9097e-01, -5.6988e-01,  5.8662e-02]],

        [[-3.6564e-01,  1.1822e+00,  1.0823e-01, -7.5445e-01, -1.0674e+00,
           2.38

In [12]:
# RNN input shape: (sequence length 5, batch size 3, input_size 10).
input.shape

torch.Size([5, 3, 10])

In [13]:
# RNN output shape: (sequence length 5, batch size 3, hidden_size 20).
output.shape

torch.Size([5, 3, 20])

In [14]:
# Final hidden state shape: (num_layers 2, batch size 3, hidden_size 20).
hn.shape

torch.Size([2, 3, 20])

In [16]:
# Sample RNN (same setup as before, re-created with fresh random weights and data)
rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=2)
input = torch.randn(5, 3, 10)   # (seq_len, batch, input_size)
h0 = torch.randn(2, 3, 20)      # (num_layers, batch, hidden_size)
output, hn = rnn(input, h0)     # output: (5, 3, 20), hn: (2, 3, 20)

In [17]:
# linear layer
# in_features = 20 (size of each input sample), out_features = 30 (size of each output sample)
m = nn.Linear(in_features=20, out_features=30)
# 128 input samples, each with 20 features
input = torch.randn(128, 20)
# (128 x 20) @ (20 x 30) -> (128 x 30)
output = m(input)
print(output.shape)

torch.Size([128, 30])


In [18]:
# Linear-layer input: 128 samples x 20 features.
input.shape

torch.Size([128, 20])

In [19]:
# Linear-layer output: 128 samples x 30 features.
output.shape

torch.Size([128, 30])

In [22]:
# nn.functional.linear: y = x @ W.T + b
input_matrix = torch.randn(128, 64)    # x: 128 samples x 64 input features
output_matrix = torch.randn(192, 64)   # W: (out_features = 192, in_features = 64)
bias = torch.ones(192)                 # b: one value per output feature

# The 1-D bias is broadcast across all 128 rows of the result.

print(input_matrix.shape, output_matrix.shape, bias.shape, sep="\n")

# (128 x 64) @ (64 x 192) + (192,) -> (128 x 192)
m = nn.functional.linear(input_matrix, output_matrix, bias)
print(m.shape)

torch.Size([128, 64])
torch.Size([192, 64])
torch.Size([192])
torch.Size([128, 192])


In [29]:
# Allocate an uninitialised (seq_len, 1, input_feature_size) tensor,
# passing the size as a single list argument.
seq_len, input_feature_size = 1, 2
sample_model = torch.empty([seq_len, 1, input_feature_size])

In [30]:
# torch.empty returns an ordinary torch.Tensor.
type(sample_model)

torch.Tensor

In [25]:
# NOTE(review): this cell's execution count (In [25]) is lower than that of the
# cell defining sample_model with seq_len = 1 and input_feature_size = 2 (In [29]),
# so the recorded Size([10, 1, 100]) presumably belongs to an earlier definition.
# Re-run to refresh the output.
sample_model.shape

torch.Size([10, 1, 100])

In [31]:
# torch.empty does not initialise memory, so the displayed values are arbitrary.
sample_model

tensor([[[-3.1539e-22,  3.0949e-41]]])

In [32]:
# Same allocation with the size given as separate integer arguments instead of
# a list; both call forms produce a tensor of the same shape.
sample_model = torch.empty(seq_len, 1, input_feature_size)

In [33]:
# Shape is (seq_len, 1, input_feature_size) = (1, 1, 2).
sample_model.shape

torch.Size([1, 1, 2])

In [34]:
# Contents are uninitialised memory; the values carry no meaning.
sample_model

tensor([[[-3.1539e-22,  3.0949e-41]]])

In [36]:
# Pick one time step out of a time-major batch and restore its time axis.
seq_len = 120
batches = 3
num_input_features = 5 # vocab size for one hot encoding
seq_index = 10
# Build the tensor from the named dimensions instead of repeating them as
# literals, so the variables and the actual shape cannot drift apart.
enc_inputs = torch.randn(seq_len, batches, num_input_features)
# Indexing one time step drops the leading axis -> (batches, num_input_features);
# unsqueeze(0) adds a length-1 time axis back -> (1, batches, num_input_features).
enc_inputs[seq_index].unsqueeze(0)

tensor([[[-0.8313,  2.0836, -0.6478,  1.7512, -1.6786],
         [ 0.4946, -0.2987, -0.3469,  1.8557, -0.4541],
         [ 0.4525,  1.2246, -0.3828,  0.4145, -0.9055]]])

In [38]:
# squeeze(0) removes axis 0 only when its size is 1; here axis 0 is the batch
# axis (size 3), so the shape stays (3, 5).
(enc_inputs[seq_index].squeeze(0)).shape

torch.Size([3, 5])

In [39]:
# Values are unchanged by squeeze(0) because axis 0 has size 3, not 1.
enc_inputs[seq_index].squeeze(0)

tensor([[-0.8313,  2.0836, -0.6478,  1.7512, -1.6786],
        [ 0.4946, -0.2987, -0.3469,  1.8557, -0.4541],
        [ 0.4525,  1.2246, -0.3828,  0.4145, -0.9055]])

In [40]:
# A single time step has shape (batches, num_input_features) = (3, 5).
enc_inputs[seq_index].shape

torch.Size([3, 5])

In [41]:
# Batch matrix multiplication: for each of the 10 batch entries,
# (3 x 4) @ (4 x 5) -> (3 x 5).
input = torch.randn(10, 3, 4)
mat2 = torch.randn(10, 4, 5)
res = input.bmm(mat2)
res.shape
# torch.Size([10, 3, 5])

torch.Size([10, 3, 5])