In [None]:
import torch
import torch.nn as nn

In [None]:
def encode_sample_to_hot_vector_and_target_indexes(sample,dictionary,dic_length):
    """Encode one text sample as shifted one-hot inputs plus target indices.

    Args:
        sample: sequence of characters (e.g. one line of text).
        dictionary: mapping character -> integer index in [0, dic_length).
        dic_length: width of the one-hot vectors (number of dictionary entries).

    Returns:
        (x, y) where
        x is a float tensor of shape (len(sample), dic_length); x[0] is all
        zeros and x[t] for t >= 1 is the one-hot vector of sample[t-1];
        y is a long tensor of shape (len(sample),) with y[t] = index of sample[t].

    Raises:
        KeyError: if a character of `sample` is missing from `dictionary`.
    """
    sample_length = len(sample)

    # Row 0 stays all-zero: it is the x0 = 0 input fed before the first character.
    x = torch.zeros(sample_length + 1, dic_length)
    y = torch.empty(sample_length, dtype=torch.long)

    for j, ch in enumerate(sample):
        # Use the mapping passed by the caller rather than a notebook-level global,
        # so the function works with any dictionary and in any cell order.
        k = dictionary[ch]
        x[j + 1, k] = 1.0
        y[j] = k

    # Dropping the last row leaves x[t] = "character before sample[t]", aligned with y[t].
    return x[:-1], y

In [None]:
def encode_samples_to_hot_tensors(samples_list,chr_to_idx_dict):
    """Encode every text sample into an (inputs, targets) tensor pair.

    Args:
        samples_list: iterable of text samples.
        chr_to_idx_dict: mapping character -> integer index; its length is
            used as the one-hot vector width.

    Returns:
        A list, in the same order as `samples_list`, holding the tuple returned
        by `encode_sample_to_hot_vector_and_target_indexes` for each sample.
    """
    vocab_size = len(chr_to_idx_dict)

    return [
        encode_sample_to_hot_vector_and_target_indexes(text, chr_to_idx_dict, vocab_size)
        for text in samples_list
    ]

In [None]:
def batch_generator(samples_list,batch_size):
    """Yield randomly shuffled mini-batches, each ordered longest sample first.

    Args:
        samples_list: indexable collection of samples; sample[0] must expose
            `.shape[0]` (its sequence length).
        batch_size: number of samples per batch; the final batch may be smaller.

    Yields:
        Lists of samples sorted by descending sample[0].shape[0], which is the
        order `pack_sequence` expects in `batch_preprocess`.
    """
    sample_count = len(samples_list)

    # Ceiling division: a trailing partial batch still counts as one batch.
    full_batches, leftover = divmod(sample_count, batch_size)
    batch_count = full_batches + (1 if leftover > 0 else 0)

    # One random permutation per call; consecutive slices of it form the batches.
    shuffled = torch.randperm(sample_count)

    for batch_no in range(batch_count):
        start = batch_no * batch_size
        picked = shuffled[start:start + batch_size]

        yield sorted(
            (samples_list[idx] for idx in picked),
            key=lambda pair: pair[0].shape[0],
            reverse=True,
        )

In [None]:
def batch_preprocess(batch):
    """Pack a batch of (inputs, targets) pairs into two PackedSequence objects.

    Args:
        batch: list of samples where sample[0] is the input tensor and
            sample[1] the target tensor. `pack_sequence` is called with its
            default arguments, so samples must already be ordered by
            decreasing length.

    Returns:
        (x_train_batch, y_target_batch): the packed inputs and packed targets.
    """
    pack = torch.nn.utils.rnn.pack_sequence

    inputs = []
    targets = []
    for sample in batch:
        inputs.append(sample[0])
        targets.append(sample[1])

    return pack(inputs), pack(targets)

