## Defining Multiplicative LSTM

1. Recurrent neural network model used by researchers at OpenAI (https://arxiv.org/abs/1609.07959).

2. Implementation adapted from https://discuss.pytorch.org/t/implementation-of-multiplicative-lstm/2328/5.


In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F

class mLSTM(nn.Module):
    """Single-step multiplicative LSTM cell with an embedding and a decoder.

    Each call to :meth:`forward` consumes one input index together with the
    previous hidden and cell states and returns the decoder output plus the
    updated states. The decoder reshapes the hidden state to a single row,
    so the module is meant to be driven one sequence at a time with the
    ``(1, hidden_size)`` states produced by :meth:`init_hidden`.

    Args:
        input_size: Number of distinct input indices (embedding rows).
        hidden_size: Width of the hidden, cell and multiplicative states.
        embed_size: Width of the input embedding.
        output_size: Width of the decoder output.
    """

    def __init__(self, input_size, hidden_size, embed_size, output_size):
        super().__init__()

        self.hidden_size = hidden_size
        # Input embedding.
        self.encoder = nn.Embedding(input_size, embed_size)
        # Gate / candidate weights applied to the multiplicative state m_t.
        self.weight_fm = nn.Linear(hidden_size, hidden_size)
        self.weight_im = nn.Linear(hidden_size, hidden_size)
        self.weight_cm = nn.Linear(hidden_size, hidden_size)
        self.weight_om = nn.Linear(hidden_size, hidden_size)
        # Gate / candidate weights applied to the embedded input.
        self.weight_fx = nn.Linear(embed_size, hidden_size)
        self.weight_ix = nn.Linear(embed_size, hidden_size)
        self.weight_cx = nn.Linear(embed_size, hidden_size)
        self.weight_ox = nn.Linear(embed_size, hidden_size)
        # Weights that build the multiplicative state from input and h_0.
        self.weight_mh = nn.Linear(hidden_size, hidden_size)
        self.weight_mx = nn.Linear(embed_size, hidden_size)
        # Projection from the hidden state to the output scores.
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, inp, h_0, c_0):
        """Advance the cell by one step.

        Args:
            inp: Integer tensor holding the input index to embed.
            h_0: Previous hidden state.
            c_0: Previous cell state.

        Returns:
            Tuple ``(out, hx, cx)``: decoder output computed from the new
            hidden state flattened to one row, the new hidden state and the
            new cell state.
        """
        embedded = self.encoder(inp)
        # Multiplicative state: element-wise product of the input and
        # hidden-state projections.
        m_t = self.weight_mx(embedded) * self.weight_mh(h_0)
        # Forget, input and output gates.
        f_g = torch.sigmoid(self.weight_fx(embedded) + self.weight_fm(m_t))
        i_g = torch.sigmoid(self.weight_ix(embedded) + self.weight_im(m_t))
        o_g = torch.sigmoid(self.weight_ox(embedded) + self.weight_om(m_t))
        # Candidate cell state.
        c_tilda = torch.tanh(self.weight_cx(embedded) + self.weight_cm(m_t))
        # New cell state and hidden state.
        cx = f_g * c_0 + i_g * c_tilda
        hx = o_g * torch.tanh(cx)

        out = self.decoder(hx.view(1, -1))

        return out, hx, cx

    def init_hidden(self):
        """Return zero-filled ``(h_0, c_0)`` of shape ``(1, hidden_size)``.

        The states are created on the same device and with the same dtype as
        the model parameters, so the module works both on CPU and after
        ``.cuda()`` without a hard-coded device.
        """
        weight = self.decoder.weight
        h_0 = torch.zeros(1, self.hidden_size, device=weight.device, dtype=weight.dtype)
        c_0 = torch.zeros(1, self.hidden_size, device=weight.device, dtype=weight.dtype)
        return h_0, c_0



In [2]:
import random

def generate_chunk(predict_len=200, temperature=0.5, prime_str="O produto"):
    """Sample text from the module-level ``rnn`` model.

    The model state is first warmed up on ``prime_str``; then ``predict_len``
    characters are sampled one at a time, each one fed back as the next input.

    Args:
        predict_len: Number of characters to generate after the prime text.
        temperature: Divisor applied to the output scores before sampling;
            lower values make the sampling more concentrated on the top score.
        prime_str: Initial text that the generated text continues.

    Returns:
        ``prime_str`` followed by the generated characters.

    Raises:
        ValueError: If ``prime_str`` is empty (there is no first input).
    """
    if not prime_str:
        raise ValueError("prime_str must contain at least one character")

    # Follow the model's device instead of assuming CUDA.
    device = next(rnn.parameters()).device
    generated = []

    # Sampling needs no gradients; without this every step would keep
    # extending the autograd graph through the recurrent state.
    with torch.no_grad():
        hidden, cell = rnn.init_hidden()
        prime_input = char_tensor(prime_str).to(device)

        # Build up the state on every prime character except the last one,
        # which becomes the first input of the sampling loop.
        for code in prime_input[:-1]:
            _, hidden, cell = rnn(code, hidden, cell)
        inp = prime_input[-1]

        for _ in range(predict_len):
            output, hidden, cell = rnn(inp, hidden, cell)

            # Temperature-scaled softmax: the same distribution as
            # exp(score / T) but without overflowing for large scores.
            probs = torch.softmax(output.view(-1) / temperature, dim=0)
            sample = torch.multinomial(probs, 1)

            # Record the sampled character and feed its index back in.
            generated.append(chr(sample.item()))
            inp = sample

    return prime_str + "".join(generated)

