In [1]:
import numpy as np
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader
from torch.nn.utils import clip_grad_norm_
from torch.utils.tensorboard import SummaryWriter

import os

# Dataset 

In [2]:
dataset_dir = "../Dataset/"
# Sort the listing: os.listdir returns entries in arbitrary order, and a later
# cell selects the training file by its position in this listing.
files = sorted(os.listdir(dataset_dir))

# Keep regular files only; a sub-directory entry would make the later open() fail.
dataset_files = [
    os.path.join(dataset_dir, file)
    for file in files
    if os.path.isfile(os.path.join(dataset_dir, file))
]

In [3]:
# Read every dataset file into memory, keyed by its base file name.
songs = {}
for dataset_filename in dataset_files:
    # The context manager closes the file even if read() raises.
    with open(dataset_filename, 'r') as abc_notation_file:
        songs[os.path.basename(dataset_filename)] = abc_notation_file.read()

In [4]:
# File to train on: the first key of `songs` (dict insertion order).
train_list = [*songs]
musical_train_file = train_list[0]

## Vectorize the text

Before we begin training our RNN model, we'll need to create a numerical representation of our text-based dataset. To do this, we'll generate two lookup tables: one that maps characters to numbers, and a second that maps numbers back to characters.

In [5]:
# Collect the distinct characters of the training file, in sorted order.
vocab = list(set(songs[musical_train_file]))
vocab.sort()
print("There are", len(vocab), "unique characters in the dataset")

There are 83 unique characters in the dataset


In [6]:
# Character -> index lookup: every character maps to its position in `vocab`.
#   For example, `char2idx["d"]` is the index assigned to the character "d".
char2idx = dict(zip(vocab, range(len(vocab))))

# Index -> character lookup, the inverse of char2idx: indexing this array
#   with a character's index gives back the character itself.
idx2char = np.array(vocab)

In [7]:
# Preview the first 20 entries of the character -> index table.
print('{')
for char in list(char2idx)[:20]:
    print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')

{
  '\n':   0,
  ' ' :   1,
  '!' :   2,
  '"' :   3,
  '#' :   4,
  "'" :   5,
  '(' :   6,
  ')' :   7,
  ',' :   8,
  '-' :   9,
  '.' :  10,
  '/' :  11,
  '0' :  12,
  '1' :  13,
  '2' :  14,
  '3' :  15,
  '4' :  16,
  '5' :  17,
  '6' :  18,
  '7' :  19,
  ...
}


In [8]:
def vectorize_string(string):
  """Convert `string` into a numpy array of indices, one per character.

  Each character is looked up in the module-level `char2idx` table, so a
  character missing from that table raises KeyError.
  """
  indices = []
  for char in string:
    indices.append(char2idx[char])
  return np.array(indices)

In [9]:
# Show the numeric encoding of the training file.
encoded_song = vectorize_string(songs[musical_train_file])
print(encoded_song)

[49 22 14 ... 22 82  2]


In [10]:
# The encoding holds one index per character, so both lengths should match.
song_text = songs[musical_train_file]
print(len(song_text))
print(len(vectorize_string(song_text)))

200425
200425


## PyTorch Dataset

