<a href="https://colab.research.google.com/github/BZoennchen/musical-interrogation/blob/main/partIII/melody_rnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Die folgenden 6 Zellen sind für die Ausführung im Colab nötig.

In [None]:
#@title clone git repository
%%capture
!rm -rf musical-interrogation
!git clone https://github.com/BZoennchen/musical-interrogation.git

In [None]:
#@title move into directory
%%capture
import zipfile
import os
os.chdir('musical-interrogation/partIII')

In [None]:
#@title install dependencies to play sound
%%capture
print('installing fluidsynth...')
!apt-get install fluidsynth > /dev/null
!cp /usr/share/sounds/sf2/FluidR3_GM.sf2 ./font.sf2
print('done!')

In [None]:
#@title install dependencies to show score in music notation
%%capture
print('installing musescore3...')
!apt-get install musescore3 > /dev/null
print('done!')

In [None]:
#@title install python libs
%%capture
!pip install torch torchview music21 matplotlib fluidsynth midi2audio

In [None]:
#@title unzip dataset
%%capture
with zipfile.ZipFile('./../data/erk.zip', 'r') as zip_ref:
    zip_ref.extractall('./../deutschl/')

# Recurrent Neural Network (RNN) / LSTM

*RNNs* erlauben es uns auf Grundlage einer belieb langen endlichen Sequenz, das nächste Element der Sequenz vorherzusagen. Wie gut diese Vorhersage ist, ist eine andere Frage.

Anstatt lediglich direkte Übergänge zu berücksichtigen wollen wir Sequenzen der Länge ``sequence_len`` beim Training betrachten. Das bedeutet wir müssen bei der Datenvorbereitung beachten, dass wir derartige Sequenzen aus den Daten konstruieren. Zwar kann unser trainiertes RNN auch längere Sequenzen generieren, allerdings hat es derartige Sequenzen nie zuvor gesehen. Deshalb wird die Qualität abnehmen.

Desweiteren werden wir diesmal anstatt Noten, Events die alle die gleiche Dauer haben (einen Zeitschritt) betrachten.
Eine Note wird durch ein **X-NoteOn**-Event und der darauffolgenden **NoteHold**-Event repräsentiert. Z.B.

``65 _ _ _ 77 _ r _ _ ``

Würde bedeuten, dass das Stück mit der MIDI-Note 65 (**65-NoteOn**-Event) beginnt.
Die Note wird über 3 weitere Zeitschritte gehalten (also insgesamt 4), dann folgt ein **77-NoteOn**-Event, welches über einen weiteren Zeitschritt gehalten wird und schließlich folgt eine Pause von insgesamt 3 Zeitschritten.

Anstatt des ``NoteEncoder`` verwenden wir deshalb den ``GridEncoder``.

In [None]:
import sys
import os
sys.path.append("..") 

import matplotlib.pyplot as plt
from torchview import draw_graph

import music21 as m21
from datetime import datetime

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

from preprocess import load_songs_in_kern, GridEncoder, StringToIntEncoder
from preprocess import TERM_SYMBOL, TIME_STEP
from dataset import ScoreDataset

from utils import score_to_wav
from IPython.display import Audio

torch.manual_seed(0);

# 1. Datenvorbereitung

Zunächst laden wir unsere (1700 **einstimmigen**) Musikstücke aus denen wir die Frequenzen berechnen wollen.

In [None]:
# this takes a while
scores = load_songs_in_kern('./../deutschl/erk')

Wir können uns eines der Musikstücke anhören oder auch anzeigen lassen.

In [None]:
Audio(score_to_wav(scores[0], 'score1.wav'))

In [None]:
scores[0].show()

Als nächstes verwandeln wir die Noten in gut leserliche Zeichenketten, wobei jedes Event durch genau eine Zeichenkette repräsentiert wird (siehe oben).

Dies übernimmt der ``GridEncoder``. Dieser transponiert die Musikstücke zusätzlich nach C-Dur.
Dieser filtert zugleich Musikstücke heraus, welche wir mit unserem ``time_step`` nicht abbilden können.
Z.B. wenn ``time_step = 1/8`` dann können wir keine ``1/16``-Noten oder auch ``1/8 + 1/16``-Noten abbilden. 

In [None]:
# this takes a while
time_step = 1/16
print(f'one timestep represents {time_step} beats')

encoder = GridEncoder(time_step)
enc_songs, invalid_song_indices = encoder.encode_songs(scores)