In [3]:
# Convert a string to a tensor of integer character codes
from unidecode import unidecode

def tensor2char(tensor, temperature=0.2):
    """Sample one character from a tensor of output scores.

    Args:
        tensor: Scores over character codes; flattened before sampling.
        temperature: Divisor applied to the scores before sampling; lower
            values concentrate the sampling on the highest score.

    Returns:
        The character whose code point is the sampled index.
    """
    # Use the argument itself (the previous body read an undefined global).
    logits = tensor.detach().reshape(-1)
    # Softmax gives the same distribution as exp(score / T) without
    # overflowing to inf for large scores.
    probs = torch.softmax(logits / temperature, dim=0)
    top_i = torch.multinomial(probs, 1)[0]
    return chr(int(top_i))

def char_tensor(string):
    """Encode a string as a 1-D ``torch.long`` tensor of character codes.

    The text is first passed through ``unidecode`` and each character of the
    result is then mapped to its code point with ``ord``.

    Args:
        string: Text to encode.

    Returns:
        A ``torch.long`` tensor with one entry per character of the
        transliterated text (empty for an empty string).
    """
    codes = [ord(ch) for ch in unidecode(string)]
    return torch.tensor(codes, dtype=torch.long)

# Quick check of the encoding: the non-ASCII "Ω" goes through unidecode first
# and shows up as code 79 ("O") in the printed tensor.
print(char_tensor('The omega (Ω) symbol\n'))

tensor([ 84, 104, 101,  32, 111, 109, 101, 103,  97,  32,  40,  79,  41,  32,
        115, 121, 109,  98, 111, 108,  10])


In [4]:
# Train on a single example (i.e. one sample of the text)

def train(inp, target):
    """Run one optimisation step on a single sequence.

    Uses the module-level ``rnn``, ``optimizer`` and ``loss_metric``. The
    model is stepped through ``inp`` one element at a time; the loss of each
    step against the matching ``target`` element is summed, back-propagated
    once, and followed by a single optimizer step.

    Args:
        inp: 1-D tensor of input indices.
        target: 1-D tensor of expected indices, aligned with ``inp``.

    Returns:
        The summed loss divided by ``len(inp)``; ``0.0`` for an empty
        sequence, in which case no optimisation step is taken.
    """
    # Nothing to learn from; also avoids calling backward() on the int 0
    # and dividing by zero below.
    if len(inp) == 0:
        return 0.0

    # Keep input and target on whatever device the model lives on.
    device = next(rnn.parameters()).device
    inp = inp.to(device)
    target = target.to(device)

    rnn.zero_grad()
    hidden, cell = rnn.init_hidden()
    loss = 0

    for step, char_code in enumerate(inp):
        output, hidden, cell = rnn(char_code, hidden, cell)
        # The output has one row, so the target needs a leading dimension.
        loss += loss_metric(output, target[step].unsqueeze(0))

    loss.backward()
    optimizer.step()

    return loss.item() / len(inp)



In [5]:
import time
import pandas as pd

# Training corpus, loaded from a CSV file relative to the working directory.
train_data = pd.read_csv("../data/interim/train.csv")

# Hyper-parameters.
n_epochs = 20
print_every = 1000
embed_size = 128 # ASCII code range; also reused below as vocabulary and output size
hidden_size = 2048
lr = 0.0001

# NOTE(review): `cuda` is not referenced by any code visible in this file.
cuda = torch.device('cuda')
# mLSTM takes (input_size, hidden_size, embed_size, output_size); the same
# value (128) is passed for input_size, embed_size and output_size.
rnn = mLSTM(embed_size, hidden_size, embed_size, embed_size).cuda()
optimizer = torch.optim.Adam(rnn.parameters(), lr=lr)
loss_metric = nn.CrossEntropyLoss()

In [6]:
start = time.time()
all_losses = []
loss_avg = 0

# Encode every message once up front so the epochs reuse the same tensors.
dataset = list(map(char_tensor, train_data.message.values))

for epoch in range(1, n_epochs + 1):
    for i, review in enumerate(dataset):
        # Skip sequences too short to give a useful input/target pair.
        if len(review) <= 2:
            continue

        try:
            # Next-character prediction: the target is the input shifted by one.
            loss = train(review[:-1].cuda(), review[1:].cuda())
        except Exception:
            # Show the offending sample before propagating the error.
            print(review, len(review))
            raise

        loss_avg += loss

        if i % print_every == 0:
            print('[(%d %d%%) %.4f]' % (epoch, epoch / n_epochs * 100, loss))
            print(generate_chunk(predict_len = 50), '\n')

    # Checkpoint after the epoch so that lstm_<epoch>.pth holds the weights
    # trained for that many epochs and the final epoch is not lost.
    torch.save(rnn.state_dict(), "lstm_{}.pth".format(epoch))
    
    