This assignment explores two key concepts – sub-word modeling and convolutional networks – and applies them to the NMT system we built in the previous assignment. The Assignment 4 NMT model can be thought of as four stages:

1. Embedding layer: Converts raw input text (for both the source and target sentences) to a sequence of dense word vectors via lookup.
2. Encoder: An RNN that encodes the source sentence as a sequence of encoder hidden states.
3. Decoder: An RNN that operates over the target sentence and attends to the encoder hidden states to produce a sequence of decoder hidden states.
4. Output prediction layer: A linear layer with softmax that produces a probability distribution for the next target word on each decoder timestep.

- In Section 1 of this assignment, we will replace (1) with a character-based convolutional encoder.
- In Section 2, we will enhance (4) by adding a character-based LSTM decoder.

# Section 1

![](../images/ex5_1.png)

## code for VocabEntry

In [1]:
from collections import Counter
from docopt import docopt
from itertools import chain
import json
import torch
from typing import List
from utils import read_corpus, pad_sents, pad_sents_char

[nltk_data] Downloading package punkt to /home/quantran/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
class _IndexToWord(dict):
    """ Index -> word mapping that may be subscripted or called.

    VocabEntry stores this mapping in the instance attribute `id2word`, which
    shadows the `id2word` method of the class. With a plain dict the call form
    `vocab.id2word(wid)` raised `TypeError`; making the mapping callable keeps
    `vocab.id2word[wid]` working and lets the call form work as well.
    """

    def __call__(self, wid):
        """ Return the word stored for index `wid`.
        @param wid (int): word index
        @returns word (str): word corresponding to index
        """
        return self[wid]


