In [None]:
# Setup

# Characters are represented by one-hot vectors of size 128: ord(char) is used
# as the index of the hot entry, so only code points 0..127 can be encoded.
char_dim = 128

import numpy as np
import os
from collections import Counter
import unicodedata
import string
import gc
import matplotlib.pyplot as plt
import csv

import torch
from torch import nn
import torch.nn.functional as F
from torch.nn import LSTM
from torch import optim

In [None]:
# replaces special characters with their close equivalents in order to simplify the characters that appear
def clean_text(text):
    """Reduce `text` to plain 7-bit ASCII.

    The text is first decomposed with Unicode NFD normalization, which splits
    accented letters into a base letter followed by combining marks. Encoding
    to ASCII with errors='ignore' then drops the marks together with every
    other non-ASCII character.

    Args:
        text: the str to clean.

    Returns:
        A str in which every character has a code point below 128. Newlines,
        tabs, quotes and backslashes pass through unchanged.
    """
    ascii_bytes = unicodedata.normalize('NFD', text).encode('ascii', 'ignore')
    # decode() rather than str(): str() on a bytes object returns its repr
    # ("b'...'"), which adds a prefix/suffix and escapes quotes, tabs and
    # backslashes in the lyrics.
    return ascii_bytes.decode('ascii')

# Read the whole lyrics file in one go. Lines that consist of 'TOP' are kept:
# the text is later split into songs on that marker.
# The context manager guarantees the file is closed even if reading fails.
with open('beatles.txt', 'r') as file:
    beatleslyrics = file.read()
beatleslyrics = clean_text(beatleslyrics)
print('read %d characters' % len(beatleslyrics))
char_counter = Counter(beatleslyrics)
print(char_counter)

# Sanity check: every character must fit in the 128-slot one-hot encoding.
for key in char_counter.keys():
    if ord(key) >= 128:
        # the values must be applied with %, otherwise print() just emits the raw format string
        print('invalid character value found: %s has numeric value %d' % (key, ord(key)))

In [None]:
# One-hot encoder for a batch of strings.
# input: list of N strings; T is the length of the longest string
# output: numpy array of shape (T, N, 128)
# Rows of the 128x128 identity matrix serve as the one-hot vectors.
i128 = np.eye(128)
def char_to_ix(texts):
    """Encode `texts` as one-hot vectors indexed by character code.

    Strings shorter than the longest one are padded with character code 0,
    so padded positions come out as the one-hot vector for index 0.

    Args:
        texts: non-empty list of N strings whose characters all have
            ord(char) < 128.

    Returns:
        Float numpy array of shape (T, N, 128), T being the longest length.
    """
    longest = max(map(len, texts))
    codes = np.zeros((longest, len(texts)), dtype=int)
    for column, text in enumerate(texts):
        codes[:len(text), column] = list(map(ord, text))
    # indexing the identity matrix by code picks the matching one-hot row
    return i128[codes]

# Turns a list of N strings (longest length T) into an integer numpy array of
# shape (T, N) holding character codes. Shorter strings are zero-padded.
def char_to_array(texts):
    """Encode `texts` as a (T, N) array of ord() values, one column per string.

    Args:
        texts: non-empty list of N strings.

    Returns:
        Integer numpy array of shape (T, N), T being the longest length;
        entries past the end of a shorter string are 0.
    """
    lengths = [len(text) for text in texts]
    codes = np.zeros((max(lengths), len(texts)), dtype=int)
    for column, (text, length) in enumerate(zip(texts, lengths)):
        codes[:length, column] = [ord(symbol) for symbol in text]
    return codes

In [None]:
# Drop references to the splits and tensors left over from a previous run of
# this cell, so the garbage collector can reclaim them before the new (large)
# one-hot tensors are allocated. The training tensors are released as well.
train_data = None
val_data = None
test_data = None
train_data_ix = None
train_data_array = None
val_data_ix = None
val_data_array = None
test_data_ix = None
test_data_array = None
gc.collect()