In [None]:
class LSTM_module(nn.Module):
    """Character-level LSTM followed by a linear layer producing per-character logits.

    Args:
        hidd_size: hidden size of each LSTM layer.
        num_layers: number of stacked LSTM layers.
        is_bidirect: whether the LSTM is bidirectional.
        dictionary: mapping index -> character; its length is both the input
            feature size and the number of output logits.
    """

    def __init__(self,hidd_size,num_layers,is_bidirect, dictionary):
        super().__init__()
        # number of lstm stacked layers
        self.num_layers = num_layers
        # one input feature (and one output logit) per dictionary entry
        self.input_size = len(dictionary)
        self.hidden_size = hidd_size

        # number of directions in lstm
        self.num_directions = 2 if is_bidirect else 1

        # index -> character mapping, used to decode sampled indices in generate_samples
        self.idx_to_chr = dictionary

        self.lstm = nn.LSTM(input_size=self.input_size, hidden_size=self.hidden_size,num_layers=num_layers,bias=True,batch_first=False,bidirectional=is_bidirect)
        self.linear = nn.Linear(self.hidden_size*self.num_directions,self.input_size)

    def _forward(self,x):
        """Run the LSTM and the linear read-out.

        Args:
            x: either a PackedSequence or a tensor of shape
                (sequence_length, batch, input_size) (batch_first is False).

        Returns:
            (y_hat, (h_n, c_n)). For packed input y_hat has shape
            (total_time_steps, input_size), matching the `.data` of the packed
            targets; for tensor input it has shape
            (sequence_length, batch, input_size). h_n and c_n have shape
            (num_layers * num_directions, batch, hidden_size).
        """
        lstm_out, (h_n, c_n) = self.lstm(x)

        if isinstance(lstm_out, torch.nn.utils.rnn.PackedSequence):
            # .data of a PackedSequence is the flat tensor of all time steps.
            features = lstm_out.data
        else:
            # On a plain Tensor, `.data` would detach the output from autograd and
            # silently stop gradients reaching the LSTM, so use the tensor itself.
            features = lstm_out

        y_hat = self.linear(features)

        return y_hat,(h_n, c_n)

    def forward(self,x):
        """Delegate to `_forward`; see its documentation."""
        return self._forward(x)

    def generate_samples(self,samples_number_to_generate, max_sample_length):
        """Sample text from the model one character at a time.

        Each sample starts from an all-zero input and all-zero LSTM state. At
        every step a character index is drawn from the softmax of the logits
        and fed back as the next one-hot input. A sample ends after
        `max_sample_length` characters or right after a '\\n' or '.' is drawn
        (the terminating character is kept in the sample).

        Args:
            samples_number_to_generate: how many samples to produce.
            max_sample_length: maximum number of characters per sample.

        Returns:
            List of generated strings.
        """
        # Allocate inputs/state with the dtype and device of the model parameters.
        reference = self.linear.weight
        state_shape = (self.num_layers*self.num_directions, 1, self.hidden_size)

        generated_samples_list = []

        # Sampling needs no gradients; no_grad avoids building an autograd graph per step.
        with torch.no_grad():
            for _ in range(samples_number_to_generate):
                # Fresh zero input and zero state for every sample. h_t and c_t are
                # separate tensors so an in-place change to one cannot affect the other.
                x = torch.zeros(1, 1, self.input_size, dtype=reference.dtype, device=reference.device)
                h_t = torch.zeros(state_shape, dtype=reference.dtype, device=reference.device)
                c_t = torch.zeros(state_shape, dtype=reference.dtype, device=reference.device)

                generated_sample = []

                for _ in range(max_sample_length):
                    # Feed the previously generated character together with the carried state.
                    lstm_out, (h_t, c_t) = self.lstm(x, (h_t, c_t))

                    # Unnormalized scores, shape (1, 1, input_size).
                    y_hat = self.linear(lstm_out)

                    # Probability distribution over characters as a 1-D tensor. Indexing
                    # [0, 0] keeps it 1-D even for a single-entry dictionary.
                    p_model = torch.nn.functional.softmax(y_hat, dim=2)[0, 0]

                    # Draw the next character index from that distribution.
                    generated_char_ind = torch.multinomial(p_model, 1).item()
                    generated_char = self.idx_to_chr[generated_char_ind]
                    generated_sample.append(generated_char)

                    # One-hot encode the drawn character as the next input.
                    x.zero_()
                    x[0, 0, generated_char_ind] = 1.

                    # End-of-line or full stop terminates the current sample.
                    if generated_char == '\n' or generated_char == '.':
                        break

                generated_samples_list.append(''.join(generated_sample))

        return generated_samples_list
            