class VocabEntry(object):
    """ Vocabulary Entry, i.e. structure containing either
    src or tgt language terms, together with a fixed character vocabulary.
    """

    def __init__(self, word2id=None):
        """ Init VocabEntry Instance.
        @param word2id (dict): dictionary mapping words 2 indices. When it is
            None or empty, a vocabulary holding only the four special tokens
            is created. A supplied dictionary must contain '<unk>'.
        """
        if word2id:
            self.word2id = word2id
        else:
            self.word2id = dict()
            self.word2id['<pad>'] = 0  # Pad Token
            self.word2id['<s>'] = 1  # Start Token
            self.word2id['</s>'] = 2  # End Token
            self.word2id['<unk>'] = 3  # Unknown Token
        self.unk_id = self.word2id['<unk>']
        # Callable dict: supports both id2word[wid] and id2word(wid).
        self.id2word = _IndexToWord((wid, word) for word, wid in self.word2id.items())

        ## Additions to the A4 code:
        self.char_list = list(
            """ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]""")

        self.char2id = dict()  # Converts characters to integers
        self.char2id['∏'] = 0  # <pad> token
        self.char2id['{'] = 1  # start of word token
        self.char2id['}'] = 2  # end of word token
        self.char2id['Û'] = 3  # <unk> token
        # Regular characters receive the next free index, in char_list order.
        for c in self.char_list:
            self.char2id[c] = len(self.char2id)
        self.char_pad = self.char2id['∏']
        self.char_unk = self.char2id['Û']
        self.start_of_word = self.char2id["{"]
        self.end_of_word = self.char2id["}"]
        assert self.start_of_word + 1 == self.end_of_word

        self.id2char = {v: k for k, v in self.char2id.items()}  # Converts integers to characters
        ## End additions to the A4 code

    def __getitem__(self, word):
        """ Retrieve word's index. Return the index for the unk
        token if the word is out of vocabulary.
        @param word (str): word to look up.
        @returns index (int): index of word
        """
        return self.word2id.get(word, self.unk_id)

    def __contains__(self, word):
        """ Check if word is captured by VocabEntry.
        @param word (str): word to look up
        @returns contains (bool): whether word is contained
        """
        return word in self.word2id

    def __setitem__(self, key, value):
        """ Raise error, if one tries to edit the VocabEntry.
        New words must be registered through `add()`.
        """
        raise ValueError('vocabulary is readonly')

    def __len__(self):
        """ Compute number of words in VocabEntry.
        @returns len (int): number of words in VocabEntry
        """
        return len(self.word2id)

    def __repr__(self):
        """ Representation of VocabEntry to be used
        when printing the object.
        """
        return 'Vocabulary[size=%d]' % len(self)

    def id2word(self, wid):
        """ Return mapping of index to word.

        On instances this method is shadowed by the `id2word` mapping created
        in `__init__`; it stays reachable as `VocabEntry.id2word(entry, wid)`
        and the mapping itself accepts the same call form.
        @param wid (int): word index
        @returns word (str): word corresponding to index
        """
        return self.id2word[wid]

    def add(self, word):
        """ Add word to VocabEntry, if it is previously unseen.
        @param word (str): word to add to VocabEntry
        @return index (int): index that the word has been assigned
        """
        if word not in self:
            wid = self.word2id[word] = len(self)
            self.id2word[wid] = word
            return wid
        else:
            return self[word]

    def words2charindices(self, sents):
        """ Convert list of sentences of words into list of list of list of character indices.
        Every word is wrapped in the start-of-word "{" and end-of-word "}"
        characters; characters missing from the character vocabulary map to
        the character <unk> index.
        @param sents (list[list[str]]): sentence(s) in words
        @return word_ids (list[list[list[int]]]): sentence(s) in indices
        """
        return [[[self.char2id.get(c, self.char_unk) for c in ("{" + w + "}")] for w in s] for s in sents]

    def words2indices(self, sents):
        """ Convert list of sentences of words into list of list of indices.
        @param sents (list[list[str]]): sentence(s) in words
        @return word_ids (list[list[int]]): sentence(s) in indices
        """
        return [[self[w] for w in s] for s in sents]

    def indices2words(self, word_ids):
        """ Convert list of indices into words.
        @param word_ids (list[int]): list of word ids
        @return sents (list[str]): list of words
        """
        return [self.id2word[w_id] for w_id in word_ids]

    def to_input_tensor_char(self, sents: List[List[str]], device: torch.device) -> torch.Tensor:
        """ Convert list of sentences (words) into a tensor of character indices,
        with necessary padding for shorter words and shorter sentences.

        @param sents (List[List[str]]): list of sentences (words)
        @param device: device on which to load the tensor, i.e. CPU or GPU

        @returns sents_var: tensor of (max_sentence_length, batch_size, max_word_length)
        """
        # Characters -> indices, then pad words and sentences with the
        # character-padding index via pad_sents_char() from utils.py.
        char_ids = self.words2charindices(sents)
        padded = pad_sents_char(char_ids, self.char_pad)
        sents_var = torch.tensor(padded, dtype=torch.long, device=device)
        # Swap the first two axes so the sentence position comes before the
        # batch axis; contiguous() materialises that layout for later .view().
        return sents_var.permute(1, 0, 2).contiguous()

    def to_input_tensor(self, sents: List[List[str]], device: torch.device) -> torch.Tensor:
        """ Convert list of sentences (words) into tensor with necessary padding for
        shorter sentences.

        @param sents (List[List[str]]): list of sentences (words)
        @param device: device on which to load the tensor, i.e. CPU or GPU

        @returns sents_var: tensor of (max_sentence_length, batch_size)
        """
        word_ids = self.words2indices(sents)
        sents_t = pad_sents(word_ids, self['<pad>'])
        sents_var = torch.tensor(sents_t, dtype=torch.long, device=device)
        return torch.t(sents_var)

    @staticmethod
    def from_corpus(corpus, size, freq_cutoff=2):
        """ Given a corpus construct a Vocab Entry.
        @param corpus (list[list[str]]): tokenized sentences, as produced by the
            read_corpus function; the words of every sentence are counted
        @param size (int): # of words in vocabulary (special tokens not included)
        @param freq_cutoff (int): if word occurs n < freq_cutoff times, drop the word
        @returns vocab_entry (VocabEntry): VocabEntry instance produced from provided corpus
        """
        vocab_entry = VocabEntry()
        word_freq = Counter(chain(*corpus))
        valid_words = [w for w, v in word_freq.items() if v >= freq_cutoff]
        print('number of word types: {}, number of word types w/ frequency >= {}: {}'
              .format(len(word_freq), freq_cutoff, len(valid_words)))
        # Most frequent words first, so they receive the smallest indices.
        top_k_words = sorted(valid_words, key=lambda w: word_freq[w], reverse=True)[:size]
        for word in top_k_words:
            vocab_entry.add(word)
        return vocab_entry