# the string TOP separates all songs
songs = beatleslyrics.split('TOP\n')
print('number of songs: %d' % len(songs))
# fixed seed so the shuffle (and therefore the split) is the same on every run
np.random.seed(0)
np.random.shuffle(songs)

# Split boundaries: the first 34% of the shuffled songs are used for training,
# the next 33% for validation and the remainder for testing.
# NOTE(review): a 34/33/33 split is unusual -- confirm it is intended.
train_end = len(songs) * 34 // 100
val_end = len(songs) * 67 // 100
train_data = songs[:train_end]
val_data   = songs[train_end:val_end]
test_data  = songs[val_end:]

# For each split build both encodings: *_ix holds the one-hot vectors fed to
# the model, *_array holds the integer character codes.
train_data_ix = torch.tensor(char_to_ix(train_data), dtype=torch.float)
train_data_array = torch.tensor(char_to_array(train_data))

print(train_data_ix.shape)
print(train_data_array.shape)

val_data_ix = torch.tensor(char_to_ix(val_data), dtype=torch.float)
val_data_array = torch.tensor(char_to_array(val_data))

test_data_ix = torch.tensor(char_to_ix(test_data), dtype=torch.float)
test_data_array = torch.tensor(char_to_array(test_data))

print(len(train_data))
print(len(val_data))
print(len(test_data))

In [None]:
class BeatlesLSTM(nn.Module):
    """Character-level LSTM followed by a linear layer producing character scores.

    The recurrent state is stored on the instance (`self.hidden`) and is
    carried over from one `forward` call to the next. Call `init_hidden_zeros`
    (or `init_hidden`) to reset it, and whenever the minibatch size changes.
    """

    def __init__(self, hidden_dim, num_stacks):
        """Build the network.

        Args:
            hidden_dim: size of the LSTM hidden state.
            num_stacks: number of stacked LSTM layers.
        """
        super().__init__()
        self.hidden_dim = hidden_dim

        self.lstm = nn.LSTM(char_dim, hidden_dim, num_layers=num_stacks, dropout=0.0)

        # The linear layer that maps from hidden state space to character space
        self.hidden2char = nn.Linear(hidden_dim, char_dim)
        self.init_hidden_zeros(1)

    def init_hidden_zeros(self, minibatch_size):
        """Reset the recurrent state to zeros for `minibatch_size` sequences."""
        shape = (self.lstm.num_layers, minibatch_size, self.hidden_dim)
        # Create the state with the dtype and device of the model parameters,
        # so it stays usable after the module is moved (.to(), .cuda(), .double()).
        reference = self.hidden2char.weight
        h = torch.zeros(shape, dtype=reference.dtype, device=reference.device)
        c = torch.zeros(shape, dtype=reference.dtype, device=reference.device)
        self.init_hidden(h, c)

    def init_hidden(self, h, c):
        """Set the recurrent state to the given hidden state `h` and cell state `c`."""
        self.hidden = (h, c)

    def forward(self, text):
        """Run the LSTM over `text`, starting from the stored recurrent state.

        Args:
            text: tensor of size (T, N, char_dim).

        Returns:
            Character scores of size (T, N, char_dim). The stored recurrent
            state is replaced by the state after the last time step.
        """
        hs, self.hidden = self.lstm(text, self.hidden)
        char_space = self.hidden2char(hs)
        return char_space

