# Variational autoencoder

In [512]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

torch.manual_seed(1)

<torch._C.Generator at 0x7efe140a9970>

## Define parameters

In [513]:
class Parameters:
    """Container for the model hyper-parameters used throughout this notebook.

    All values except the vocabulary size are fixed constants; the decoder
    paddings are derived from the kernel widths and dilations.
    """

    def __init__(self, word_vocab_size):
        """
        :param word_vocab_size: number of words in the vocabulary (coerced to int)
        """
        self.word_vocab_size = int(word_vocab_size)

        self.word_embed_size = 300

        # Encoder (recurrent) settings
        self.encoder_rnn_hidden_size = 150
        self.encoder_rnn_num_layers = 2
        self.encoder_rnn_num_directions = 2

        self.latent_variable_size = 30

        # Decoder (dilated convolution) settings
        self.decoder_k = 3
        self.decoder_dilations = [1, 2, 4]

        # The first conv layer consumes a word embedding concatenated with
        # the latent variable; entries are (out_chan, in_chan, width).
        first_in_chan = self.latent_variable_size + self.word_embed_size
        self.decoder_kernels = [
            (400, first_in_chan, self.decoder_k),
            (450, 400, self.decoder_k),
            (500, 450, self.decoder_k),
        ]
        self.decoder_num_layers = len(self.decoder_kernels)

        # One padding per layer: effective (dilated) kernel width minus one.
        self.decoder_paddings = [
            Parameters.effective_k(width, dilation) - 1
            for (_, _, width), dilation in zip(self.decoder_kernels,
                                               self.decoder_dilations)
        ]

    @staticmethod
    def effective_k(k, d):
        """
        :param k: kernel width
        :param d: dilation size
        :return: effective kernel width after dilation
        """
        # k original taps plus (d - 1) holes in each of the (k - 1) gaps;
        # algebraically the same as (k - 1) * d + 1.
        return k + (k - 1) * (d - 1)

In [514]:
# Dimensions of the dummy training batch used by the cells below
temp_batch_size, temp_seq_size = 12, 10
temp_words_vocab_size = 10002

In [515]:
# Build the hyper-parameter container for the dummy vocabulary size
param = Parameters(word_vocab_size=temp_words_vocab_size)

## Embeddings

In [516]:
# Lookup table mapping each word index to a word_embed_size-dimensional vector
word_emb = nn.Embedding(num_embeddings=param.word_vocab_size,
                        embedding_dim=param.word_embed_size)

## Encoder

In [517]:
class Encoder(nn.Module):
    """GRU encoder that summarises an embedded sequence as the final hidden
    state of its last recurrent layer."""

    def __init__(self,
                 word_emb_size,
                 rnn_hidden_size,
                 rnn_num_layers,
                 rnn_num_directions=1):
        """
        :param word_emb_size: size of each input word embedding
        :param rnn_hidden_size: hidden size of the GRU (per direction)
        :param rnn_num_layers: number of stacked GRU layers
        :param rnn_num_directions: 1 (unidirectional) or 2 (bidirectional)
        """
        super(Encoder, self).__init__()

        self.word_emb_size = word_emb_size
        self.rnn_hidden_size = rnn_hidden_size
        self.rnn_num_layers = rnn_num_layers
        self.rnn_num_directions = rnn_num_directions
        self.is_bidirectional = (self.rnn_num_directions == 2)

        assert(self.rnn_num_directions in (1, 2))

        self.rnn = nn.GRU(input_size=self.word_emb_size,
                          hidden_size=self.rnn_hidden_size,
                          num_layers=self.rnn_num_layers,
                          batch_first=True,
                          bidirectional=self.is_bidirectional)

    def forward(self, x):
        """
        :param x: [batch_size, seq_len, embed_size] tensor
        :example x = [[emb_of(<bos>), emb_of(Hi),
                       emb_of(there), emb_of(<eos>)]]
        :return: last hidden state [batch_size, directions * rnn_hidden_size] tensor
        """
        # No initial hidden state is passed: the GRU then starts from zeros.
        # h_n.shape = (layers * directions, batch_size, rnn_hidden_size)
        _, h_n = self.rnn(x)

        # h_n is ordered layer by layer with the directions of one layer
        # adjacent, so the trailing `directions` entries are the last layer.
        last_layer = h_n[-self.rnn_num_directions:]  # [directions, batch_size, rnn_hidden_size]

        if not self.is_bidirectional:
            # single direction: its state is the whole summary
            return last_layer[0]  # [batch_size, rnn_hidden_size]

        # bidirectional: forward and backward states side by side, column wise
        forward_state, backward_state = last_layer[0], last_layer[1]
        return torch.cat((forward_state, backward_state), 1)  # [batch_size, 2 * rnn_hidden_size]
        

## Encoder dummy input

In [518]:
# Random word indices of shape [batch_size, seq_len], wrapped for autograd
encoder_word_input = autograd.Variable(
    torch.LongTensor(
        np.random.randint(low=0,
                          high=param.word_vocab_size,
                          size=(temp_batch_size, temp_seq_size))))

# Look the indices up in the embedding table
encoder_input = word_emb(encoder_word_input)
print(encoder_input.shape)

torch.Size([12, 10, 300])


## Run the encoder class

In [519]:
encoder = Encoder(param.word_embed_size,
                  param.encoder_rnn_hidden_size,
                  param.encoder_rnn_num_layers,
                  param.encoder_rnn_num_directions)
# Call the module itself rather than .forward(): nn.Module.__call__ runs the
# registered hooks, which a direct .forward() call silently skips.
encoder_output = encoder(encoder_input)
print(encoder_output.shape)

torch.Size([12, 300])