print(f'there are {len(enc_songs)} valid songs and {len(invalid_song_indices)} songs')

Beispielsweise lässt sich mit ``time_step = 1/16`` folgendes Musikstück nicht abbilden:

In [None]:
scores[invalid_song_indices[0]].show()

Wir können ein Musikstück in der codierten Form ausgeben:

In [None]:
' '.join(enc_songs[0])

In [None]:
print(f'longest melody: {max(len(m) for m in enc_songs)}')
print(f'shortest melody: {min(len(m) for m in enc_songs)}')

Da der Computer besser mit Zahlen umgehen kann bauen wir uns eine Abbildung von den jeweiligen Zeichenketten zu Zahlen $$\{0, 1, 2, \ldots, m-1\}$$ und umgekehrt. Dies übernimmt ``StringToIntEncoder``:

In [None]:
string_to_int = StringToIntEncoder(enc_songs)
print(f'number of unique symbols: {len(string_to_int)}')

In [None]:
encoded_symbol = string_to_int.encode(enc_songs[0][0])
print(f'midi-ptich {enc_songs[0][0]} is encoded to number {encoded_symbol}')
print(f'number {encoded_symbol} is decoded to midi-pitch {string_to_int.decode(encoded_symbol)}')

# 2. Konstruktion der Trainingsdaten

``ScoreDataset`` verwaltet unsere Daten und lässt uns, in Kombination mit einem ``DataLoader``, bequem Sequenzen (d.h. Teile eines Stücks) der Länge ``sequence_len`` (Zeitschritte) laden

In [None]:
sequence_len = 64 # this is a hyperparameter!
dataset = ScoreDataset(enc_songs=enc_songs, stoi_encoder=string_to_int, sequence_len=sequence_len)

``sequence_len * time_step`` ergibt die Anzahl der Beats die wir beim Lernen betrachten (ist im Fall einer 4/4 Signatur ``sequence_len * (time_step/0.25)``)

In [None]:
print(f'while training we are looking at {sequence_len * (time_step/0.25)} beats')

Wir teilen die Daten nun in Trainings-, Validierungs-, und Testdaten auf.

+ Trainingsdaten: Verwenden wir zum Training unseres Modells / Melodiegenerators
+ Validierungsdaten: Verwenden wir um unseren Lernerfolg während des Trainings zu vergleichen
+ Testdaten: Verwenden wir am Ende des Trainings

In [None]:
train_set, val_set, test_set = torch.utils.data.random_split(dataset, [0.8, 0.1, 0.1])

# 3. Modelldefinition

Im folgenden sehen wir alle wichtigen sog. ``Hyperparameter``, d.h. Parameter die wir eventuell noch anpassen wollen um ein besseres Ergebnis zu erhalten.

In [None]:
##### start hyperparameters #####
vocab_size = len(string_to_int) # size of our alphabet
input_dim = vocab_size # can be different
hidden_dim = 128 # can be different
layer_dim = 1 # can be different
output_dim = vocab_size # should not be different
dropout = 0.2 # can be different

criterion = torch.nn.CrossEntropyLoss()

learning_rate = 0.001 # can be different
batch_size = 64 # can be different
n_epochs = 10 # can be different
eval_interval = 100 # can be different

if torch.cuda.is_available():
    device = torch.device('cuda')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
    #torch.backends.mps.empty_cache()
else:
    device = torch.device('cpu')

##### end hyperparameters #####

print(f'{device=}')

Es folgt die Modellbeschreibung unseres RNNs/LSTMs. 
Um zu verstehen was vorsich geht, blicken Sie auf die Methode ``forward``.
Diese schickt unsere Daten durch das Netz.

Die ersten beiden Zeilen erstellen die Kurz- und Langzeitspeicher und füllen diese mit lauter nullen.

Dann findet ein sog. Embedding statt: ``x = self.embedding(x)``. Dies ist nichts anderes als das was wir mit unserem einfachen *Feedforward Net* gemacht haben: Jedes Element der Eingabe ``x`` wird erst *one-hot* encoded und dann an eine Matrix multipliziert. Das Resultat: Jedes Event wird durch die Zeile einer Matrix repräsentiert. Die Matrix besitzt ``vocab_size`` Zeilen und ``input_dim`` Spalten.