In [3]:
# Build a vocabulary holding only the default special tokens and look at the
# character indices produced for two short sentences. Every word is wrapped
# in the "{" (id 1) and "}" (id 2) boundary characters.
temp = VocabEntry()
temp.words2charindices([['I','love','you'],['I','know']])

[[[1, 12, 2], [1, 41, 44, 51, 34, 2], [1, 54, 44, 50, 2]],
 [[1, 12, 2], [1, 40, 43, 44, 52, 2]]]

In [4]:
[temp.id2char[i] for i in [1, 40, 43, 44, 52, 2]]

['{', 'k', 'n', 'o', 'w', '}']

In [5]:
# Character-index tensor for a batch of two sentences; this cell needs a CUDA
# device ('cuda:0'). The result is laid out as
# (max_sentence_length, batch_size, max_word_length), with shorter words and
# the missing third word of the second sentence filled with the pad index 0.
temp1 = temp.to_input_tensor_char([['I','loveee','you'],['I','know']],torch.device('cuda:0'))
temp1

tensor([[[ 1, 12,  2,  0,  0,  0,  0,  0],
         [ 1, 12,  2,  0,  0,  0,  0,  0]],

        [[ 1, 41, 44, 51, 34, 34, 34,  2],
         [ 1, 40, 43, 44, 52,  2,  0,  0]],

        [[ 1, 54, 44, 50,  2,  0,  0,  0],
         [ 0,  0,  0,  0,  0,  0,  0,  0]]], device='cuda:0')

In [6]:
temp.char2id['∏']

0

## code for highway

In [7]:
import torch.nn as nn
import torch.nn.functional as F
import torch

![](../images/ex5_2.png)

In [8]:
class Highway(nn.Module):
    """ Highway layer applied to the max-pooled CNN output of each word.

    A learned gate mixes a ReLU projection of the input with the input
    itself:  x_highway = gate * proj + (1 - gate) * x_conv_out.
    """

    def __init__(self, e_word):
        """ Create the projection and gate layers.
        @param e_word (int): word embedding size; both linear layers map
            e_word features to e_word features
        """
        super(Highway, self).__init__()
        self.e_word = e_word
        # Projection is created before the gate, so a seeded run initialises
        # the parameters in the same order as before.
        self.w_proj = nn.Linear(in_features=e_word, out_features=e_word)
        self.w_gate = nn.Linear(in_features=e_word, out_features=e_word)

    def forward(self, x_conv_out):
        """ Apply the highway transformation to the last dimension.

        Both linear layers act on the last dimension only, so any leading
        dimensions (e.g. sentence position and batch) pass through unchanged.

        @param x_conv_out: tensor of shape (..., e_word), the output of the
            convolution + ReLU + max-pool stage
        @returns x_highway: tensor with the same shape as x_conv_out
            (no dropout applied here)
        """
        gate = self.w_gate(x_conv_out).sigmoid()
        proj = self.w_proj(x_conv_out).relu()
        return gate * proj + (1 - gate) * x_conv_out

In [9]:
# Smoke test for Highway: e_word = 2 and a random input of shape (4, 3, 2).
# The layer only transforms the last dimension, so the result is expected to
# keep the input shape.
temp_highway = Highway(2)
temp_conv_out = torch.randn(4,3,2)
temp_result = temp_highway(temp_conv_out)