## Decoder

In [526]:
class Decoder(nn.Module):
    """Stack of dilated, causal 1-D convolutions followed by a linear layer.

    Every input word embedding is concatenated with the latent variable z,
    passed through the convolution stack (ReLU after each layer) and projected
    to un-normalized scores over the vocabulary.
    """

    def __init__(self, decoder_kernels, dilations, paddings, word_vocab_size):
        """
        :param decoder_kernels: [(out_chan, in_chan, width), ...] has num_layers elements
        :param dilations: [1, 2, 4] list of int, one entry per layer
        :param paddings: [2, 4, 8] list of int, one entry per layer
        :param word_vocab_size: output size of the final linear layer
        """
        super(Decoder, self).__init__()

        self.kernels_shape = decoder_kernels
        self.dilations = dilations
        self.paddings = paddings
        self.word_vocab_size = word_vocab_size

        # The convolution weights are kept as raw parameters and applied with
        # F.conv1d in forward(). They are registered explicitly under stable
        # names so that the module (optimizers, state_dict, .to(), ...) can
        # see them; a plain Python list of nn.Parameter is not tracked.

        # Learnable kernel parameters
        self.kernels_param = [nn.Parameter(torch.empty(out_chan, in_chan, width).normal_(0, 0.05))
                              for out_chan, in_chan, width in decoder_kernels]
        self._add_to_parameters(self.kernels_param, "decoder_kernels_param")

        # Learnable bias parameters
        self.biases_param = [nn.Parameter(torch.empty(out_chan).normal_(0, 0.05))
                             for out_chan, _, _ in decoder_kernels]
        self._add_to_parameters(self.biases_param, "decoder_biases_param")

        # Output channels of the last conv layer feed the linear projection.
        self.conv_out_size = self.kernels_shape[-1][0]
        # Use the constructor argument, not a same-looking global name.
        self.lin_layer = nn.Linear(self.conv_out_size, self.word_vocab_size)

    def _add_to_parameters(self, parameters, name):
        """Register each parameter on the module as '<name>-<index>'."""
        # Necessary to do this for the module to access the parameters
        for i, parameter in enumerate(parameters):
            self.register_parameter(name='{}-{}'.format(name, i), param=parameter)

    def forward(self, x, z=None):
        """
        :param x: [batch_size, seq_len, word_emb_size]
        :param z: [batch_size, lat_var_size]; required despite the None default

        :note: for x, the last element of the seq is <eos>
        :return: un-normalized logit of sentence words
                 distribution
                 [batch_size, seq_len - 1, word_vocab_size]
        :raises ValueError: if z is not provided
        """
        if z is None:
            raise ValueError("Decoder.forward requires the latent variable z")

        decoder_input = x[:, :-1, :]  # last in seq is <eos> which is not fed as input
        input_seq_len = decoder_input.shape[1]

        # Repeat z along the time axis: [batch_size, input_seq_len, lat_var_size]
        z = z.unsqueeze(1).expand(-1, input_seq_len, -1)

        # concatenate z to each word in the input for decoder
        # [batch_size, input_seq_len, word_emb_size + lat_var_size]
        decoder_input = torch.cat([decoder_input, z], 2)

        # conv1d expects channels before time:
        # [batch_size, word_emb_size + lat_var_size, input_seq_len]
        x = decoder_input.transpose(1, 2).contiguous()

        for layer, (kernel, bias) in enumerate(zip(self.kernels_param, self.biases_param)):
            pad = self.paddings[layer]

            x = F.conv1d(x, kernel,
                         bias=bias,
                         dilation=self.dilations[layer],
                         padding=pad)

            # conv1d pads both ends, so the output is `pad` steps too long.
            # Dropping the trailing steps restores the input length and keeps
            # position t from seeing inputs after t. Guard pad == 0, because
            # x[:, :, :-0] would be an empty tensor.
            if pad > 0:
                x = x[:, :, :-pad].contiguous()
            x = F.relu(x)

        # x.shape = [batch_size, self.conv_out_size, input_seq_len]
        # Debug output; kept because the notebook's recorded output shows it.
        print(x.shape)

        x = x.transpose(1, 2)  # [batch_size, input_seq_len, self.conv_out_size]

        # nn.Linear acts on the last dimension only, so no flattening is needed.
        logits = self.lin_layer(x)  # [batch_size, input_seq_len, self.word_vocab_size]
        return logits
        

## Decoder dummy input 

In [527]:
# Random word indices of shape [batch_size, seq_len].
# The vocabulary size variable defined earlier in this file is
# `temp_words_vocab_size`.
decoder_word_input = np.random.randint(low=0,
                                       high=temp_words_vocab_size,
                                       size=(temp_batch_size, temp_seq_size))

decoder_word_input = torch.LongTensor(decoder_word_input)
decoder_word_input = autograd.Variable(decoder_word_input)
# NOTE: this rebinds `word_emb` to a freshly initialised embedding table.
word_emb = nn.Embedding(temp_words_vocab_size, param.word_embed_size)
decoder_input = word_emb(decoder_word_input)

# Dummy latent variable, one vector per batch element
z = autograd.Variable(torch.randn([temp_batch_size, param.latent_variable_size]))

## Run the decoder class

In [528]:
decoder = Decoder(param.decoder_kernels,
                  param.decoder_dilations,
                  param.decoder_paddings,
                  temp_words_vocab_size)
print(decoder_input.shape)
# Call the module itself rather than .forward(): nn.Module.__call__ runs the
# registered hooks, which a direct .forward() call silently skips.
logits = decoder(decoder_input, z)

torch.Size([12, 10, 300])
torch.Size([12, 500, 9])