Als nächstes schicken wir unsere umgewandelte Eingabe durch unsere LSTM ``out, (ht, ct) = self.lstm(x, (h0, c0))``.
Wir erhalten so viele Ausgaben wie unsere Sequenz lang ist, d.h. ``sequence_len`` viele.
Wir interessieren uns aber nur für die letzte Ausgabe, die wir uns mit ``out[:, -1, :]`` holen.
Dies ist ein Vektor mit ``hidden_dim`` Elementen. ``ht`` und ``ct`` brauchen wir nicht.

Dann schicken wir diese durch eine Dropout Schicht um der Überanpassung entgegenzuwirken.

Im letzten Schritt transformieren wir den ``hidden_dim``-dimensionalen Vektor in einen ``output_dim``-dimensionalen Vektor, was wirderum gleich ``vocab_size`` ist.

Dieser Vektor wird als Wahrscheinlichkeitsverteilung interpretiert.

In [None]:
class LSTMModel(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout=0.2):
        super(LSTMModel, self).__init__()

        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        
        self.embedding = torch.nn.Embedding(vocab_size, input_dim)
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.dropout = torch.nn.Dropout(dropout)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=device)
        
        # x = B, T, C
        x = self.embedding(x)
        
        out, (ht, ct) = self.lstm(x, (h0, c0))
        out = self.dropout(out[:, -1, :])
        out = self.fc(out)
        return out # B, C

In [None]:
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim, dropout)
model.to(device)  # use gpu if possible

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for i in range(len(list(model.parameters()))):
    print(list(model.parameters())[i].shape)

Die folgende Zelle dient lediglich der Visualisierung unseres Modells und hat keine Auswirkung auf die Berechnung.
Wir holen uns einen **Batch** von Sequenzen (der Länge ``sequenz_len``)  und das jeweils zugehörige **Label** (das auf die Sequenz folgende Event).

``X_vis`` ist ein **Tensor** mit **Shape** 

$$\text{batch_size} \times \text{sequence_length}$$

geschrieben als ``(batch_size, sequence_length)``, hier ``(64, 32)``. ``X_vis`` jagen wir durch unser Netzwerk/Modell um es anschließend Anzeigen zu können.

Die Visualisierung ist nützlich um zu sehen wie der Eingabe-**Tensor** verändert wird.
Dabei sind **Batches** Anfangs etwas verwirrend.
Wir können uns auch die erste Dimension, d.h., ``batch_size`` "wegdenken".
Das ``Embedding`` transformiert unsere Sequenz in 

$$\text{sequence_length} \times \text{input_dim}$$

Hier findet impliziet ein *one-hot* Encoding statt in Kombination mit einer Matrixmultiplikation, d.h., genau das was wir in unserem **Feedforward Net** gemacht hatten.
Jedes Symbol wird durch einen Vektor der länge "Anzahl an unterschiedlicher Symbole" repräsentiert.

Anschließend wird der **Tensor** durch das eigentliche LSTM gejagt.
Neben diesem Tensor erwartet das LSTM noch einen Initialtensor für die **Hiddensates** $h_0$ und $c_0$.
Das Resultat ist ein neuer Output-Tensor:

$$\text{sequence_length} \times \text{hidden_dim}$$

und die beiden letzten **Hiddenstates** $h_{\text{k}-1}$, $c_{\text{k}-1}$ mit $k$ = ``sequence_length`` die jeweils ``hidden_dim`` Zahlen enthalten.

Da wir nur am letzten Output interessiert sind, d.h., genau das Symbol was uns das LSTM vorhersagt, holen wir uns mit ``__getitem__`` dieses Symbol codiert durch ``hidden_dim`` Zahlen.

``Dropout`` dient dem Entgegenwirken der **Überanpassung**. Dabei werden beim Training zufällig bestimmte künstliche Neuronen deaktiviert.

Man müsste denken, dass wir im letzten Schritt aus den ``hidden_dim`` Zahlen das entsprechende Symbol, also eine Zahl, erzeugen müssen. Was jedoch nützlicher ist, ist es eine Wahrscheinlichkeitsverteilung zu erhalten, d.h., einen Vektor mit ``vocab_size``Zahlen wobei jeder Zahl die Wahrscheinlichkeit für eben jenes Symbol widerspiegelt.
Genau das passiert!