In [10]:
temp_result.shape

torch.Size([4, 3, 2])

## code for cnn

In [171]:
class CNN(nn.Module):
    """ Character-level 1-D convolution followed by ReLU and a global max-pool.

    Turns the character embeddings of each word into a single vector of
    size e_word.
    """

    def __init__(self,e_char,e_word,k=5,padding=1):
        """ Create the convolution and pooling layers.
        @param e_char (int): character embedding size (input channels)
        @param e_word (int): desired word embedding size (output channels)
        @param k (int): kernel size of the convolution
        @param padding (int): zero padding added to both ends of each word
        """
        super().__init__()
        self.conv1d = nn.Conv1d(e_char, e_word, kernel_size = k, padding = padding)
        self.mp1d = nn.AdaptiveMaxPool1d(1)
        self.e_word = e_word

    def forward(self,x_reshaped):
        """ Compute one e_word-sized vector per word.

        input: x_reshaped: (max_sentence_length,bs,e_char,max_word_length)

        output:  x_conv_out: (max_sentence_length,bs,e_word)
            - e_word is the desired word embedding size
        """
        # Fold the sentence and batch axes together so that a single conv1d
        # call processes every word, instead of looping over sentences.
        sent_length,bs = x_reshaped.shape[0],x_reshaped.shape[1]
        flat_shape = (sent_length * bs,x_reshaped.shape[2],x_reshaped.shape[3])
        # reshape() rather than view(): view() raises when the two leading
        # axes are not stride-compatible (e.g. after swapping sentence and
        # batch axes); reshape() copies only in that case.
        words = x_reshaped.reshape(flat_shape)  # (sent_length*bs,e_char,max_word_length)

        # (sent_length*bs,e_word,max_word_length + 2*padding - k + 1)
        x_conv = self.conv1d(words)
        x_conv_out = F.relu(x_conv)
        # Global max over the word positions:
        # (sent_length*bs,e_word,1) -> (sent_length*bs,e_word)
        x_conv_out = self.mp1d(x_conv_out).squeeze(-1)
        x_conv_out = x_conv_out.view(sent_length,bs,self.e_word)

        return x_conv_out.contiguous()

In [172]:
# test cnn
temp_conv = nn.Conv1d(3,4,2) #in_channels,out_channels,kernel_size

In [173]:
temp_w = temp_conv.weight.data
temp_b = temp_conv.bias.data

In [174]:
temp_x = torch.randn(1, 3, 2) # bs,in_channels aka emb size,number_of_items

In [175]:
temp3 = temp_conv(temp_x)
temp3

tensor([[[-0.6414],
         [-0.8552],
         [-0.3947],
         [ 0.2296]]], grad_fn=<SqueezeBackward1>)

In [176]:
temp3.shape # bs,out_channels,new_number_of_items

torch.Size([1, 4, 1])

In [177]:
# Manual check of the first output channel: with kernel size 2 and an input
# of length 2 there is a single window, so the result is the elementwise
# product of the first filter with the input, summed, plus the first bias.
# It should equal temp3[0, 0, 0].
(temp_w[0] * temp_x[0]).sum() + temp_b[0]

tensor(-0.6414)

In [178]:
# testing cnn + maxpool

In [179]:
# Conv1d with in_channels=3, out_channels=4, kernel_size=2 applied to a
# random batch of shape (bs=2, channels=3, length=4); without padding the
# output length is 4 - 2 + 1 = 3.
temp_conv = nn.Conv1d(3,4,2)
temp_x = torch.randn(2, 3, 4)
temp3 = temp_conv(temp_x)

In [180]:
temp3,temp3.shape

