In [1]:
import os
import random
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

from utils import *
log = logger(True) ; log("We're logging :)")

# Auto-reloading of modules in iPython
%load_ext autoreload
%autoreload 2

# CONSTANTS
# --- Training schedule / model hyper-parameters ---
SAVE_EVERY = 20          # checkpoint every SAVE_EVERY epochs
SEQ_SIZE = 50            # characters per training slice, and max words of lyrics kept
RANDOM_SEED = 11
VALIDATION_SIZE = 0.15   # fraction of songs held out for validation
LR = 1e-3
N_EPOCHS = 100
NUM_LAYERS, HIDDEN_SIZE = 1, 100
DROPOUT_P = 0
model_type = 'lstm'

# --- Reproducibility ---
# Seed every RNG this notebook draws from: torch (weight init) and Python's
# `random` (used by rand_slice to pick training windows). NumPy is seeded
# right before the train/validation shuffle.
torch.manual_seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# --- Paths ---
INPUT = 'data/music_notation.txt'
INP_LYRICS = 'data/lyrics.txt'
TEST_INPUT = 'data/test.txt'
SAVED_ABC = 'data/output.abc'

# --- Checkpointing / generation ---
RESUME = True
# NOTE(review): this name mentions RMSprop and is not referenced by the cells
# visible in this notebook — confirm whether it is still needed.
CHECKPOINT = 'ckpt_mdl_{}_ep_{}_hsize_{}_dout_{}_RMSprop'.format(model_type, N_EPOCHS, HIDDEN_SIZE, DROPOUT_P)
GENERATION_MAX_LENGTH = 1000

We're logging :)


In [2]:
# READ IN DATA FILE
# Songs are separated by a line that contains only '||'. Whatever follows the
# last separator is never appended to `data`.
data, buffer = [], ''
with open(INPUT, "r") as f:  # context manager closes the file even if parsing fails
    for line in f:
        if line == '||\n':
            data.append(buffer)
            buffer = ''
        else:
            buffer += line

# Keep only songs longer than SEQ_SIZE + 10 characters, so a training slice of
# SEQ_SIZE + 1 characters can be cut from them.
# NOTE(review): the training cell indexes `lyrics` with the same indices as
# `data`, but this filter removes entries from `data` only — confirm the two
# lists stay aligned.
data = [song for song in data if len(song) > SEQ_SIZE + 10]

log('=====> Data loaded')

# Keep the text that sits between the first and the second 'K:' marker of each
# song (raises IndexError for a song without any 'K:').
# NOTE(review): the length filter above runs before this trim, so a trimmed
# song can end up shorter than SEQ_SIZE + 1 — verify against the data file.
data = [song.split('K:')[1] for song in data]

=====> Data loaded


In [3]:
# READ THE LYRICS FILE
# Entries are separated by '||'; newlines inside an entry become spaces and
# surrounding spaces are stripped.
with open(INP_LYRICS, "r") as f:  # previously the handle was never closed
    lyrics = [sent.replace('\n', ' ').strip(' ') for sent in f.read().split('||')]
log('=====> Lyrics loaded')

# Keep at most SEQ_SIZE space-separated words per entry
lyrics = [' '.join(sentence.split(' ')[:SEQ_SIZE]) for sentence in lyrics]

=====> Lyrics loaded


In [4]:
# READ THE TEST LYRICS FILE (text used to condition generation)
with open(TEST_INPUT, "r") as f:  # previously the handle was never closed
    test_input = f.read()
log('=====> Test Lyrics loaded')

# Flatten newlines to spaces and keep at most SEQ_SIZE words
words = test_input.replace('\n', ' ').split(' ')[:SEQ_SIZE]
test_input = ' '.join(words)

=====> Test Lyrics loaded


In [5]:
# for character index encoding
# The characters are sorted so that the char <-> index mapping is identical in
# every kernel session. Iterating a set of strings has no stable order across
# interpreter runs (string hashing is randomised), which would silently
# scramble the meaning of a checkpoint's embeddings/output layer on reload.
with open(INPUT, 'r') as f:
    char_idx = ''.join(sorted(set(f.read())))
char_list = list(char_idx)

# for words encoding (sorted for the same reproducibility reason)
vocab = sorted({word for sentence in lyrics for word in sentence.split(' ')})
word_to_ix = {word: i for i, word in enumerate(vocab)}
# setdefault: if the corpus itself contains the token "UNK", keep its existing
# index instead of overwriting it with an index one past the table size.
word_to_ix.setdefault("UNK", len(word_to_ix))

