<a href="https://colab.research.google.com/github/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/blob/master/Geracao_de_musica_com_RNNs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Este projeto é inspirado no curso 6.S191 do MIT de 2020, incluindo o uso dos dados de treinamento, porém ele é extremamente reprodutível com qualquer outro conjunto de dados.

Obs: O texto se atém ao máximo à língua portuguesa, mas para manter os termos mais frequentemente encontrados na literatura e comunidade, utilizaremos as palavras ***input/output*** para ***entrada/saída*** da rede respectivamente, bem como usaremos ***dataset*** para falar sobre o ***conjunto de dados***.

# ***Redes Neurais Recorrentes como uma solução para problemas sequenciais***

Um problema sequencial no nicho de dados é um problema envolvendo dados sequenciais de qualquer tipo - como um vetor de informações -, onde os termos da sequência tem relações de significado, estado, e ordem com outros. Tal situação pode trazer consigo diversos problemas para os modelos tradicionais de aprendizado de máquina e deep learning, bem como:


*   O **comprimento** da sequência pode ser **variável**.
*   A **ordem** dos termos pode ser **variável**, mudando ou não seu significado/valor.
*   **Relações importantes** podem estar **distantes** na sequência, não permitindo que os modelos considerem apenas um comprimento fixo para todos inputs.
*   **Um termo** pode ter seu **significado e/ou relevância diferentes** dependendo de sua posição e/ou dos outros termos que o cercam.

Tratar esses pontos nos permite diversas aplicações, como previsões climáticas, previsão de preços de ações, predição de rastreamento de objetos (essencial para carros autônomos), geração de texto, classificação de sentimentos, geração de músicas, e muitos outros.

   

<p align="center">
  <img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/climatic.gif"> 
</p>  


<p align="center">
  <img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/stock_prices.png" width=650>
</p> 


<p align="center">
  <img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/trajectory_prediction.gif" width=650>
</p>

Para resolver esses problemas, a RNN processa ao longo da sequência, item por item, prevendo o próximo elemento, como a próxima letra, palavra, nota musical (na geração de músicas), número, vetor, etc. Com esta técnica, a retropropagação (backpropagation) é computada ao longo do tempo, fazendo com que o estado interno da rede (*h*) seja importante para a previsão (*y*).

Abaixo está uma comparação entre o modelo de rede neural padrão e uma arquitetura específica de RNN para servir de exemplo - esta será usada na geração de músicas pois seu output é uma sequência de qualquer comprimento desejado, entretanto existem diversas outras arquiteturas.


<p align="center">
  <img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/rnn.jpg" width=700>
</p>

Então, após treinar esta arquitetura com sequências de notas musicais, podemos prever notas seguintes que nunca existiram nas músicas de treino, gerando uma melodia totalmente nova, uma composição genuína de um modelo de deep learning.



# ***Criação do modelo***

Antes mesmo de começar o projeto, vá até a opção de *Ambiente de execução* na barra de opções e altere o ambiente de execução para GPU, o que acelerará as operações computacionais.


## ***Carregando os pacotes:***

In [None]:
# Especificando a versão do tensorflow
%tensorflow_version 2.x

# importando os pacotes
import tensorflow as tf 
import numpy as np
import matplotlib.pyplot as plt
import os
import time
import functools
from IPython import display as ipythondisplay
# O módulo tqdm mostra uma barra de progresso no nosso loop de treino
from tqdm import tqdm

# Para o conjunto de dados de treinamento usaremos o dataset de músicas folclóricas irlandesas do MIT 6.S191, sob licença:

# Copyright 2020 MIT 6.S191 Introduction to Deep Learning. All Rights Reserved.
# 
# Licensed under the MIT License. You may not use this file except in compliance
# with the License. Use and/or modification of this code outside of 6.S191 must
# reference:
#
# © MIT 6.S191: Introduction to Deep Learning
# http://introtodeeplearning.com

!pip install mitdeeplearning -q
import mitdeeplearning as mdl

# Para converter a notação musical ABC em arquivos de som usaremos:
!apt-get install abcmidi timidity > /dev/null 2>&1

## ***Carregando e explorando o dataset da biblioteca do MIT***