(tensor([[[-0.0569, -0.5749,  0.4177],
          [ 0.2825,  0.5814,  0.1241],
          [-0.0021, -0.3691,  0.4847],
          [-0.3763, -0.5331,  0.4839]],
 
         [[-0.0877,  0.4142, -0.3739],
          [ 1.0563,  0.0531, -0.3173],
          [-0.2251, -0.1101, -1.3916],
          [-0.9606,  0.6323, -0.5496]]], grad_fn=<SqueezeBackward1>),
 torch.Size([2, 4, 3]))

In [181]:
temp_mp = nn.AdaptiveMaxPool1d(1)

In [182]:
temp4 = temp_mp(temp3)
temp4,temp4.shape

(tensor([[[ 0.4177],
          [ 0.5814],
          [ 0.4847],
          [ 0.4839]],
 
         [[ 0.4142],
          [ 1.0563],
          [-0.1101],
          [ 0.6323]]], grad_fn=<SqueezeBackward1>),
 torch.Size([2, 4, 1]))

In [183]:
temp4.squeeze(-1).shape

torch.Size([2, 4])

In [184]:
# test everything

In [185]:
# CNN with e_char=3, e_word=4, kernel size k=2 (padding keeps its default of 1)
# and a random input of shape
# (max_sentence_length=3, bs=1, e_char=3, max_word_length=4).
temp_cnn = CNN(3,4,2)
temp_x = torch.randn(3,1,3,4)

In [186]:
temp_final = temp_cnn(temp_x)

In [188]:
temp_final[0].shape

torch.Size([3, 1, 4])

In [187]:
temp_final[0] == temp_final[1]

tensor([[[True, True, True, True]],

        [[True, True, True, True]],

        [[True, True, True, True]]])

## code for ModelEmbeddings

In [23]:
import torch.nn as nn

# Do not change these imports; your module names should be
#   `CNN` in the file `cnn.py`
#   `Highway` in the file `highway.py`
# Uncomment the following two imports once you're ready to run part 1(j)

from cnn import CNN
from highway import Highway


# End "do not change"

In [31]:
class ModelEmbeddings(nn.Module):
    """
    Character-aware word embedding layer: each word is embedded from its
    characters by an embedding lookup, a CNN, a highway layer and dropout.
    """

    def __init__(self, word_embed_size, vocab):
        """
        Init the Embedding layer for one language
        @param word_embed_size (int): Embedding size (dimensionality) for the output word,
            aka e_word
        @param vocab (VocabEntry): VocabEntry object; its `char2id` mapping sizes the
            character embedding table and its `char_pad` index is used as padding index.
        """
        super().__init__()
        self.word_embed_size = word_embed_size
        self.vocab = vocab
        self.e_char = 50  # size of each character embedding

        # Sub-modules are created in the order: character embedding, highway,
        # CNN, dropout.
        self.char_emb = nn.Embedding(
            num_embeddings=len(vocab.char2id),
            embedding_dim=self.e_char,
            padding_idx=vocab.char_pad,
        )
        self.highway = Highway(self.word_embed_size)
        self.cnn = CNN(self.e_char, self.word_embed_size)
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x_padded):
        """
        Looks up character-based CNN embeddings for the words in a batch of sentences.
        @param x_padded: Tensor of integers of shape (sentence_length, batch_size, max_word_length) where
            each integer is an index into the character vocabulary
            (the output of `to_input_tensor_char()`)
        @returns x_word_emb: Tensor of shape (sentence_length, batch_size, word_embed_size), containing the
            CNN-based embeddings for each word of the sentences in the batch
        """
        # Character lookup: (sentence_length, bs, max_word_length, e_char)
        char_vectors = self.char_emb(x_padded)

        # Move the channel axis (e_char) in front of the word-length axis, as
        # expected by the convolution:
        # (sentence_length, bs, e_char, max_word_length)
        conv_input = char_vectors.permute(0, 1, 3, 2)

        # Convolution + ReLU + global max-pool: (sentence_length, bs, e_word)
        pooled = self.cnn(conv_input)

        # Highway keeps the shape; dropout is applied last.
        gated = self.highway(pooled)
        return self.dropout(gated)

        
        