# NOW SPLIT INTO TRAIN/VALIDATION SETS
num_train = len(data)
indices = list(range(num_train))
split_idx = int(np.floor(VALIDATION_SIZE * num_train))

# Shuffle data and split (the first split_idx shuffled songs are held out)
np.random.seed(RANDOM_SEED)
np.random.shuffle(indices)

train_idxs, valid_idxs = indices[split_idx:], indices[:split_idx]
train_len, valid_len = len(train_idxs), len(valid_idxs)
log('Number of unique characters: %s' % len(char_idx))
log('Original data length: %s' % len(data))
log('Training data length: %s'% train_len)
log('Validation data length: %s' % valid_len)
assert(train_len + valid_len == len(data)), 'Train_len + valid_len should == len(data)'

Number of unique characters: 93
Original data length: 468
Training data length: 398
Validation data length: 70


In [6]:
# Utility functions
def tic():
    """Return the current time in seconds; pass the result to toc() later."""
    return time.time()


def toc(tic, msg=None):
    """Format the time elapsed since `tic` as '<minutes>m <seconds>s'.

    Args:
        tic: start timestamp, as returned by tic().
        msg: optional text appended after the elapsed time; a falsy value
            (None or '') is ignored.

    Returns:
        The formatted string, e.g. '2m 5s' or '2m 5s done'.
    """
    elapsed = time.time() - tic
    minutes = int(elapsed / 60)
    seconds = int(elapsed - (minutes * 60))
    stamp = '{}m {}s'.format(minutes, seconds)
    return '{} {}'.format(stamp, msg) if msg else stamp


# Gives us a random slice of size slice_len + 1 so we can get a train/target.
def rand_slice(data, slice_len=SEQ_SIZE):
    """Return a random contiguous window of exactly `slice_len + 1` items.

    The extra item lets the caller build an input (`window[:-1]`) and a
    shifted-by-one target (`window[1:]`) of length `slice_len` each.

    Args:
        data: any sliceable sequence (e.g. a song string).
        slice_len: length of the input/target sequences to be derived.

    Returns:
        `data[s:s + slice_len + 1]` for a uniformly chosen start `s`.

    Raises:
        ValueError: if `data` holds fewer than `slice_len + 1` items.
    """
    # random.randint includes BOTH endpoints, so the last legal start index is
    # len(data) - (slice_len + 1); using len(data) - slice_len would sometimes
    # return a window that is one item short.
    last_start = len(data) - (slice_len + 1)
    if last_start < 0:
        raise ValueError(
            'need at least {} items to cut a slice, got {}'.format(slice_len + 1, len(data)))
    s_idx = random.randint(0, last_start)
    return data[s_idx:s_idx + slice_len + 1]


def seq_to_tensor(seq):
    '''
    Encode a character sequence as a 1-D LongTensor of positions in `char_idx`.

    Raises ValueError (from str.index) for a character missing from `char_idx`.
    '''
    indices = [char_idx.index(ch) for ch in seq]
    return torch.tensor(indices, dtype=torch.long)


def train_slice(data, slice_len=50):
    '''
    Draw one random window from `data` and return (input, target) Variables,
    where the target is the input shifted one position to the right.
    '''
    window = rand_slice(data, slice_len=slice_len)
    inputs, targets = window[:-1], window[1:]
    return Variable(seq_to_tensor(inputs)), Variable(seq_to_tensor(targets))


def train_batch(data, b_size=100, slice_len=50):
    '''
    Build a batch of `b_size` independent random (input, target) slices.

    Returns two Variables of shape (b_size, slice_len) holding character indices.
    '''
    shape = (b_size, slice_len)
    batch_seq = torch.zeros(*shape).long()
    batch_target = torch.zeros(*shape).long()
    for row in range(b_size):
        # one fresh random window per row
        seq, target = train_slice(data, slice_len=slice_len)
        batch_seq[row], batch_target[row] = seq.data, target.data
    return Variable(batch_seq), Variable(batch_target)


def lyrics_to_tensor(lyrics):
    '''
    Encode a lyrics string as a LongTensor of exactly SEQ_SIZE word indices.

    The string is split on single spaces and only the first SEQ_SIZE words are
    used. Words missing from `word_to_ix` are mapped to the "UNK" entry
    (previously a bare `except: pass` left them at index 0, i.e. an arbitrary
    real word, and also swallowed unrelated errors). Positions past the end of
    the lyrics keep the initial value 0.
    '''
    context_idxs = torch.zeros(SEQ_SIZE).long()
    unk_ix = word_to_ix["UNK"]
    for i, w in enumerate(lyrics.split(' ')[:SEQ_SIZE]):
        context_idxs[i] = word_to_ix.get(w, unk_ix)
    return context_idxs