O MIT 6.S191 disponibiliza um dataset com quase mil músicas folclóricas irlandesas em notação ABC para treinar nosso modelo. Vamos dar uma olhada:


<p align="center">
<img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/irish_folk.gif">
</p>

In [None]:
# Podemos carregar os dados e executar as músicas diretamente pelo pacote mdl
musicas = mdl.lab1.load_training_data()
print(len(musicas))
mdl.lab1.play_song(musicas[1])

Vamos olhar a canção mais de perto.

In [None]:
print(musicas[1])

Para tornar o dataset um único vetor de dados, podemos unir todos os sons como um objeto de texto. 

In [None]:
todas_musicas = "\n\n".join(musicas)

# ***Processamento dos dados***

O processo de treinamento e previsão da RNN exige uma sequência de dados, para que então a rede preveja novos elementos no fim da sequência. Baseado nisso, precisamos transformar os dados em um vetor e ter um meio prático de mapear os caracteres em números e vice-versa (redes neurais treinam e operam apenas com números). Para tal, podemos observar os caracteres exclusivos.

In [None]:
vocabulario = sorted(set(todas_musicas))
id2char = np.array(vocabulario)
char2id = {char:id for id, char in enumerate(vocabulario)}

print(f"{len(vocabulario)} caracteres exclusivos listados abaixo:")
for key, item in char2id.items():
  print(f"({repr(key)}:{item})", end="   ")

Agora podemos simplesmente mapear o vetor de caracteres para um vetor numérico.

In [None]:
todas_musicas_num = np.array(list(map(lambda char: char2id[char], todas_musicas)))
print(todas_musicas_num[:20])

Feito isso, precisamos nos lembrar de que a rede terá 'n' inputs, o que significa que o modelo processa 'n' caracteres cada vez, uma sequência. Portanto, precisamos agrupar os dados de treino em partes de comprimento 'n', onde o input será amostrado aleatoriamente enquanto o output será esta mesma sequência, porém deslocado um caractere para a direita. Vejamos um exemplo:

*   Se o comprimento desejado for 5:  

        'qwerty' -> input='qwert', output='werty'

Baseado nisto, podemos criar uma função extratora de lotes (batches), a qual recebe um tamanho de input e um tamanho de lote (batch) para retornar um conjunto de amostras aleatoriamente extraídas.

In [None]:
def extrai_batch(dataset, tamanho_input, tamanho_batch):
  # o ultimo indice valido nos dados
  n = dataset.shape[0] - 1
  # indices extraidos aleatoriamente
  iea = np.random.choice(n-tamanho_input, tamanho_batch)

  # para cada indice, extraimos o input e o output
  input_batch = np.array([dataset[i:i+tamanho_input] for i in iea])
  output_batch = np.array([dataset[i+1:i+1+tamanho_input] for i in iea])

  # para garantir o formato dos batches
  x_batch = np.reshape(input_batch, [tamanho_batch, tamanho_input])
  y_batch = np.reshape(output_batch, [tamanho_batch, tamanho_input])

  return x_batch, y_batch

# ***O modelo propriamente dito***

Com os dados apropriados e a função de extração de lotes, estamos prontos para criar nosso modelo. Nesse projeto em específico, a rede será composta por uma camada de incorporação (embedding layer), uma camada LSTM, e uma camada densa:


*   **tf.keras.layers.Embedding:** A camada de incorporação (embedding layer) consiste em mapear os inputs (até então sendo cada caractere representado por um número inteiro) para um vetor numérico de valores reais, similar ao método [one-hot encoding](https://towardsdatascience.com/categorical-encoding-using-label-encoding-and-one-hot-encoder-911ef77fb5bd), porém, garantindo que entradas similares sejam representadas por vetores similares, agregando a "semântica" das notas ao entendimento da rede. Isto é feito através do processo de aprendizado dos pesos da camada, isto é, a camada atua como uma tabela de pesquisa treinável.
*   **tf.keras.layers.LSTM:**  LSTM - Long Short Term Memory - é uma variação de rede neural recorrente a qual lida muito bem com assimilar as relações entre elementos em ordem e distância diferentes na sequência de dados.
*   **tf.keras.layers.Dense:** A camada densa compactará os outputs da camada LSTM no comprimento do nosso vocabulário, sendo as saídas desta, a probabilidade log não normalizada para cada categoria, ou seja, quanto maior o número da saída, mais a rede "pensa" que o caractere correspondente deveria ser o próximo na sequência - a previsão em si. A ativação softmax não será necessária para converter os valores em sua probabilidade normalizada, uma vez que a função utilizada mais tarde para extrair os caracteres dos outputs lida com os valores crus retirados da rede.

Por simplicidade e melhor entendimento, projetaremos o modelo utilizando **tf.keras.Sequential**:

In [None]:
def cria_modelo(tamanho_vocab: int, embedding_dim: int, nos_rnn: int, tamanho_batch: int) -> tf.keras.Sequential:
  """
  Retorna o modelo de tres camadas.

  Inputs:
    tamanho_vocab: A dimensao do vocabulario, esto é, quantos caracteres estao sendo considerados.

    embedding_dim: A dimensão da camada de incorporacao.

    nos_rnn: A dimensao da camada LSTM.

    tamanho_batch: O tamanho de cada lote (batch).
  
  Returno:
    modelo: um modelo de tres camadas para problemas sequenciais.
  """

  modelo = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=tamanho_vocab, output_dim=embedding_dim, batch_input_shape=[tamanho_batch, None]),
    tf.keras.layers.LSTM(units=nos_rnn, return_sequences=True, recurrent_initializer='glorot_uniform', recurrent_activation='sigmoid', stateful=True),
    tf.keras.layers.Dense(units=tamanho_vocab)
    ])

  return modelo

Podemos então instanciar o modelo e observar sua estrutura.

In [None]:
modelo = cria_modelo(len(vocabulario), embedding_dim=256, nos_rnn=1024, tamanho_batch=32)
modelo.summary()

Podemos passar uma sequência qualquer pela rede, antes mesmo de treiná-la, apenas para observar as dimensionalidades.

In [None]:
# Gera um batch de 32 sequencias de 100 caracteres cada
x, y = extrai_batch(todas_musicas_num, tamanho_input=100, tamanho_batch=32)

# alimenta a rede com os dados (feedforward)
predicao_sem_treino = modelo(x)

# Agora podemos inspecionar o formato do input (x) e do output (predicao_sem_treino)
print("Estamos usando um batch de 32 sequencias de 100 caracteres cada:")
print(f"Forma do Input: {x.shape}")
print(f"Forma do Output: {predicao_sem_treino.shape}")

Portanto, para cada input, existem 83 outputs: uma distribuição de probabilidade log não normalizada sob os possíveis 83 caracteres, onde o maior número pode ser tomado como a previsão da rede. Esta abordagem simples (pegar o argmax) pode levar a um loop, logo é mais adequado amostrar da distribuição:

In [None]:
previsao_amostrada = tf.random.categorical(predicao_sem_treino[0], num_samples=1)  # Retorna um tensor com formato (tamanho_input, 1)
previsao_amostrada = tf.squeeze(previsao_amostrada)  # então, podemos comprimi-lo em um vetor simples
print(previsao_amostrada)

Podemos usar nosso decodificador 'id2char' para gerar os caracteres de fato:

In [None]:
print(repr("".join(id2char[previsao_amostrada])))

Como os pesos da rede são aleatoriamente inicializados, o output não faz muito sentido, mas já podemos atestar que toda a estrutura do problema está definida e funcionando, então vamos partir para o treinamento.

# ***Faça a rede inteligente: treine-a***

O processo de treinamento requer uma função de perda (loss function) para informar ao modelo "o quanto ele está errando". Para esta tarefa, podemos usar a função *sparse_categorical_crossentropy*, que lida com alvos inteiros (categorias, de 0 a 82).

Vamos construir a função de perda para posterior otimização com seu uso:



*   ***Labels:*** Classes verdadeiras referentes ao input atual.
*   ***Logits:*** Distribuição de probabilidade log não normalizada nas classes referentes ao input atual.

In [None]:
def calcula_perda(labels, logits):
  loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=logits, from_logits=True)
  return loss

A função de perda retorna um vetor de perdas, do qual a média significa o custo total. Podemos obter esse custo em nosso modelo ainda sem treinamento.

In [None]:
custo_total_sem_treino = calcula_perda(y, predicao_sem_treino).numpy().mean()
print(f"Custo total do batch no modelo ainda sem treinamento: {custo_total_sem_treino}")

