In [1]:
import torch
import numpy as np
import torch.nn.functional as F

from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms

# Device selection: detect whether a CUDA device is available.
# (Assign use_cuda = False here instead to force the CPU variants below.)
use_cuda = torch.cuda.is_available()
print("use_cuda : ", use_cuda)

# Legacy typed-tensor aliases, bound to the CUDA or the CPU classes depending on use_cuda.
FloatTensor, LongTensor, ByteTensor = (
    (torch.cuda.FloatTensor, torch.cuda.LongTensor, torch.cuda.ByteTensor)
    if use_cuda
    else (torch.FloatTensor, torch.LongTensor, torch.ByteTensor)
)
Tensor = FloatTensor  # shorthand for the float tensor type selected above

use_cuda :  False


In [2]:
class ToLongTensor():
    """Callable transform that wraps every element of an iterable in a ``torch.LongTensor``."""

    def __call__(self, inp):
        """Lazily convert each item of ``inp``.

        Returns a generator (consumable once), so the usual way to use the
        result is to unpack it immediately: ``x, y = transform((x, y))``.
        """
        return (torch.LongTensor(item) for item in inp)

In [3]:
class CharLSTM(torch.nn.Module):
    """Character-level language model: embedding -> LSTM -> linear decoder.

    The model is stepped one character at a time: ``forward`` treats every
    element of its input as a length-1 sequence and returns one row of logits
    per element together with the updated recurrent state.

    Args:
        input_size: number of distinct input ids (rows of the embedding table).
        embedding_len: size of each embedding vector.
        hidden_size: number of features in the LSTM hidden state.
        output_size: number of logits produced per input element.
        n_layers: number of stacked LSTM layers.
    """

    def __init__(self, input_size, embedding_len, hidden_size, output_size, n_layers=1):
        super().__init__()
        # Keep the hyper-parameters around; forward() and init_hidden() need them for reshaping.
        self.input_size = input_size
        self.embedding_len = embedding_len
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers

        # Layers of the model.
        self.encoder = torch.nn.Embedding(input_size, embedding_len)
        self.rnn = torch.nn.LSTM(embedding_len, hidden_size, n_layers, batch_first=True)
        self.decoder = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden):
        """Advance the model by one time step.

        Args:
            x: integer tensor of character ids; it is flattened, and each
                element is handled as one batch entry.
            hidden: ``(h, c)`` tuple as returned by ``init_hidden`` or by a
                previous call to ``forward``.

        Returns:
            ``(output, hidden)`` where ``output`` has one row of
            ``output_size`` logits per input element and ``hidden`` is the
            updated ``(h, c)`` tuple.
        """
        # reshape (rather than view) so that non-contiguous slices such as x[:, i] are always accepted.
        embedding = self.encoder(x.reshape(-1))
        # batch_first=True: feed every element as a sequence of length 1.
        output, hidden = self.rnn(embedding.view(-1, 1, self.embedding_len), hidden)
        output = self.decoder(output.reshape(-1, self.hidden_size))
        return output, hidden

    def init_hidden(self, x):
        """Return zero-filled ``(h, c)`` states for a batch of ``x.shape[0]`` sequences.

        The states are created with the dtype and device of the model's own
        weights, so they stay compatible with the LSTM after the model has
        been moved (``.to(device)``) or cast (``.double()``).
        """
        reference = self.decoder.weight
        shape = (self.n_layers, x.shape[0], self.hidden_size)
        return (reference.new_zeros(shape), reference.new_zeros(shape))

In [4]:
# NOTE: this cell declares ToLongTensor a second time with the same behaviour as the
# definition in an earlier cell; the later definition is the one left bound after a full run.
class ToLongTensor():
    """Transform turning each element of its input into a ``torch.LongTensor``."""

    def __call__(self, inp):
        """Return a one-shot generator of ``torch.LongTensor(element)`` for every element of ``inp``."""
        return (torch.LongTensor(element) for element in inp)