In [None]:
# Upper bound on the number of lines used for training; None keeps every line.
total_training_samples_num =  None
batch_size = 8

with open('shakespeare.txt') as f:
    whole_text = f.read()
    # Vocabulary: every distinct character of the lower-cased corpus.
    alpha = set(whole_text.lower())
    del whole_text
    # Iteration order of a set of strings changes between interpreter runs, so
    # number the characters in sorted order to get the same index for the same
    # character after every kernel restart.
    idx_to_chr = {i:c for i,c in enumerate(sorted(alpha))}
    # Derive the inverse mapping from idx_to_chr so the two can never disagree.
    chr_to_idx = {c:i for i,c in idx_to_chr.items()}
    # Re-read the file line by line; each line (newline included) is one training sample.
    f.seek(0)
    samples = f.readlines()
    samples = [l.lower() for i,l in enumerate(samples)
               if total_training_samples_num is None or i < total_training_samples_num]

In [None]:
# --- model hyper-parameters ---
# Input and output width both equal the number of distinct characters.
dictionary_size = len(alpha)
output_size = dictionary_size
hidden_size = 64
num_layers = 2
# 2 selects a bidirectional LSTM; any other value leaves it unidirectional.
directions_num = 1
is_bidirectional = (directions_num == 2)

# --- text sampling settings ---
samples_number_to_generate = 20
max_sample_length = 100

In [None]:
# Encode the whole corpus once, up front; one encoded entry per text line.
samples_tensor_list = encode_samples_to_hot_tensors(
    samples_list=samples,
    chr_to_idx_dict=chr_to_idx,
)

In [None]:
# Build the character model from the hyper-parameters defined above.
module = LSTM_module(
    hidd_size=hidden_size,
    num_layers=num_layers,
    is_bidirect=is_bidirectional,
    dictionary=idx_to_chr,
)

In [None]:
# Baseline: samples drawn from the model before any training.
module.generate_samples(
    samples_number_to_generate=samples_number_to_generate,
    max_sample_length=max_sample_length,
)

In [None]:
# Cross-entropy between logits and target indices; 'mean' is the library default, spelled out.
criteria = nn.CrossEntropyLoss(reduction='mean')

In [None]:
# Adam over all model parameters; lr=1e-3 is the library default, spelled out.
optim = torch.optim.Adam(params=module.parameters(), lr=1e-3)

In [None]:
for epoch in range(20):
    # Make sure the model is in training mode even if this cell is re-run
    # after the module.eval() cell further down.
    module.train()

    # A fresh generator each epoch gives a new random shuffling of the samples.
    batch_gen = batch_generator(samples_tensor_list,batch_size)

    # Accumulate the loss over the epoch so the report reflects every batch,
    # not just whichever batch happened to come last.
    epoch_loss_sum = 0.0
    batches_seen = 0

    for batch in batch_gen:
        x, y = batch_preprocess(batch)

        y_hat, (h_n, c_n) = module(x)

        # y is packed the same way as x, so y.data lines up row by row with y_hat.
        loss = criteria(y_hat, y.data)
        optim.zero_grad()
        loss.backward()
        optim.step()

        epoch_loss_sum += loss.item()
        batches_seen += 1

    # With no batches there is no loss to report; print nan instead of failing on an undefined name.
    mean_epoch_loss = epoch_loss_sum / batches_seen if batches_seen else float('nan')
    print('epoch {} | loss {}'.format(epoch,mean_epoch_loss))
print('finished.')

In [None]:
# Put the module into evaluation mode now that training is done.
module.eval()

In [None]:
# Samples drawn from the model after training.
module.generate_samples(
    samples_number_to_generate=samples_number_to_generate,
    max_sample_length=max_sample_length,
)