# Simplest Encoder-Decoder Model


In [1]:
# import libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import re
import os
import unicodedata
import numpy as np

In [2]:
# All tensors in this notebook are placed on the CPU.
device = torch.device("cpu")

# Maximum sentence length.
MAX_LENGTH = 10

# Reserved indexes for the default word tokens:
#   PAD_token - pads short sentences
#   SOS_token - marks the start of a sentence
#   EOS_token - marks the end of a sentence
PAD_token, SOS_token, EOS_token = 0, 1, 2

# 1. Sequence-to-Sequence Model Architecture

<img src="seq2seq_model.png">

# 2. Record the mapping in `Voc`

In [3]:
class Voc:
    """Vocabulary that maps words to integer indexes and back.

    The indexes PAD_token, SOS_token and EOS_token are reserved for the
    special "PAD", "SOS" and "EOS" entries, so the first real word that is
    added receives index 3.

    Attributes:
        name:        label given to this vocabulary
        trimmed:     True once trim() has been applied
        word2index:  word -> index
        word2count:  word -> number of times the word was added
        index2word:  index -> word (includes the three special tokens)
        num_words:   number of entries, special tokens included
    """

    def __init__(self, name):
        self.name = name
        self.trimmed = False
        self._reset()

    def _reset(self):
        # (Re)initialize the dictionaries so that only the default tokens remain.
        self.word2index = {}
        self.word2count = {}
        self.index2word = {PAD_token: "PAD", SOS_token: "SOS", EOS_token: "EOS"}
        self.num_words = 3  # Count SOS, EOS, PAD

    def addSentence(self, sentence):
        """Add every space-separated word of `sentence` to the vocabulary."""
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        """Register `word`, or increase its count if it is already known."""
        if word not in self.word2index:
            self.word2index[word] = self.num_words
            self.word2count[word] = 1
            self.index2word[self.num_words] = word
            self.num_words += 1
        else:
            self.word2count[word] += 1

    # Remove words below a certain count threshold
    def trim(self, min_count):
        """Keep only the words that were added at least `min_count` times.

        Trimming happens at most once per vocabulary; later calls return
        immediately. The kept words are re-added from scratch, so they get
        fresh consecutive indexes and their counts restart at 1.
        """
        if self.trimmed:
            return
        self.trimmed = True

        keep_words = [
            word for word, count in self.word2count.items() if count >= min_count
        ]

        # Guard the ratio: an empty vocabulary would otherwise divide by zero.
        total = len(self.word2index)
        ratio = len(keep_words) / total if total else 0.0
        print('keep_words {} / {} = {:.4f}'.format(len(keep_words), total, ratio))

        # Reinitialize dictionaries
        self._reset()
        for word in keep_words:
            self.addWord(word)

In [4]:
# Lowercase and remove non-letter characters
def normalizeString(s):
    """Return a lowercased, letters-and-punctuation-only version of `s`.

    Steps:
      1. lowercase the string;
      2. put a space in front of every '.', '!' and '?' so that each becomes
         its own space-separated token;
      3. replace every run of other non-letter characters with one space;
      4. strip leading/trailing spaces.

    Step 4 matters because callers split the result on ' ': a space left at
    either end would produce an empty-string token.
    """
    s = s.lower()
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s.strip()

In [5]:
# Takes string sentence, returns sentence of word indexes
def indexesFromSentence(voc, sentence):
    """Translate a space-separated sentence into a list of word indexes.

    Each word is looked up in `voc.word2index` (a word missing from the
    vocabulary raises KeyError) and EOS_token is appended at the end.
    """
    indexes = []
    for word in sentence.split(' '):
        indexes.append(voc.word2index[word])
    indexes.append(EOS_token)
    return indexes

# 3. Encoder

The encoder iterates over a sentence one timestep (i.e. one word) at a time. At each timestep, it outputs a hidden state vector and an output vector. The hidden state vector is passed on to the next timestep, while the output vector is recorded.

The last hidden state vector is referred to as the **"Encoder state"**, also known as the **"Context Vector"**.

The first hidden state vector fed to the first timestep is all zeros.

The sentence is padded with *PAD_token* to a certain length.


In [6]:
class Encoder(nn.Module):
    """Bidirectional GRU encoder.

    Embeds a padded batch of word indexes, runs it through a bidirectional
    GRU and sums the forward and backward outputs of every timestep.
    """

    def __init__(self, hidden_dim, vocab_size, embedding_dim, n_layers=1, dropout=0):
        """
        constructor
        @param hidden_dim     hidden dimension
        @param vocab_size     vocabulary size
        @param embedding_dim  embedding size
        @param n_layers       number of recurrent layers
        @param dropout        dropout rate
        """
        super(Encoder, self).__init__()

        # keep for reference
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.n_layers = n_layers
        self.dropout = dropout

        # embedding layer
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim)

        # recurrent layer (GRU dropout only applies between stacked layers,
        # so it is disabled for a single-layer network)
        self.rnn = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=(0 if n_layers == 1 else dropout),
            bidirectional=True)

    def forward(self, input_seq, input_lengths, hidden=None):
        """
        forward pass
        @param input_seq      padded batch of word indexes, (seq_len, batch)
        @param input_lengths  true length of every sequence in the batch,
                              sorted in decreasing order
        @param hidden         optional initial hidden state (zeros if None)
        @return outputs       summed bidirectional outputs,
                              (seq_len, batch, hidden_dim)
        @return hidden        final hidden state,
                              (n_layers * 2, batch, hidden_dim)
        """
        # convert word indexes to embeddings
        embedded = self.embedding(input_seq)

        # pack_padded_sequence requires a lengths tensor to live on the CPU
        if torch.is_tensor(input_lengths):
            input_lengths = input_lengths.cpu()

        # pack padded batch of sequences for RNN module
        packed = nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)

        # forward pass through recurrent layer
        outputs, hidden = self.rnn(packed, hidden)

        # unpack: the GRU returns a PackedSequence, which cannot be sliced
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs)

        # sum bidirectional rnn outputs (forward half + backward half)
        outputs = outputs[:, :, :self.hidden_dim] + outputs[:, :, self.hidden_dim:]

        # return output and final hidden state
        return outputs, hidden

