In [1]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import numpy as np
import torch
import torch.utils.data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchvision import datasets, transforms
from torchvision.utils import make_grid , save_image

In [3]:
from constants import *

In [4]:
import matplotlib.pyplot as plt

### Old way of pulling out corpus

In [5]:
with open(f'{CONCAT_DIR}/concat_corpus.utf') as f:
    train_contents = f.read()

In [6]:
train_contents[0:10]

'їPÿ\x07{\x919\x05)\x1c'

In [7]:
import h5py    
import numpy as np    
import json
concat_h5 = h5py.File(f'{CONCAT_DIR}/concat_corpus.h5','r+') 

concat_json = json.load(open(f'{CONCAT_DIR}/concat_corpus.json', 'rb'))

  from ._conv import register_converters as _register_converters


OSError: Unable to open file (unable to lock file, errno = 11, error message = 'Resource temporarily unavailable')

In [8]:
t = concat_h5['train']

NameError: name 'concat_h5' is not defined

In [None]:
t[:10]

### One hot encoding

In [None]:
def one_hot(a,c): 
    return np.eye(c)[a]

In [None]:
class MusicDataset(torch.utils.data.Dataset):
    """Face Landmarks dataset."""

    def __init__(self, h5_file, set_type, json_file, timesteps, root_dir):
        self.concat_h5 = h5py.File(f'{root_dir}/{h5_file}','r+')
        self.dataset = self.concat_h5[set_type]
        self.concat_json = json.load(open(f'{root_dir}/{json_file}', 'rb'))
        self.vocab_size = len(self.concat_json['idx_to_token'])+1
        self.data_length = self.dataset.shape[0]
        self.timesteps = timesteps

    def __len__(self):