In [None]:
def model_loss(model, loss_func, data_ix, data_array):
    """Evaluate the average next-character loss of `model` on a data set.

    The sequences are fed one time step at a time without gradient tracking.
    At step i the model sees `data_ix[i]` and is scored against
    `data_array[i + 1]`.

    Side effects: the model's recurrent state is reset to zeros for the batch
    and left at its final value. The LSTM is put in eval mode for the duration
    of the call and afterwards restored to the mode it had on entry.

    Args:
        model: object exposing `lstm`, `init_hidden_zeros(batch_size)` and
            being callable on a (1, N, char_dim) tensor.
        loss_func: callable taking (scores, targets) and returning a loss.
        data_ix: rank-3 tensor of model inputs, indexed (time, batch, feature).
        data_array: rank-2 tensor of targets, indexed (time, batch).

    Returns:
        The sum of the per-step losses divided by the number of steps.

    Raises:
        ZeroDivisionError: if `data_ix` has fewer than two time steps.
    """
    was_training = model.lstm.training
    model.lstm.eval()
    this_minibatch_size = data_ix.shape[1]
    model.init_hidden_zeros(this_minibatch_size)
    # the last time step has no successor to predict, so it is never an input
    sequence_in = data_ix[:-1, :, :]

    loss = 0
    try:
        with torch.no_grad():
            for i, char_in in enumerate(sequence_in):
                char_scores = model(char_in.view(1, this_minibatch_size, -1))
                loss += loss_func(char_scores.view(-1, char_dim), data_array[i+1,:])
    finally:
        # restore the caller's mode instead of unconditionally forcing train mode
        model.lstm.train(was_training)
    return loss / len(sequence_in)

In [None]:
def train_loop(model, epochs, train_data_ix, train_data_array, val_data_ix, val_data_array,
               checkpoint_name=None, minibatch_size=4, optimizer=None):
    """Train `model` to predict the next character and track losses per epoch.

    Each epoch walks over the training sequences in consecutive minibatches
    (in the stored order), resets the recurrent state for every minibatch,
    clips the gradient norm to 5 and takes one optimizer step per minibatch.
    After each epoch the training and validation losses are computed with
    `model_loss` and printed.

    Args:
        model: the network to train; must provide `init_hidden_zeros`.
        epochs: number of passes over the training data.
        train_data_ix: rank-3 tensor of training inputs, indexed (time, batch, feature).
        train_data_array: rank-2 tensor of training targets, indexed (time, batch).
        val_data_ix: validation inputs, same layout as `train_data_ix`.
        val_data_array: validation targets, same layout as `train_data_array`.
        checkpoint_name: if given, the model state dict is saved after every
            epoch to `checkpoint_name + str(epoch)`.
        minibatch_size: number of sequences per minibatch.
        optimizer: optimizer to use; defaults to RMSprop over the model parameters.

    Returns:
        (train_losses, val_losses): two lists with one entry per epoch.
    """
    loss_func = torch.nn.CrossEntropyLoss()
    if optimizer is None:
        optimizer = optim.RMSprop(model.parameters())
    train_losses = []
    val_losses = []

    num_sequences = train_data_ix.shape[1]
    # Ceiling division: a final, smaller minibatch is still trained on, and no
    # empty minibatch is produced when minibatch_size divides num_sequences.
    num_minibatches = (num_sequences + minibatch_size - 1) // minibatch_size

    for epoch in range(epochs):
        print('on epoch %d' % epoch)
        for i in range(num_minibatches):
            print('\r\ton iteration %d / %d' % (i, num_minibatches), end='')
            model.zero_grad()

            start = i * minibatch_size
            stop = start + minibatch_size
            # inputs are steps 0..T-2, targets are the same sequences shifted by one step
            sequence_in = train_data_ix[:-1, start:stop, :]
            sequence_out = train_data_array[1:, start:stop]

            # the last minibatch might have a different size if minibatch_size doesn't evenly divide the number of songs
            this_minibatch_size = sequence_in.shape[1]
            model.init_hidden_zeros(this_minibatch_size)

            char_scores = model(sequence_in)
            loss = loss_func(char_scores.reshape(-1, char_dim), sequence_out.reshape(-1))
            loss.backward()
            # clip the gradients of the model being trained, not of some unrelated global model
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
            optimizer.step()
        print()
        train_loss = model_loss(model, loss_func, train_data_ix, train_data_array)
        val_loss = model_loss(model, loss_func, val_data_ix, val_data_array)
        print('\ttraining loss = %f' % train_loss)
        print('\tvalidation loss = %f' % val_loss)
        train_losses += [train_loss]
        val_losses += [val_loss]
        if checkpoint_name is not None:
            torch.save(model.state_dict(), checkpoint_name + str(epoch))
    return train_losses, val_losses