# 4. Decoder

The decoder tries to reproduce the sentence one timestep (i.e. one word) at a time. At each timestep, it also outputs a hidden state vector and an output vector. The initial hidden state at the first timestep is the same as the "Context Vector" of the Encoder.


In [7]:
class Decoder(nn.Module):
    """Unidirectional GRU decoder that scores the next word for one step."""

    def __init__(self, hidden_dim, vocab_size, embedding_dim, output_dim, n_layers=1, dropout=0.1):
        """
        constructor
        @param hidden_dim     hidden dimension
        @param vocab_size     vocabulary size
        @param embedding_dim  embedding size
        @param output_dim     output dimension
        @param n_layers       number of recurrent layers
        @param dropout        dropout rate
        """
        super().__init__()

        # remember the configuration
        self.hidden_dim, self.vocab_size = hidden_dim, vocab_size
        self.embedding_dim, self.output_dim = embedding_dim, output_dim
        self.n_layers, self.dropout = n_layers, dropout

        # word index -> embedding vector
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

        # dropout applied to the embedded input
        self.embedding_dropout = nn.Dropout(dropout)

        # recurrent layer; dropout is switched off for a single layer
        rnn_dropout = dropout if n_layers != 1 else 0
        self.rnn = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=rnn_dropout,
            bidirectional=False,
        )

        # projection from the hidden state to the output dimension
        self.out = nn.Linear(hidden_dim, output_dim)

    def forward(self, input_step, last_hidden, encoder_outputs):
        """
        forward pass
        @param input_step       word indexes of the current input
        @param last_hidden      hidden state passed to the recurrent layer
        @param encoder_outputs  accepted but not used by this decoder
        @return output          softmax (over dim 1) of the projected rnn output
        @return hidden          hidden state returned by the recurrent layer
        """
        # embed the current input word and apply dropout
        step_embedding = self.embedding_dropout(self.embedding(input_step))

        # run the recurrent layer
        rnn_output, hidden = self.rnn(step_embedding, last_hidden)

        # drop the leading dimension (when it has size 1), then project
        logits = self.out(rnn_output.squeeze(0))

        # turn the scores into a distribution over dim 1
        return F.softmax(logits, dim=1), hidden

# 5. Evaluate

In [8]:
def evaluate(searcher, voc, sentence, max_length=MAX_LENGTH):
    """Decode `sentence` with `searcher` and return the result as words.

    The sentence is converted to word indexes, wrapped into a batch of size
    one, transposed so that the first dimension runs over the words, and
    handed to `searcher` together with the sentence length and `max_length`.
    The token indexes returned by the searcher are mapped back to words via
    `voc.index2word`.
    """
    # words -> indexes, as a one-sentence batch
    batch = [indexesFromSentence(voc, sentence)]

    # length of every sentence in the batch
    lengths = torch.tensor([len(row) for row in batch])

    # (batch, seq) -> (seq, batch) to match the models' expectations
    input_batch = torch.LongTensor(batch).transpose(0, 1)

    # move both tensors to the configured device
    input_batch, lengths = input_batch.to(device), lengths.to(device)

    # decode the sentence with the searcher
    tokens, _ = searcher(input_batch, lengths, max_length)

    # indexes -> words
    decoded_words = []
    for token in tokens:
        decoded_words.append(voc.index2word[token.item()])
    return decoded_words

In [9]:
# evaluate inputs from user input (stdin)
def evaluateInput(searcher, voc):
    """Interactive loop: read sentences from stdin and print the bot's reply.

    Each line is normalized, evaluated with `searcher`, stripped of the
    'EOS'/'PAD' tokens and printed. Entering 'q' or 'quit' ends the loop.
    A KeyError raised during evaluation is reported as an unknown word and
    the loop continues with the next prompt.
    """
    while True:
        # get input sentence
        input_sentence = input('> ')

        # check if it is quit case
        if input_sentence in ('q', 'quit'):
            break

        # normalize sentence
        input_sentence = normalizeString(input_sentence)

        # evaluate sentence; only this call is expected to raise KeyError
        try:
            output_words = evaluate(searcher, voc, input_sentence)
        except KeyError:
            print("Error: Encoutered unknown word.")
            continue

        # format and print response sentence
        output_words = [x for x in output_words if x not in ('EOS', 'PAD')]
        print('Bot:', ' '.join(output_words))

In [10]:
# Normalize input sentence and call evaluate()
def evaluateExample(sentence, searcher, voc):
    """Print `sentence`, evaluate its normalized form and print the reply.

    'EOS' and 'PAD' tokens are removed from the decoded words before the
    reply is printed after the 'Bot:' prefix.
    """
    print("> " + sentence)

    # normalize, then decode
    normalized = normalizeString(sentence)
    decoded = evaluate(searcher, voc, normalized)

    # drop the special tokens
    reply = []
    for word in decoded:
        if word != 'EOS' and word != 'PAD':
            reply.append(word)

    print('Bot:', ' '.join(reply))