#         print((self.data_length - self.timesteps)//10)
#         return (self.data_length - self.timesteps)//10
        return (self.data_length // self.timesteps)

    def __getitem__(self, idx):
#         print(np.arange(10)[0:8]) # example
#         print(np.arange(10)[8])
        # (AS) Should not have duplicate sequences. 
        # RBMs do not actually use target value, so no point in repeating next char
#         x = self.dataset[idx:idx+timesteps]
#         y = self.dataset[idx+timesteps]
        
        start = idx*self.timesteps
        x = self.dataset[start:start+self.timesteps]
        y = self.dataset[start+self.timesteps]
#         x_hot = one_hot(x, self.vocab_size)
        return x, y


In [None]:
md = MusicDataset(h5_file='concat_corpus.h5', set_type='train', json_file='concat_corpus.json', timesteps=15, root_dir=CONCAT_DIR)

In [None]:
batch_size = 64
train_loader = torch.utils.data.DataLoader(md,
    batch_size=batch_size)

### Dataset sanity test

In [None]:
train_iter = enumerate(train_loader)

In [None]:
i, (x, y) = next(train_iter)
i2, (x2, y2) = next(train_iter)

In [None]:
md.dataset[:100]

In [None]:
# a = np.argmax(x[11], axis=1)[:-1]
# b = np.argmax(x[10], axis=1)[1:]
# np.testing.assert_array_equal(a, b)

In [None]:
# a = np.argmax(x2[0], axis=1)[:-1]
# b = np.argmax(x[-1], axis=1)[1:]
# np.testing.assert_array_equal(a, b)

### Model

In [None]:
def repackage_var(h):
    """Wraps h in new Variables, to detach them from their history."""
    return torch.autograd.Variable(h.data).cuda() if type(h) == torch.autograd.Variable else tuple(repackage_var(v) for v in h)

In [None]:
class StatefulLSTM(torch.nn.Module):
    def __init__(self, scale_size, n_hidden, n_factors, bs, nl):
        super().__init__()
        self.scale_size = scale_size
        self.nl = nl
        self.embedding = torch.nn.Embedding(scale_size, n_factors)
        self.rnn = torch.nn.LSTM(n_factors, n_hidden, nl, dropout=0.5)
        self.l_out = torch.nn.Linear(n_hidden, scale_size)
        self.n_hidden = n_hidden
        self.init_hidden(bs)
        
    def forward(self, notes):
        bs = notes[0].shape[0]
        if self.h[0].size(1) != bs: 
            print('batch size is not same size as original:', bs)
            self.init_hidden(bs)
        emb = self.embedding(notes)
        outp,h = self.rnn(emb, self.h)
        self.h = repackage_var(h)
#         return torch.nn.functional.log_softmax(self.l_out(outp), dim=-1).view(-1, self.scale_size)
        return torch.nn.functional.log_softmax(self.l_out(outp[:, -1, :]), dim=-1)
#         return torch.nn.functional.softmax(self.l_out(outp[:, -1, :]), dim=-1)
    
    def init_hidden(self, bs):
        h1 = torch.autograd.Variable(torch.zeros(self.nl, bs, self.n_hidden))
        h2 = torch.autograd.Variable(torch.zeros(self.nl, bs, self.n_hidden))
        if self._cuda():
            self.h = (h1.cuda(), h2.cuda())
        else:
            self.h = (h1, h2)
            
    def _cuda(self):
        return next(self.parameters()).is_cuda

### Training

In [None]:
m = StatefulLSTM(md.vocab_size, n_hidden=256, n_factors=10, bs=batch_size, nl=2).cuda()

In [None]:
train_op = torch.optim.Adam(m.parameters(), lr=1e-3)

In [None]:
loss_fn = torch.nn.NLLLoss()

In [None]:
display_step = 100
training_steps = 100
for step in range(training_steps):
# for step in tqdm(range(training_steps)):
    for i, (data,target) in enumerate(train_loader):
        data, target = torch.autograd.Variable(data.long().cuda()), torch.autograd.Variable(target.long().cuda())
        m.zero_grad()
        forward = m(data)
        loss = loss_fn(forward, target)
        loss.backward()
        train_op.step()
#     if (step % display_step == 0):
    print(f'Step: {step} Loss: {loss.data[0]}')

### Saving model

In [None]:
model_path = f'{OUT_DIR}/../models/bachbot_vanilla_rnn.h5'

In [None]:
torch.save(m.state_dict(), model_path)

In [None]:
m.load_state_dict(torch.load(model_path))

### Generate music

In [None]:
gen_song = encoded_songs[ind][:timesteps].tolist() # TODO explore different (non-random) seed options

Need to have unknown state 0?

In [None]:
timesteps = md.timesteps
output_size = md.vocab_size

In [None]:
gen_song = md.dataset[:timesteps]

In [None]:
def generate_sequence(song, seq_length):
    full_song = song.tolist()
    # generate music!
    m.init_hidden(batch_size)
    for i in range(seq_length):
        seed = np.array([full_song[-timesteps:]])
        # Use our RNN for prediction using our seed! 
        seed_v = torch.autograd.Variable(torch.from_numpy(seed).long()).cuda()
    #     seed_v = torch.autograd.Variable(torch.from_numpy(np.argmax(seed, axis=1)).long()).cuda() # for onehot
        predict_probs = m(seed_v)

        percentage_prob = (np.e ** predict_probs.data.cpu().numpy())
        # Define output vector for our generated song by sampling from our predicted probability distribution
        
    #     sampled_note = np.random.choice(range(md.vocab_size), p=percentage_prob[0]) # TODO
        sampled_note = np.argmax(percentage_prob)
    #     print('Sampled_note:', sampled_note)
        full_song.append(sampled_note)
    return full_song
    


In [None]:
import decode

In [None]:
def decode_output(output_idx):
    idx2token = md.concat_json['idx_to_token']
    token_list = list(map(lambda x: idx2token.get(str(x), ''), output_idx))
    return decode_token(token_list)

def decode_token(token_list):
    if (token_list[0] != START_DELIM):
        token_list.insert(0, START_DELIM)
    token_str = ''.join(token_list)
    with open(f'{SCRATCH_DIR}/utf_to_txt.json', 'r') as f:
        utf_to_txt = json.load(f)
    score, stream = decode.decode_string(utf_to_txt, token_str)
    return token_str, score, stream

# test = [idx2token[f'{x}'] for x in seq_arr]; test

In [None]:
song_seed = md.dataset[:md.timesteps]
generated_idxs = generate_sequence(song_seed, 500)

In [None]:
token_str, score, stream = decode_output(generated_idxs)

In [None]:
token_str

In [None]:
score

### For testing stuff

In [None]:
with open(f'{SCRATCH_DIR}/utf_to_txt.json', 'r') as f:
    utf_to_txt = json.load(f)

In [None]:
test_str = train_contents[:200]

In [None]:
test_str = open(f'{SCRATCH_DIR}/BWV-400-nomask-fermatas.utf', 'r').read()[:200]

In [None]:
token_str, score, stream = decode_token(test_str)

In [None]:
stream.elements[:10]

### Evaluate stream

In [None]:
stream.show()

In [None]:
fp = stream.write('midi', fp=f'{OUT_DIR}/testout.midi')

In [None]:
fp = stream.write('xml', fp=f'{OUT_DIR}/testout.xml')

In [None]:
from IPython.lib.display import FileLink
FileLink('../data/bachbot/out/testout.midi')