In [5]:
class MusicalDataset(torch.utils.data.Dataset):
    '''
        Character-level dataset built from one string of songs.

        Sample `idx` is a pair of index sequences, each `seq_lenght` long:
        the input window starting at character `idx`, and the target window,
        which is the same window shifted one character to the right.
    '''

    def __init__(self, abc_string, seq_lenght):
        '''
            @param abc_string: the full text to learn from
            @param seq_lenght: number of characters per sample; must be >= 1
                (the misspelled name is kept because callers pass it by keyword)
        '''
        super().__init__()
        if seq_lenght < 1:
            raise ValueError("seq_lenght must be >= 1, got {}".format(seq_lenght))

        self.dataset = abc_string
        self.seq_lenght = seq_lenght

        self.vocab = self.vocabulary(abc_string)
        self.char2idx, self.idx2char = self.mapping(self.vocab)
        self.vectorized_dataset = self.vectorize_string(self.dataset)

    def __len__(self):
        '''
            Number of (input, target) windows that fit in the text.

            The target of the window starting at `idx` ends at character
            `idx + seq_lenght`, so the last valid start is
            `len(text) - seq_lenght - 1`, i.e. there are `len(text) - seq_lenght`
            samples. Example: with seq_lenght 4 and the text "Hello" there is
            exactly one sample, x = "Hell" and y = "ello".

            Returns 0 (never a negative number) when the text is too short.
        '''
        return max(0, len(self.vectorized_dataset) - self.seq_lenght)

    def __getitem__(self, idx):
        '''
            @param idx: sample position; negative values count from the end
            @return list [x, y] of two `torch.long` tensors of length `seq_lenght`
            @raise IndexError: if `idx` is outside the dataset, which also lets
                a plain `for sample in dataset` loop terminate
        '''
        n_samples = len(self)
        if idx < 0:
            idx += n_samples
        if not 0 <= idx < n_samples:
            raise IndexError("sample index out of range")

        x = self.vectorized_dataset[idx : idx + self.seq_lenght]
        y = self.vectorized_dataset[idx + 1 : idx + self.seq_lenght + 1]

        return [torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)]

    def getVocabSize(self):
        '''Return the number of distinct characters in the text.'''
        return len(self.vocab)

    def getSequenceLength(self):
        '''Return the number of characters per sample.'''
        return self.seq_lenght

    def vectorize_string(self, string):
        '''
            Vectorize (convert to numerical) a string using the
            character-to-index mapping built from the dataset.

            @return a numpy int64 array with `N` elements, where `N` is
                the number of characters in the input string
            @raise KeyError: if the string has a character outside the vocabulary
        '''
        # Explicit dtype so that an empty string still yields an integer array.
        return np.array([self.char2idx[char] for char in string], dtype=np.int64)

    def vocabulary(self, string):
        '''
            Return the vocabulary used in the input string, i.e. the sorted
            list of its distinct characters.

            @param string: the dataset with several songs written using
                a specific notation
        '''
        return sorted(set(string))

    def mapping(self, vocab):
        '''
            Create a mapping from character to unique index and from
            indices to characters.

            @param vocab: the distinct characters of the notation; each one is
                assigned its position in this sequence
            @return Mapping contained in a list [char2idx, idx2char]
        '''
        char2idx = {u: i for i, u in enumerate(vocab)}
        idx2char = np.array(vocab)

        return [char2idx, idx2char]


In [None]:
# Smoke test: build a small dataset, wrap it in a DataLoader and walk through it.
test = MusicalDataset(songs[musical_train_file], 8)
print(len(test))

dataloader = DataLoader(test, batch_size=2, shuffle=False, num_workers=4)
print(len(dataloader))

# Visit every batch, counting them; `targets` ends up holding the last batch's targets.
idx = 0
for idx, (inputs, targets) in enumerate(dataloader, start=1):
    pass

print(targets)

In [None]:
# Fetch the first batch only and show the shape of its inputs.
first_batch = next(iter(dataloader))
x, y = first_batch
print(x.shape)

# The Recurrent Neural Network (RNN) model

The model is based off the LSTM architecture, where we use a state vector to maintain information about the temporal relationships between consecutive characters.

<img src="https://raw.githubusercontent.com/aamini/introtodeeplearning/2019/lab1/img/lstm_unrolled-01-01.png" alt="Drawing"/>

Interesante añadir una descripción de los *embedding layers* para que quede claro su funcionamiento.

Arreglar la imagen para que queden más explícitas las dimensiones entre capas.

Añadir los detalles de los pasos del LSTM: olvidar, añadir...