# Given a song and its lyrics, return input / target / lyrics tensors as Variables
def song_to_seq_target(song, lyrics):
    """Cut a random training example out of `song` and encode `lyrics`.

    Returns:
        (seq, target, text_seq): character-index input, the same window
        shifted by one character, and the word-index encoding of `lyrics`.
    """
    window = rand_slice(song)
    seq, target = seq_to_tensor(window[:-1]), seq_to_tensor(window[1:])
    text_seq = lyrics_to_tensor(lyrics)
    assert len(seq) == len(target), 'SEQ AND TARGET MISMATCH'
    return Variable(seq), Variable(target), Variable(text_seq)

print("=====> Utility functions loaded.")

=====> Utility functions loaded.


In [7]:
class MusicRNN(nn.Module):
    """Character-level music model conditioned on one lyrics word per step.

    Each step embeds one character index and one word index, concatenates the
    two embeddings and feeds them through two stacked recurrent modules
    followed by a linear layer that scores every output character.

    Args:
        text_input_size: number of rows in the word-embedding table.
        input_size: number of rows in the character-embedding table.
        hidden_size: embedding width and recurrent hidden width.
        output_size: number of output classes (characters).
        model: 'lstm' or 'gru'; anything else raises NotImplementedError.
        num_layers: layers inside each recurrent module.
        dropout_p: dropout probability applied after each recurrent module;
            when None, the module-level DROPOUT_P constant is used.
    """

    def __init__(self, text_input_size, input_size, hidden_size, output_size, model='gru', num_layers=1,
                 dropout_p=None):
        super(MusicRNN, self).__init__()
        if model == 'lstm':
            rnn_cls = nn.LSTM
        elif model == 'gru':
            rnn_cls = nn.GRU
        else:
            raise NotImplementedError("model must be 'lstm' or 'gru', got {!r}".format(model))
        if dropout_p is None:
            dropout_p = DROPOUT_P

        self.model = model
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.text_embeddings = nn.Embedding(text_input_size, hidden_size)
        self.embeddings = nn.Embedding(input_size, hidden_size)
        # Both cell types now expose rnn1/rnn2, which is what forward() uses.
        # The GRU branch used to create a single `self.rnn` with the wrong
        # input width, so a 'gru' model failed on its first forward call.
        self.rnn1 = rnn_cls(hidden_size * 2, hidden_size, num_layers)  # char + word embedding
        self.rnn2 = rnn_cls(hidden_size, hidden_size, num_layers)
        self.out = nn.Linear(self.hidden_size, self.output_size)
        self.drop = nn.Dropout(p=dropout_p)

    def init_hidden(self):
        """Reset `self.hidden` to zeros (batch size 1) before a new sequence."""
        def zeros():
            # new_zeros follows the dtype/device of the model's own weights
            return self.out.weight.new_zeros(self.num_layers, 1, self.hidden_size)

        if self.model == 'lstm':
            self.hidden = (zeros(), zeros())  # (h_0, c_0)
        elif self.model == 'gru':
            self.hidden = zeros()

    def forward(self, seq, lyrics):
        """Advance the model by one character.

        Args:
            seq: tensor holding a single character index.
            lyrics: tensor holding a single word index.

        Returns:
            Tensor of shape (1, output_size) with unnormalised class scores.
        """
        embeds = self.embeddings(seq.view(1, -1)).view(1, 1, -1)
        text_embeds = self.text_embeddings(lyrics.view(1, -1))
        combined_embeds = torch.cat((embeds, text_embeds), 2)
        # NOTE: a single `self.hidden` is threaded through both modules: rnn2
        # starts from the state rnn1 just produced, and the next call's rnn1
        # starts from rnn2's state. Kept as-is so trained weights keep their meaning.
        rnn_out, self.hidden = self.rnn1(combined_embeds, self.hidden)
        rnn_out = self.drop(rnn_out)
        rnn_out, self.hidden = self.rnn2(rnn_out, self.hidden)
        rnn_out = self.drop(rnn_out)
        output = self.out(rnn_out.view(1, -1))
        return output
    
    
print('=====> Defining model')

=====> Defining model