In [None]:
# (batch_size, sequence_len)
X_vis, y_vis = train_set[0:batch_size]
print(f'shape of X_vis: {X_vis.shape}')
print(f'shape of y_vis: {y_vis.shape}')
print(f'number of different symbols {vocab_size}')
X_vis, y_vis = X_vis.to(device), y_vis.to(device)
model_vis = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim, dropout)
model_graph = draw_graph(model_vis, input_data=X_vis, device=device)
model_graph.visual_graph

# 4. Melodiegenerierung vor dem Training

Gegeben einer Sequenz beliebiger Länge, dient die Funktion ``generate`` der Generierung eines neues neuen Musikstücks.
``temperature`` bestimmt wie stark die vom Modell gelernte Wahrscheinlichkeitsverteilung beachtet wird.

+ ``temperature`` gleich 1.0 bedeutet, dass von der Wahrscheinlichkeitsverteilung gesampelt wird.
+ ``temperature`` gegen unendlich bedeutet, dass gleichverteilt gesampelt wird (mehr Variation)
+ ``temperature`` gegen 0 bedeutet, dass die hohe Wahrscheinlichkeiten verstärkt werden (weniger Variation)

Sie können eine maximale Länge des Stücks festlegen und auch einen Anfang eines Stücks mitliefern.

In [None]:
def next_event_number(idx, temperature:float):
    with torch.no_grad():
        logits = model(idx)
        probs = F.softmax(logits / temperature, dim=1) # B, C
        idx_next = torch.multinomial(probs, num_samples=1)
        return idx_next

In [None]:
def generate(seq: list[str]=None, max_len:int=None, temperature:float=1.0):
    with torch.no_grad():
        generated_encoded_song = []
        if seq != None:
            idx = torch.tensor([[string_to_int.encode(char) for char in seq]], device=device)
            generated_encoded_song = seq.copy()
        else:
            idx = torch.tensor([[string_to_int.encode(TERM_SYMBOL)]], device=device)
        
        while max_len == None or max_len > len(generated_encoded_song):
            idx_next = next_event_number(idx, temperature)
            char = string_to_int.decode(idx_next.item())
            if idx_next == string_to_int.encode(TERM_SYMBOL):
                break
            idx = torch.cat((idx, idx_next), dim=1) # B, T+1, C
            generated_encoded_song.append(char)
            
        return generated_encoded_song

Wenn wir neue Musikstücke generieren bevor das RNN trainiert wurde, dann erhalten wir keine guten Ergebnisse:

In [None]:
# number of songs we want to generate
n_scores = 5
temperature = 0.6
before_new_songs = []
for _ in range(n_scores):
    encoded_song = generate(max_len=13,temperature=temperature)
    print(f'generated {" ".join(encoded_song)} conisting of {len(encoded_song)} notes')
    before_new_songs.append(encoded_song)

In [None]:
before_generated_scores = encoder.decode_songs(before_new_songs)

In [None]:
before_generated_scores[0].show()

In [None]:
Audio(score_to_wav(before_generated_scores[2], 'before_g_song.wav'))

# 5. Training

Zum Training verwenden wir hier einen sog. ``DataLoader``. Dieser hilft uns dabei auf unsere Daten einfacher zugreifen zu können. Z.B., lassen wir unsere Daten vor dem Training durchmischen.

In [None]:
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size,shuffle=True)

In [None]:
print(f'there are {len(train_set)} data points for training')
print(f'there are {len(val_set)} data points for validation')
print(f'there are {len(test_set)} data points for testing')

Der Code für das Training wirkt etwas kompliziert, da wir **Batches** verwenden, da wir es mit sehr vielen Daten zu tun haben und diese nicht immer alle (pro Trainingsschritt) durchs Netz schicken sondern immer nur einen Teil, nämlich ``batch_size`` viele. Eine **Epoche** ist dadruch definiert, dass in ihr alle Trainingsdaten einmal durchs Netz geschickt wurden.

Im Wesentlichen geschieht nichts anderes als:

1. Schicke **Batch** durchs netz (Forwardpass)
2. Berechne Fehler/Kosten
3. Propagiere Gradienten der Kostenfunktion bzgl. der **Modellparameter** rückwärts durchs Netz (Backwardpass)
4. Update Modellparameter