In [6]:
class MusicalLSTMModel(torch.nn.Module):
    '''
        Container module with an encoder, a recurrent module, and a decoder.

        encoder: embedding of character indices
        rnn:     stacked LSTM (batch_first)
        decoder: linear layer producing one logit per vocabulary entry

        @param vocab_size: number of distinct characters
        @param embedding_dim: size of each character embedding
        @param hidden_state_dim: size of the LSTM hidden and cell states
        @param rnn_units: number of stacked LSTM layers (it is passed to
            nn.LSTM as `num_layers`; it is unrelated to the sequence length)
        @param dropout: dropout between LSTM layers
    '''

    def __init__(self, vocab_size, embedding_dim, hidden_state_dim, rnn_units, dropout=0.05):
        super(MusicalLSTMModel, self).__init__()

        self.encoder = torch.nn.Embedding(vocab_size, embedding_dim)
        # nn.LSTM applies dropout only between layers, and warns when asked
        # for dropout with a single layer; disable it in that case.
        inter_layer_dropout = dropout if rnn_units > 1 else 0.0
        self.rnn = torch.nn.LSTM(embedding_dim, hidden_state_dim, rnn_units,
                                 dropout=inter_layer_dropout, batch_first=True)
        self.decoder = torch.nn.Linear(hidden_state_dim, vocab_size)

    def forward(self, x, hidden=None):
        '''
            @param x: long tensor of character indices, (batch_size, sequence_length)
            @param hidden: optional pair (h_0, c_0), each of shape
                (num_layers, batch_size, hidden_size). `None` starts from zeros.
                A pair whose shape does not fit this model and batch is
                discarded with a warning (zeros are used instead).
            @return (logits, (h_n, c_n)) where logits has shape
                (batch_size * sequence_length, vocab_size)
        '''
        emb = self.encoder(x) # (batch_size, sequence_length, embedding_dim)

        hidden = self._usable_state(hidden, batch_size=emb.size(0))
        output, hidden = self.rnn(emb, hidden) # (batch_size, sequence_length, hidden_size)

        output = self.reshapeLSTMOutput(output) # (batch_size*sequence_length, hidden_size)

        output = self.decoder(output) # (batch_size*sequence_length, vocab_size)
        return output, hidden

    def _usable_state(self, hidden, batch_size):
        '''Return `hidden` as a tuple if it fits the LSTM and batch, else None.'''
        if hidden is None:
            return None

        h_0, c_0 = hidden
        expected = (self.rnn.num_layers, batch_size, self.rnn.hidden_size)
        if tuple(h_0.shape) != expected or tuple(c_0.shape) != expected:
            import warnings
            warnings.warn(
                "Ignoring initial LSTM state of shape {}; expected {}. "
                "Starting from a zero state instead.".format(tuple(h_0.shape), expected)
            )
            return None
        return (h_0, c_0)

    def reshapeLSTMOutput(self, lstm_output):
        '''
            This function reshapes the LSTM output in order to be able
            to use it in the following layer, the nn.Linear() layer.

            @param lstm_output: tensor output from the LSTM layer, whose shape is
                (batch_size, sequence_length, hidden_size)

            @result: Reshaped output, (batch_size*sequence_length, hidden_size)
        '''
        flat_rows = lstm_output.size(0) * lstm_output.size(1)
        hidden_size = lstm_output.size(2)
        return lstm_output.reshape(flat_rows, hidden_size)
        

## Test

In [7]:
# Hyper-parameters for the shape test below.
batch_size = 4
num_layers = 8       # stacked LSTM layers
hidden_size = 512
vocab_size = 3       # upper bound of the random indices generated for the shape test
learning_rate = 5e-2
seq_length = 100     # characters per sample
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

dataset = MusicalDataset(songs[musical_train_file], seq_lenght=seq_length)

# `rnn_units` is handed to nn.LSTM as its number of layers, so it takes
# `num_layers` here rather than the sequence length.
model = MusicalLSTMModel(dataset.getVocabSize(), embedding_dim=256, hidden_state_dim=hidden_size,
                         rnn_units=num_layers).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)




In [None]:
# Random character indices in [0, vocab_size), created on the same device as
# the model so the forward pass also works when the model lives on the GPU.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = ((vocab_size) * torch.rand(batch_size, seq_length)).type(torch.long).to(device)
states = (torch.zeros(num_layers, batch_size, hidden_size, device=device),
          torch.zeros(num_layers, batch_size, hidden_size, device=device))

print("Input shape:      ", input.shape, " # (batch_size, sequence_length)")
pred, hidden = model(input, states)

print("Prediction shape: ", pred.shape, "# (batch_size * sequence_length, vocab_size)")
# The LSTM state is sized by the hidden dimension, not by the vocabulary.
print("Hidden shape: ", hidden[0].shape, "# (num_layers, batch_size, hidden_size)")

# Train

https://github.com/yunjey/pytorch-tutorial/blob/master/tutorials/02-intermediate/language_model/main.py

¿Por qué el detach del estado?
https://discuss.pytorch.org/t/solved-why-we-need-to-detach-variable-which-contains-hidden-representation/1426