In [8]:
def train_pass(seq, target, text_seq, fit=True):
    """Run the global `model` over one sequence and return the mean per-step loss.

    Args:
        seq: 1-D tensor of input character indices.
        target: 1-D tensor of target character indices, same length as `seq`.
        text_seq: 1-D tensor of word indices; element i conditions step i, so
            it must be at least as long as `seq`.
        fit: when True, back-propagate and take an optimizer step; when False
            (validation), only evaluate.

    Returns:
        Summed loss over the sequence divided by its length, as a Python float.

    Raises:
        ValueError: if `seq` is empty.
    """
    if len(seq) == 0:
        raise ValueError('train_pass needs a non-empty sequence')

    # Dropout must only be active while fitting; validation runs in eval mode.
    model.train(bool(fit))
    model.init_hidden()
    model.zero_grad()
    loss = 0
    # No autograd graph is needed (or wanted, memory-wise) when not fitting.
    with torch.set_grad_enabled(bool(fit)):
        for i, c in enumerate(seq):
            output = model(c, text_seq[i])
            loss += loss_function(output, target[i].unsqueeze(0))

    if fit:
        loss.backward()
        optimizer.step()

    return loss.item() / len(seq)

In [9]:
# Model
# NOTE: this assignment overrides the RESUME flag set in the configuration
# cell, so the resume branch below is currently disabled. Remove this line to
# honour the configured value.
RESUME = False
CHECKPOINT_FILE = './checkpoint/checkpoint1.pth.tar'

in_size, out_size, text_in_size = len(char_idx), len(char_idx), len(word_to_ix)

# Defaults for a fresh run; a successful resume overwrites model/history/start_epoch.
model = None
start_epoch = 0
loss, v_loss = 0, 0  # per-epoch accumulators always start from zero
losses, v_losses = [], []

if RESUME:
    print('==> Resuming from checkpoint..')
    if os.path.isfile(CHECKPOINT_FILE):
        # SECURITY: torch.load unpickles arbitrary objects — only load
        # checkpoint files you created yourself.
        # Any failure while loading now surfaces as an error instead of being
        # swallowed by a bare `except` and silently replaced by a new model.
        checkpoint = torch.load(CHECKPOINT_FILE)
        model = checkpoint['model']
        losses = checkpoint['losses']
        v_losses = checkpoint['v_losses']
        # Continue with the epoch after the one that was saved.
        start_epoch = checkpoint.get('epoch', -1) + 1
    else:
        print('No Pre-trained model found.')

if model is None:
    print('==> Building model..')
    model = MusicRNN(text_in_size, in_size, HIDDEN_SIZE, out_size, model_type, NUM_LAYERS)

optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# Other optimizers that were tried:
#   torch.optim.SGD(model.parameters(), lr=LR, momentum=0.9)
#   torch.optim.Adagrad(model.parameters())
#   torch.optim.RMSprop(model.parameters())

loss_function = nn.CrossEntropyLoss()

==> Building model..


In [10]:
# Train Model
def _run_epoch(song_idxs, phase, epoch, started, fit):
    """Run train_pass once per song in `song_idxs` and return the mean loss.

    Args:
        song_idxs: indices into `data` / `lyrics` to visit, in order.
        phase: label shown in the progress line ('Training' or 'Validation').
        epoch: current epoch number (only used for the progress line).
        started: timestamp from tic(), used to show elapsed time.
        fit: forwarded to train_pass (True = update weights).

    Returns:
        Mean of the per-song losses, or NaN when `song_idxs` is empty.
    """
    total = 0
    for i, song_idx in enumerate(song_idxs):
        # NOTE(review): assumes lyrics[k] belongs to data[k] — confirm the two
        # lists are aligned after the data-loading filter.
        this_loss = train_pass(*song_to_seq_target(data[song_idx], lyrics[song_idx]), fit=fit)
        total += this_loss

        msg = '\r{} Epoch: {}, {:.2f}% iter: {} Time: {} Loss: {:.4}'.format(
            phase, epoch, (i + 1) / len(song_idxs) * 100, i, toc(started), this_loss)
        sys.stdout.write(msg)
        sys.stdout.flush()
    print()
    return total / len(song_idxs) if len(song_idxs) else float('nan')