In [26]:
class WriterRNN():
    """Character-level text writer: corpus reader, dataset, trainer and sampler in one object.

    The instance doubles as a map-style dataset (``__len__`` / ``__getitem__``)
    that ``train`` wraps in a ``DataLoader``. Call ``read`` first to load the
    text and build the model, then ``train``, then ``write_new``.

    Args:
        txt_file_path: path of the text file used as training corpus.
        chunk_size: number of characters in every training sequence.
        batch_size: number of sequences per training batch.
        train_epochs: number of passes over the dataset made by ``train``.
        transform: optional callable applied to the ``(x, y)`` pair returned by
            ``__getitem__``; it must return something unpackable into two items.
        lower_case: lower-case the corpus (and sampling prompts) when true.
        lr: learning rate of the Adam optimiser created by ``read``.
        writer: optional object with an ``add_scalar(tag, value, step)`` method
            (e.g. a TensorBoard ``SummaryWriter``) that receives the training
            loss. When omitted, a module-level variable named ``writer`` is used
            if one exists; otherwise nothing is logged.
    """

    def __init__(self, txt_file_path='Data/lyrics.txt', chunk_size=100, batch_size=32, train_epochs=50, transform=None, lower_case=True, lr=0.001, writer=None):
        self.txt_file_path = txt_file_path
        self.chunk_size = chunk_size
        self.transform = transform
        self.lower_case = lower_case
        self.was_read = False
        self.batch_size = batch_size
        self.train_epochs = train_epochs
        self.criterion = torch.nn.CrossEntropyLoss()
        self.lr = lr
        self.writer = writer

    def read(self, embedding_len, hidden_size):
        """Load the corpus, build the character vocabulary and create model + optimiser.

        Args:
            embedding_len: embedding size handed to ``CharLSTM``.
            hidden_size: LSTM hidden size handed to ``CharLSTM``.

        Raises:
            ValueError: if the text file is empty.
        """
        self.embedding_len = embedding_len
        self.hidden_size = hidden_size

        with open(self.txt_file_path, 'r') as file:
            rawtxt = file.read()

        if self.lower_case:
            rawtxt = rawtxt.lower()
        if not rawtxt:
            raise ValueError("'{}' is empty: there is nothing to learn from".format(self.txt_file_path))

        # Sort the unique characters: iterating a bare set gives an arbitrary order,
        # which would change the id <-> character mapping from one run to the next.
        letters = sorted(set(rawtxt))
        self.nchars = len(letters)
        self.num_to_let = dict(enumerate(letters))                                  # id -> character
        self.let_to_num = {letter: num for num, letter in self.num_to_let.items()}  # character -> id

        # Encode the whole text as ids; int64 is pinned so the ids become long tensors on every platform.
        self.X = np.array([self.let_to_num[letter] for letter in rawtxt], dtype=np.int64)
        self.RNN = CharLSTM(self.nchars, self.embedding_len, self.hidden_size, self.nchars)
        self.optimiser = torch.optim.Adam(self.RNN.parameters(), lr=self.lr)
        # Only flag success once everything training needs exists.
        self.was_read = True

    def __len__(self):
        """Number of (input, target) chunks available; 0 when the text is not longer than a chunk."""
        return max(0, len(self.X) - self.chunk_size)

    def __getitem__(self, idx):
        """Return the chunk starting at ``idx`` and the same chunk shifted by one character.

        Raises:
            IndexError: if ``idx`` is outside ``range(len(self))``.
        """
        if not 0 <= idx < len(self):
            raise IndexError("chunk index {} is out of range".format(idx))

        x = self.X[idx: idx + self.chunk_size]
        y = self.X[idx + 1: idx + self.chunk_size + 1]  # the target is the input shifted by one character

        if self.transform:
            x, y = self.transform((x, y))

        return x, y

    def _summary_writer(self):
        """Return the object the training loss is logged to, or None when there is none."""
        if self.writer is not None:
            return self.writer
        # Backward compatibility: fall back to a notebook-level `writer` variable when one is defined.
        return globals().get('writer')

    def optimise(self, train_loader, generated_string, epoch_loss, epoch):
        """Run one epoch of training.

        Args:
            train_loader: iterable of ``(x, y)`` batches of character ids with
                ``chunk_size`` columns each.
            generated_string: text to which the model's predictions for the
                first sequence of every batch are appended.
            epoch_loss: running total to which the loss of every batch is added.
            epoch: zero-based epoch number, used to compute the logging step.

        Returns:
            ``(generated_string, epoch_loss)`` after the epoch. The loss of a
            batch is the sum over its ``chunk_size`` steps of the criterion value.
        """
        log_writer = self._summary_writer()
        predicted = []
        for idx, (x, y) in enumerate(train_loader):
            loss = 0
            h = self.RNN.init_hidden(x)          # fresh zero state for every batch
            for i in range(self.chunk_size):     # feed the sequences one character at a time
                out, h = self.RNN(x[:, i], h)

                # Character the network considers most likely for the first sequence of the batch.
                best = out.detach().argmax(dim=1)[0].item()
                predicted.append(self.num_to_let[best])

                loss = loss + self.criterion(out, y[:, i])

            batch_loss = loss.item()
            if log_writer is not None:
                log_writer.add_scalar('Loss/Train', batch_loss / self.chunk_size, epoch * len(train_loader) + idx)

            self.optimiser.zero_grad()
            loss.backward()
            self.optimiser.step()

            epoch_loss += batch_loss             # accumulate every batch, not only the last one

        return generated_string + ''.join(predicted), epoch_loss

    def train(self):
        """Train the model for ``train_epochs`` epochs, printing the loss and a text preview per epoch.

        Prints a message and returns without training when ``read`` has not
        been called yet.

        Raises:
            ValueError: if the text is not longer than ``chunk_size``.
        """
        if not self.was_read:
            print("THE WRITER NEEDS TO READ BEFORE TRAINING!")
            return
        if len(self) == 0:
            raise ValueError("the text must be longer than chunk_size ({}) to train on it".format(self.chunk_size))
        print("I am going to read now. {} goddamn times!".format(self.train_epochs))
        self.train_loader = DataLoader(self, batch_size = self.batch_size, shuffle=True)
        # Each batch contributes chunk_size criterion values, so this is the count to average over.
        steps_per_epoch = len(self.train_loader) * self.chunk_size
        for epoch in range(self.train_epochs):
            generated_string, epoch_loss = self.optimise(self.train_loader, '', 0, epoch)
            epoch_loss /= steps_per_epoch        # mean loss per predicted character

            print('Epoch ', epoch+1, ' Avg Loss: ', epoch_loss)
            print('Generated text: ', generated_string[0:150], '\n')

    def maparray(self, txt):
        """Map a string to a numpy array holding the id of each character.

        Raises:
            KeyError: if ``txt`` contains a character that is not in the vocabulary.
        """
        return np.array([self.let_to_num[letter] for letter in txt], dtype=np.int64)

    def write_new(self, prime_str='a', str_len=150, temperature=0.75):
        """Sample ``str_len`` characters from the model, print the result and return it.

        Args:
            prime_str: non-empty prompt used to warm up the recurrent state;
                it is lower-cased when the writer was built with ``lower_case``.
            str_len: number of characters to generate.
            temperature: positive scaling of the logits; lower values make the
                sampling more conservative.

        Returns:
            The generated text prefixed with ``'NEW CREATION: '``, or None when
            ``read`` has not been called yet.

        Raises:
            ValueError: if ``prime_str`` is empty or ``temperature`` is not positive.
            KeyError: if ``prime_str`` contains a character missing from the vocabulary.
        """
        if not self.was_read:
            print("THE WRITER NEEDS TO READ BEFORE WRITING!")
            return None
        if not prime_str:
            raise ValueError("prime_str must contain at least one character")
        if temperature <= 0:
            raise ValueError("temperature must be positive")
        if self.lower_case:
            prime_str = prime_str.lower()   # the vocabulary only holds lower-case characters

        pieces = ['NEW CREATION: ']
        x = torch.as_tensor(self.maparray(prime_str), dtype=torch.long).unsqueeze(0)  # batch of size 1

        with torch.no_grad():                          # sampling needs no gradients
            h = self.RNN.init_hidden(x)
            for i in range(x.shape[1] - 1):            # warm up on every prompt character but the last
                _, h = self.RNN(x[:, i], h)

            x = x[:, -1]                               # the last prompt character starts the generation
            for _ in range(str_len):
                out, h = self.RNN(x, h)

                # Softmax of the temperature-scaled logits: the same distribution as
                # exp(logits / T) normalised, without the risk of overflowing exp().
                probabilities = torch.softmax(out.view(-1) / temperature, dim=0)
                sample = torch.multinomial(probabilities, 1).item()
                pieces.append(self.num_to_let[sample])

                x = torch.as_tensor([sample], dtype=torch.long)  # feed the sampled character back in

        generated_string = ''.join(pieces)
        print(generated_string)
        return generated_string
        
    

In [27]:
# Hyper-parameters: every value below is handed to the writer explicitly, so
# editing this cell is enough to change the run.
lr = 0.001
train_epochs = 50
batch_size = 32
chunk_size = 100
embedding_len = 400
hidden_size = 128

writer = SummaryWriter() # we will use this to show our model's performance on a graph

Commie = WriterRNN(chunk_size=chunk_size,
                   batch_size=batch_size,
                   train_epochs=train_epochs,
                   transform=ToLongTensor(),
                   lr=lr)

# Load the corpus and build the model with the sizes chosen above.
Commie.read(embedding_len,
            hidden_size
           )

In [28]:
# Train for the number of epochs configured on the instance. Commie.read(...) must have
# been called first: train() only prints a message and returns when it has not.
Commie.train()

I am going to read now. 50 goddamn times!


KeyboardInterrupt: 

In [None]:
# Generate text with write_new's default arguments (prime_str='a', str_len=150, temperature=0.75).
Commie.write_new()