In [8]:
def train(model, dataset, batch_size, optimizer, n_epochs=25, writer=None, log_every=100):
    '''
        Train `model` on `dataset` with cross-entropy loss.

        Every batch starts from a fresh (zero) LSTM state, requested by
        passing `None` as the state. With the sliding-window MusicalDataset of
        this notebook, consecutive samples overlap in all but one character,
        so carrying the final state of one batch into the next would hand the
        model characters it is about to be asked to predict. It also keeps
        the loop independent of the batch size of a partial last batch.

        @param model: module called as `model(inputs, state)` and returning
            `(logits, state)` with logits of shape (batch*seq, vocab_size)
        @param dataset: dataset yielding (input, target) index tensors
        @param batch_size: number of samples per batch
        @param optimizer: optimizer already bound to `model.parameters()`
        @param n_epochs: number of passes over the dataset
        @param writer: optional TensorBoard SummaryWriter
        @param log_every: write the batch loss every `log_every` iterations
        @raise ValueError: if the dataset is empty or `log_every` < 1
    '''
    if len(dataset) == 0:
        raise ValueError("dataset is empty; nothing to train on")
    if log_every < 1:
        raise ValueError("log_every must be >= 1, got {}".format(log_every))

    # Follow the device the model already lives on, so inputs always match it.
    device = next(model.parameters()).device
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    criterion = torch.nn.CrossEntropyLoss(reduction="mean")

    model.train()
    global_step = 0  # keeps increasing across epochs so logged points never overwrite each other

    for epoch in tqdm(range(n_epochs)):
        # statistics
        running_loss = 0.0

        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs, _ = model(inputs, None)
            loss = criterion(outputs, targets.reshape(-1))

            optimizer.zero_grad()
            loss.backward()
            clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()

            batch_loss = loss.item()
            if writer is not None and global_step % log_every == 0:
                writer.add_scalar('Train/Iterative_Loss', batch_loss, global_step)
            global_step += 1

            # The criterion averages over the batch; weight by batch size so the
            # epoch figure is a true per-sample mean even with a short last batch.
            running_loss += batch_loss * inputs.size(0)

        epoch_loss = running_loss / len(dataset)
        if writer is not None:
            writer.add_scalar('Train/EpochLoss', epoch_loss, epoch)
        

In [9]:
batch_size = 6
hidden_size = 512
learning_rate = 1e-3
seq_length = 50
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

dataset = MusicalDataset(songs[musical_train_file], seq_lenght=seq_length)

# NOTE(review): `rnn_units` becomes the number of stacked LSTM layers, so this
# builds a 50-layer LSTM; presumably a much smaller layer count was intended - confirm.
model = MusicalLSTMModel(dataset.getVocabSize(), embedding_dim=64, hidden_state_dim=hidden_size, \
                         rnn_units=dataset.getSequenceLength()).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

writer = SummaryWriter("tensorboard/test/")
try:
    train(model, dataset, batch_size, optimizer, 1, writer)
finally:
    # Flush pending events to disk even when the run is interrupted.
    writer.close()

  0%|          | 0/1 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
test = lambda a : a + 10
print(test(5))

In [None]:
# Small configuration used to check the model output against the loss.
batch_size = 2
seq_lenght = rnn_units = 11  # the same value is used as window length and as LSTM layer count

dataset = MusicalDataset(songs[musical_train_file], seq_lenght)
vocab_size = dataset.getVocabSize()
dataloader = DataLoader(dataset, batch_size, shuffle=False, num_workers=0)

criterion = torch.nn.CrossEntropyLoss(reduction="sum")
model = MusicalLSTMModel(vocab_size, embedding_dim=256, hidden_state_dim=512, rnn_units=rnn_units)

In [None]:
x, y = next(iter(dataloader))
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
# LSTM initial states are (num_layers, batch_size, hidden_size); take the
# batch axis from the batch itself rather than from the sequence length.
h0 = torch.randn(rnn_units, x.size(0), 512)
c0 = torch.randn(rnn_units, x.size(0), 512)
states = (h0, c0)
# Run the forward pass here so the cell does not rely on `pred` / `hidden`
# left over from a different cell.
pred, hidden = model(x, states)

print("Prediction shape: ", pred.shape, "# (batch_size * sequence_length, vocab_size)")
print("Hidden shape: ", hidden[0].shape, "# (num_layers, batch_size, hidden_size)")

# Predictions are flattened over batch and time, so the targets are flattened too.
print(criterion(pred, y.reshape(-1)))

In [None]:
x, y = next(iter(dataloader))
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
# LSTM initial states are (num_layers, batch_size, hidden_size); take the
# batch axis from the batch itself rather than from the sequence length.
h0 = torch.randn(rnn_units, x.size(0), 512)
c0 = torch.randn(rnn_units, x.size(0), 512)
states = (h0, c0)
pred, hidden = model(x, states)

print("Prediction shape: ", pred.shape, "# (batch_size * sequence_length, vocab_size)")
print("Hidden shape: ", hidden[0].shape, "# (num_layers, batch_size, hidden_size)")
# print(pred)
# print(pred)