time_since = tic()
for epoch in range(start_epoch, N_EPOCHS):
    # Each epoch accumulates its loss in a fresh local inside _run_epoch, so an
    # interrupted run can no longer leak a partial sum into the next one.
    losses.append(_run_epoch(train_idxs, 'Training', epoch, time_since, fit=True))
    v_losses.append(_run_epoch(valid_idxs, 'Validation', epoch, time_since, fit=False))

    # Save checkpoint: every SAVE_EVERY epochs (but not on the very first
    # epoch of this run) and always after the final epoch.
    is_periodic_save = epoch % SAVE_EVERY == 0 and start_epoch != epoch
    if is_periodic_save or epoch == N_EPOCHS - 1:
        print('=======>Saving..')
        state = {
            'model': model,
            'loss': losses[-1],
            'v_loss': v_losses[-1],
            'losses': losses,
            'v_losses': v_losses,
            'epoch': epoch,
        }
        os.makedirs('checkpoint', exist_ok=True)
        torch.save(state, './checkpoint/checkpoint1.pth.tar')

Training Epoch: 0, 15.08% iter: 59 Time: 0m 44s Loss: 2.573

KeyboardInterrupt: 

In [None]:
# Plotting loss over training and validation data
FONT_SIZE = 12
plt.rc('font', size=FONT_SIZE)           # controls default text sizes
plt.rc('axes', titlesize=FONT_SIZE)      # fontsize of the axes title
# Was 0, which rendered the 'Epoch' / 'Loss' axis labels set below invisible.
plt.rc('axes', labelsize=FONT_SIZE)      # fontsize of the x and y labels
plt.rc('xtick', labelsize=FONT_SIZE)     # fontsize of the tick labels
plt.rc('ytick', labelsize=FONT_SIZE)     # fontsize of the tick labels
plt.rc('legend', fontsize=FONT_SIZE)     # legend fontsize
plt.rc('figure', titlesize=FONT_SIZE)    # fontsize of the figure title

fig, ax = plt.subplots()
ax.plot(losses, label='Training Loss')
ax.plot(v_losses, label='Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.set_title('Loss per Epoch')
ax.legend()
plt.show()

In [None]:
def write_song(prime_str='<start>', lyrics = "Children, go where I send thee, How shall I send thee?", max_len=1000, temp=0.8):
    """Sample a song from the global `model`, one character at a time.

    Args:
        prime_str: text used to warm up the hidden state; generation continues
            from its last character. Every character must exist in `char_idx`.
            (This argument used to be ignored in favour of a hard-coded '<start>'.)
        lyrics: text the generation is conditioned on.
        max_len: maximum number of characters to generate after `prime_str`.
        temp: sampling temperature (> 0); lower values make sampling greedier.

    Returns:
        `prime_str` followed by the generated characters. Generation stops
        early once the text ends with '<end>'.

    Raises:
        ValueError: if `prime_str` is empty or `temp` is not positive.
    """
    if not prime_str:
        raise ValueError('prime_str must contain at least one character')
    if temp <= 0:
        raise ValueError('temp must be positive, got {}'.format(temp))

    # Encode the lyrics once instead of on every generated character.
    text = lyrics_to_tensor(lyrics)
    last_word = len(text) - 1

    was_training = model.training
    model.eval()  # no dropout while sampling
    try:
        # no_grad: otherwise the autograd graph keeps growing through
        # model.hidden for every generated character.
        with torch.no_grad():
            model.init_hidden()

            # "build up" hidden state using every character of the prime but the last
            creation = prime_str
            prime = seq_to_tensor(creation)
            for i in range(len(prime) - 1):
                # clamp so a prime longer than the lyrics tensor cannot index past it
                model(prime[i], text[min(i, last_word)])

            # Generate rest of sequence
            for _ in range(max_len):
                # NOTE(review): generation always conditions on the first
                # lyrics word, whereas training pairs step i with word i —
                # confirm this is intended.
                logits = model(seq_to_tensor(creation[-1]), text[0]).view(-1)

                # Softmax in float64 is overflow-safe and sums to 1 within the
                # tolerance np.random.choice enforces on `p`.
                dist = F.softmax(logits.double() / temp, dim=0).numpy()

                # Add predicted character to string and use as next input
                creation += char_idx[np.random.choice(len(dist), p=dist)]
                if creation[-5:] == '<end>':
                    break
    finally:
        model.train(was_training)  # leave the model in the mode we found it

    return creation

In [None]:
# Testing string
# Cap the conditioning text at SEQ_SIZE words (the limit was hard-coded as 50,
# which would drift if SEQ_SIZE changes) and tell the user when the cap applies.
test_words = test_input.split(' ')
if len(test_words) >= SEQ_SIZE:
    test_input = ' '.join(test_words[:SEQ_SIZE])
    print('getting only {} words of text input.'.format(SEQ_SIZE))
log(write_song(lyrics = test_input, max_len=GENERATION_MAX_LENGTH, temp=0.8))