### ***Hiperparâmetros (Hyperparameters)***

Além da função de perda, existem muitos hiperparâmetros para serem especificados. Abaixo escolhemos alguns valores coerentes, mas sinta-se livre para tentar outros (de fato, para realmente entender sua influência, você deveria testar vários valores diferentes).

&rarr; O processo de otimização requer um critério de parada, que pode ser algo automático (uma perda mínima para a parada do processo), mas em nosso caso, definiremos o número de iterações.

In [None]:
iteracoes_treino = 2000  # aumente para obter um modelo mais refinado

&rarr; A função de extração de lotes requer o tamanho do lote e do input. Quanto maiores estes parâmetros, maior será o custo computacional.

In [None]:
tamanho_batch = 4
tamanho_input = 100

&rarr; A taxa de aprendizado é um parâmetro crítico:

*   Caso seja muito pequena, a otimização pode ficar presa num ótimo local ruim, levando a resultados do modelo ruins.
*   Caso seja muito grande, pode não atingir o ponto de ótimo, ou pode até divergir o processo de otimização.



In [None]:
taxa_aprendizado = 5e-3

&rarr; Usaremos as mesmas dimensões das camadas usadas anteriormente. Esses valores são arbitrários e escolhidos empiricamente: lembre-se de que a arquitetura do modelo não possui regra fixa alguma. Portanto, modelamos pela experiência (sempre respeitando os limites computacionais).

In [None]:
tamanho_vocab = len(vocabulario)
embedding_dim = 256 
nos_rnn = 1024

# instantiate the model
modelo = cria_modelo(tamanho_vocab=tamanho_vocab, 
                  embedding_dim=embedding_dim, 
                  nos_rnn=nos_rnn, 
                  tamanho_batch=tamanho_batch)

Finalmente, podemos definir nosso otimizador, o algoritmo que irá aplicar os gradientes na otimização dos pesos da rede. Alguns otimizadores bons e amplamente utilizados são o [Adam](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Adam) e o [Adagrad](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Adagrad), então vá em frente e tente ambos ou até mesmo os demais disponíveis (para a lista completa visite o [site do TensorFlow](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/)).

In [None]:
otimizador = tf.keras.optimizers.Adam(taxa_aprendizado)
# otimizador = tf.keras.optimizers.Adagrad(taxa_aprendizado)

# ***A rotina de treinamento***