In [None]:
# The model flattens batch and time into one axis, so the prediction is 2-D
# while the target keeps its (batch, time) layout.
print("Prediction shape: ", pred.shape, "# (batch_size * sequence_length, vocab_size)")
print("Target shape: ", y.shape, "# (batch_size, sequence_length)")

In [None]:
# Scratch check of CrossEntropyLoss on a small slice of the predictions.
# NOTE(review): `pred` is indexed with three subscripts here, whereas
# MusicalLSTMModel.forward in this notebook returns a tensor reshaped to two
# dimensions - this cell presumably predates that reshape; confirm before running.
# pred_test = pred[:, 0:4, 0:2].permute(0,2,1)
pred_test = pred[:, 0:4, 0:2]
y_test = y[:, 0:4]

print("Pred. shape: {}".format(pred_test.shape))
print(pred_test)
print("Target shape: {}".format(y_test.shape))
print(y_test)
# Overwrite every target in the slice with class 1 (in place).
# NOTE(review): `y_test` is a slice of `y`, so this presumably changes `y` too - verify.
y_test[:] = 1



criterion = torch.nn.CrossEntropyLoss(reduction="sum")
print(criterion(pred_test, y_test.reshape(-1)))

In [None]:
# Compare the target shape before and after flattening it to one axis.
flat_targets = y.reshape(-1)
print(y.shape)
print(flat_targets.shape)

In [None]:
# Scratch experiment: log-softmax over the last axis of a slice of the
# predictions, followed by a max over that same axis.
# NOTE(review): `pred` is indexed with three subscripts, whereas
# MusicalLSTMModel.forward in this notebook returns a 2-D tensor - confirm
# which model version this cell was written for.
pred_test = pred[:, 0:2, 0:2]
y_test = y[:, 0:2]
# print(pred_test.shape)
# print(pred_test)

print("-"*10)
logSoftmax = torch.nn.LogSoftmax(dim=2)
soft = logSoftmax(pred_test)
# print(soft.shape)
# print(soft)
print(y_test.shape)
print(y_test)
print("-"*10)
# max(dim=2) returns (values, indices): `a` holds the largest log-probability
# per position and `labels` the index where it occurs.
a, labels = logSoftmax(pred_test).max(dim=2)
print(labels.shape)
print(a.shape)
print(y_test.reshape((2, -1)))

# NOTE(review): `a` has had the class axis reduced away by max(), so it is
# presumably not the per-class scores CrossEntropyLoss expects - confirm.
criterion = torch.nn.CrossEntropyLoss()
print(criterion(a, y_test.reshape(-1)))


In [None]:
# Index of the highest-scoring entry along axis 2 of the predictions.
# NOTE(review): this needs a 3-D `pred`; MusicalLSTMModel.forward in this
# notebook returns a 2-D tensor, for which dim=2 does not exist - confirm.
_, labels = pred.max(dim=2)
print(labels)
print(labels.shape)

In [None]:
criterion = torch.nn.CrossEntropyLoss()
# The model returns predictions flattened to (batch*seq, vocab), so the
# (batch, seq) targets must be flattened the same way before comparing.
loss = criterion(pred, y.reshape(-1))
print(loss)

# Useless

In [None]:
class SparseCategoricalCrossEntropyWithLogits(torch.nn.Module):
    '''
        Cross-entropy between raw scores (logits) and integer class labels.

        @param dim: axis of `pred` that holds the per-class scores
            (default 2, i.e. predictions laid out as (batch, sequence, classes))
    '''

    def __init__(self, dim=2):
        super(SparseCategoricalCrossEntropyWithLogits, self).__init__()
        self.dim = dim
        self.logSoftmax = torch.nn.LogSoftmax(dim=self.dim)
        self.nllLoss = torch.nn.NLLLoss()

    def forward(self, pred, target):
        '''
            @param pred: float tensor of logits with the classes on axis `dim`
            @param target: long tensor of class indices, shaped like `pred`
                without the class axis
            @return scalar tensor, the mean negative log-likelihood
        '''
        # NLLLoss consumes log-probabilities; taking the arg-max first would
        # discard them and leave nothing to differentiate.
        log_probs = self.logSoftmax(pred)

        # NLLLoss expects the class axis at position 1 for batched input.
        class_dim = self.dim if self.dim >= 0 else log_probs.dim() + self.dim
        if log_probs.dim() >= 2 and class_dim != 1:
            log_probs = log_probs.movedim(class_dim, 1)

        return self.nllLoss(log_probs, target)