In [None]:
def train_one_epoch(epoch_index, tb_writer, n_epochs):
    running_loss = 0.0
    last_loss = 0.0
    all_steps = n_epochs * len(train_loader)
    
    for i, data in enumerate(train_loader):
        local_X, local_y = data
        local_X, local_y = local_X.to(device), local_y.to(device)
        optimizer.zero_grad()
        outputs = model(local_X)
        
        loss = criterion(outputs, local_y)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        if i % eval_interval == eval_interval-1:
            last_loss = running_loss / eval_interval
            
            steps = epoch_index * len(train_loader) + (i+1)
            
            print(
                f'Epoch [{epoch_index+1}/{n_epochs}], Step [{steps}/{all_steps}], Loss: {last_loss:.4f}')
            tb_x = epoch_index * len(train_loader) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.
            
    return last_loss


In [None]:
# Initializing in a separate cell so we can easily add more epochs to the same run
def train(n_epochs,respect_val=False):
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
    best_vloss = 1_000_000

    for epoch in range(n_epochs):    
        model.train(True)
        avg_loss = train_one_epoch(epoch, writer, n_epochs)
        
        model.train(False)
        running_vloss = 0.0
        
        for i, vdata in enumerate(val_loader):
            
            local_X, local_y = vdata
            local_X, local_y = local_X.to(device), local_y.to(device)
            
            voutputs = model(local_X)
            vloss = criterion(voutputs, local_y)
            running_vloss += vloss
            
        avg_vloss = running_vloss / (i+1)
        print(f'Epoch [{epoch+1}/{n_epochs}], Train-Loss: {avg_loss:.4f}, Val-Loss: {avg_vloss:.4f}')
        
        writer.add_scalars('Training vs. Validation Loss', {'Training': avg_loss, 'Validation': avg_vloss}, epoch)
        writer.flush()
        
        if not respect_val or (respect_val and avg_vloss < best_vloss):
            best_vloss = avg_vloss
            model_path = './models/_model_{}_{}'.format(timestamp, epoch)
            torch.save(model.state_dict(), model_path)

In [None]:
train(n_epochs)

In [None]:
print(f'there are the following models to choose from:')

for model_file in os.listdir('./models/'):
    print(f'./models/{model_file}')

Das beste Modell des Trainings findet ihr im Ordner ``musical-interrogation/rnn/`` und könnt ihr wie folgt laden:

In [None]:
# loads a saved model
model_path = './models/pretrained_1_128_best_val'

if device.type == 'cpu':
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
elif torch.backends.mps.is_available():
    model.load_state_dict(torch.load(model_path, map_location=torch.device('mps')))
else:
    model.load_state_dict(torch.load(model_path))
model.eval()



# 6. Melodiegenerierung nach dem Training

Nach dem Training sollten die generierten Stücke eine bessere Qualität besitzen. Das bedeutet jedoch lediglich, dass die *Likelihood* besser ist und sagt noch nichts darüber aus ob die Stücke auch musikalisch "besser" sind.

In [None]:
# number of songs we want to generate
n_scores = 5
temperature = 0.6
after_new_songs = []
for _ in range(n_scores):
    encoded_song = generate(max_len=120,temperature=temperature)
    print(f'generated {" ".join(encoded_song)} conisting of {len(encoded_song)} notes')
    after_new_songs.append(encoded_song)

In [None]:
after_generated_scores = encoder.decode_songs(after_new_songs)

In [None]:
after_generated_scores[2].show()

In [None]:
Audio(score_to_wav(after_generated_scores[0], 'a_g_song.wav'))

Wir können auch einen Teil bestehendes Musikstücks verwenden und diesen erweitern:

In [None]:
' '.join(enc_songs[0])

In [None]:
n_notes = 10
part = encoder.take_notes(enc_songs[0], n_notes)
' '.join(part)

In [None]:
Audio(score_to_wav(encoder.decode_song(part), 'part.wav'))

In [None]:
enc_song = generate(part, max_len=120)
' '.join(enc_song)

In [None]:
song = encoder.decode_song(enc_song)

In [None]:
song.show()

In [None]:
Audio(score_to_wav(song, 'g_song.wav'))

# Fragen

+ Welches Problem fangen wir uns ein wenn wir Noten mit einer anderen Dauer unterstützen wollen?
+ Könnten wir eventuell nicht unterstützte Notenlängen einfach umwandeln?
+ Wie erweitern wir unser System sodass wir Vielstimmigkeit lernen und generieren können und was würde das für das Training bedeuten?
+ Was bedeutet *Überanpassung* (Overfitting) im Kontext unseres Problems? Kann diese wünschenswert sein?