O processo de treinamento será baseado em [GradientTape](https://www.tensorflow.org/api_docs/python/tf/GradientTape), um método de gravação dos gradientes: ao registrar todas as operações de avanço na rede (feedforward - computações que partem do input ao output), podemos calcular os gradientes com a perda na iteração e os pesos atuais. Com este último passo, podemos começar a treinar nossa rede.

**Obs**: Podemos acessar os pesos treináveis da rede através de `modelo.trainable_variables`.

In [None]:
@tf.function
def itera_treino(x, y): 
  with tf.GradientTape() as tape:
    y_pred = modelo(x)
    loss = calcula_perda(y, y_pred)

  gradientes = tape.gradient(loss, modelo.trainable_variables)
  otimizador.apply_gradients(zip(gradientes, modelo.trainable_variables))

  return loss

# ***Treinando o modelo***

Após preparar o campo, o processo de treinamento é simples:

1.   Extraia um lote (batch);
2.   Calcule a perda (dentro deste passo já está a atualização dos pesos);
3.   Adicione a informação da perda à lista de histórico para que a convergência seja posteriormente analisada;

In [None]:
historico = []

# Garantindo que o modulo tqdm funcionará adequadamente no notebook
if hasattr(tqdm, '_instances'): tqdm._instances.clear() # limpar as instancias caso existam

# Loop de treinamento
for iter in tqdm(range(iteracoes_treino)):
  x_batch, y_batch = extrai_batch(todas_musicas_num, tamanho_input, tamanho_batch)
  loss = itera_treino(x_batch, y_batch)
  historico.append(loss.numpy().mean())

Podemos mostrar graficamente o processo de convergência simplesmente plotando o vetor de histórico gerado:

In [None]:
# Criando a média móvel do custo total
passo = 20
media_movel = np.array([np.array(historico[i-passo:i]).mean() for i in range(passo, len(historico))])
mm_x = [i for i in range(passo, len(historico))]
string = f"O custo alcança aproximadamente {media_movel[-1]:.2f}"


fig, ax = plt.subplots(figsize=(10, 7))
ax.plot(historico, 'b-', lw=1)
ax.plot(mm_x, media_movel, 'r-', lw=2)
ax.set_xlabel('Iterações')
ax.set_ylabel('Custo total')
ax.text(.40, .7, string, transform=ax.transAxes, bbox=dict(facecolor='white', alpha=0), fontsize=15)
plt.show();

# ***Preparação do modelo***

Como a camada de incorporação (embedding layer) é construída com a informação do tamanho de lote, precisamos reconstruir o modelo com um novo `tamanho_lote` de um, devido a utilizarmos apenas um único input para iniciarmos as previsões. Ao reconstruir a rede, todos os parâmetros são reinicializados, e para não perdermos nosso progresso, podemos salvar os pesos para os carregarmos no modelo novo, o qual estará pronto para produção.

In [None]:
# salvando os pesos
prefixo_checkpoint = os.path.join('./training_checkpoints', "my_ckpt")
modelo.save_weights(prefixo_checkpoint)

# reconstrua o modelo
modelo = cria_modelo(tamanho_vocab=tamanho_vocab, 
                    embedding_dim=embedding_dim, 
                    nos_rnn=nos_rnn, 
                    tamanho_batch=1)

# carrega os pesos
modelo.load_weights(tf.train.latest_checkpoint('./training_checkpoints'))
modelo.build(tf.TensorShape([1, None]))

Vejamos se mantivemos nosso modelo.

In [None]:
modelo.summary()

# ***Previsão***

Nosso modelo está pronto para produção, logo podemos submeter um input inicial e extrair quantas previsões desejarmos com a mesma metodologia: amostrar sob a distribuição de probabilidade das categorias do output. Note que a primeira predição será o input para a seguna predição e assim por diante. O loop completo será:

1.   Escolha uma string para começar.
2.   Submeta ao modelo e obtenha sua predição.
3.   Guarde sua predição.
4.   Use a última predição como input.

O estado interno da rede é atualizado todo loop, construindo peça a peça uma música nova.

In [None]:
def gera_musicas(modelo, string_inicial, comprimento_predicao):
  input = [char2id[s] for s in string_inicial]
  input = tf.expand_dims(input, 0)
  novos_sons = []

  # Reinicia o estado interno do modelo e prepara o módulo tqdm
  modelo.reset_states()
  tqdm._instances.clear()

  for i in tqdm(range(comprimento_predicao)):

    # Extraia as previsoes
    predicoes = modelo(input)
    predicoes = tf.squeeze(predicoes, 0)
    categoria_previsao = tf.random.categorical(predicoes, num_samples=1)[-1,0].numpy()
      
    # Atualize o input atual
    input = tf.expand_dims([categoria_previsao], 0)
      
    # Adicione o novo caractere
    novos_sons.append(id2char[categoria_previsao])
    
  return (string_inicial + ''.join(novos_sons))

Nossa rede artista está pronta, deixe-a compor:

<p align="center">
<img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/musician.gif">
</p>

In [None]:
texto_gerado = gera_musicas(modelo=modelo, string_inicial="X", comprimento_predicao=1000)

Agora precisamos olhar para padrões que indiquem uma música no texto da saída, o qual pode ser facilmente feito com [expressões regulares](https://docs.python.org/3/library/re.html). Isso é o que a função `mdl.lab1.extract_song_snippet(texto)` faz internamente.

Após a extração, podemos tocar, apreciar e até baixar as músicas:

<p align="center">
<img src="https://raw.githubusercontent.com/RodrigoMarquesP/Music_Generation_with_Recurrent_Neural_Networks-An_AI_Composer/master/files/party_time.gif">
</p>

In [None]:
sons_gerados = mdl.lab1.extract_song_snippet(texto_gerado)

for i, som in enumerate(sons_gerados): 
  waveform = mdl.lab1.play_song(som)

  # Caso o som seja reconhecido, pode ser tocado
  if waveform:
    print("Som ", i)
    ipythondisplay.display(waveform)