# Recurrent learning of patterns via RNN

After we have learned to predict the next note via a neural network, the model still has several shortcomings:

* it cannot understand and predict patterns that span several notes
* it has no sense of time
* it is not aware of the velocity/amplitude of a note



In [None]:
import lightning as L
import numpy as np
import torch
from ki_ueben.datasets import Maestro3Dataset
from ki_ueben.midi import PianoRoll
from torch import nn, optim

Start by loading the dataset.

In [None]:
dataset = Maestro3Dataset()

In [None]:
dataset_record = dataset[10]
dataset_record

{'file_path': PosixPath('/Users/scheiba/github/ki-ueben-klavier-trainieren/book/chapters/data/maestro-v3.0.0/maestro-v3.0.0/2013/ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_11_R1_2013_wav--1.midi')}

Extract the events within a MIDI file through the `PianoRoll` class.

In [None]:
piano_roll = PianoRoll(dataset_record["file_path"])
piano_roll

PianoRoll(/Users/scheiba/github/ki-ueben-klavier-trainieren/book/chapters/data/maestro-v3.0.0/maestro-v3.0.0/2013/ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_11_R1_2013_wav--1.midi)

In [None]:
df_events = piano_roll.events()
df_events

Unnamed: 0,note,velocity,time
0,-1,0,0.000000
1,51,49,0.980469
2,63,57,0.997396
3,51,0,1.087240
4,50,64,1.093750
...,...,...,...
7301,-1,69,389.126302
7302,-1,64,389.145833
7303,-1,55,389.164062
7304,-1,37,389.184896


In [None]:
from typing import Callable

import pandas as pd


class MidiVectorTransform:
    """Encode a group of MIDI events happening at the same time as one vector.

    The events are passed as a DataFrame which needs to have the columns

    * velocity
    * note
    * time_delta (time difference between consecutive events) - it is not
      necessary to quantize this; values are clipped to ``[0, 1]`` here

    The resulting vector is the concatenation of four sections, in this order:

    1. note-on   - ``num_midi`` entries, one per note with ``velocity > 0``
    2. note-off  - ``num_midi`` entries, one per note with ``velocity == 0``
    3. velocity  - ``velocity_steps`` entries, one-hot bin of the mean
       note-on velocity (all zeros if there is no note-on event)
    4. time      - ``time_steps`` entries, one-hot bin of the ``time_delta``
       of the first row (all zeros if there are no rows)

    Rows with ``note == -1`` are dropped before encoding.
    """

    def __init__(self, velocity_steps: int = 32, time_steps: int = 125, dtype=np.uint8):
        """Pre-compute the identity matrices used as one-hot lookup tables.

        Args:
            velocity_steps: Number of bins the velocity range is quantized to.
            time_steps: Number of bins the clipped ``time_delta`` is quantized to.
            dtype: NumPy dtype of the lookup tables and of the resulting vector.
        """
        self.dtype = dtype

        self.velocity_steps = velocity_steps
        self.velocity_mask = np.eye(self.velocity_steps, dtype=self.dtype)

        self.time_steps = time_steps
        self.time_mask = np.eye(self.time_steps, dtype=self.dtype)

        self.num_midi = 128
        self.midi_mask = np.eye(self.num_midi, dtype=self.dtype)

    @property
    def transform_size(self) -> int:
        """Length of the vector returned by ``__call__``."""
        return (2 * self.num_midi) + self.velocity_steps + self.time_steps

    def _events(self, df: pd.DataFrame, selector: Callable[[int], bool]) -> np.ndarray:
        """Sum the one-hot note vectors of all rows whose velocity is selected.

        A note that occurs several times among the selected rows is counted
        several times, i.e. the entry can become larger than 1.
        """
        events = np.zeros(self.num_midi, dtype=self.dtype)
        # `astype(bool)` keeps this a boolean mask even for an empty frame,
        # where `apply` returns an empty Series of the column's dtype.
        selected = df["velocity"].apply(selector).astype(bool)
        for note in df.loc[selected, "note"]:
            events = events + self.midi_mask[int(note)]
        return events

    def _events_on(self, df: pd.DataFrame) -> np.ndarray:
        """Note vector of all events with a velocity greater than zero."""
        return self._events(df, selector=lambda x: x > 0)

    def _events_off(self, df: pd.DataFrame) -> np.ndarray:
        """Note vector of all events with a velocity of exactly zero."""
        return self._events(df, selector=lambda x: x == 0)

    def _velocity(self, df: pd.DataFrame) -> np.ndarray:
        """One-hot bin of the mean velocity of all note-on events."""
        # todo: should this ignore 0 velocity at all?
        mean_velocity = df.loc[df["velocity"] > 0, "velocity"].mean()
        if pd.isna(mean_velocity):
            # no note-on event in this group
            return np.zeros(self.velocity_steps, dtype=self.dtype)

        quantized_velocity = int(np.floor(mean_velocity / 127 * self.velocity_steps))
        # A mean velocity of 127 maps to `velocity_steps`, which is one past
        # the last bin - clamp so the maximum velocity lands in the last bin.
        quantized_velocity = min(max(quantized_velocity, 0), self.velocity_steps - 1)
        return self.velocity_mask[quantized_velocity]

    def _time(self, df: pd.DataFrame) -> np.ndarray:
        """One-hot bin of the ``time_delta`` of the first row."""
        if len(df) == 0:
            return np.zeros(self.time_steps, dtype=self.dtype)

        # clip between 0 and 1, larger gaps all end up in the last bin
        time_diff = np.clip(df["time_delta"].iloc[0], 0.0, 1.0)
        # a delta of exactly 1.0 would map to `time_steps`, so cap the index
        time_index = min(int(np.floor(time_diff * self.time_steps)), self.time_steps - 1)
        return self.time_mask[time_index]

    def un_transform(self, sample):
        """Inverse of ``__call__`` - not implemented yet."""
        raise NotImplementedError()

    def __call__(self, df: pd.DataFrame) -> np.ndarray:
        """Encode the events in ``df`` as a vector of length ``transform_size``."""
        # filter out empty messages and pedal messages
        df = df[df["note"] != -1]
        return np.concatenate(
            [
                self._events_on(df),
                self._events_off(df),
                self._velocity(df),
                self._time(df),
            ],
            dtype=self.dtype,
        )


transformer = MidiVectorTransform()

In [None]:
df

Unnamed: 0,note,velocity,time,time_delta,quantized_time
0,-1,0,0.000000,0.980469,0.000
1,51,49,0.980469,0.016927,0.976
2,63,57,0.997396,0.089844,0.992
3,51,0,1.087240,0.006510,1.080
4,50,64,1.093750,0.071615,1.088
...,...,...,...,...,...
7301,-1,69,389.126302,0.019531,389.120
7302,-1,64,389.145833,0.018229,389.144
7303,-1,55,389.164062,0.020833,389.160
7304,-1,37,389.184896,0.019531,389.184


In [None]:
def prepare_df(df: pd.DataFrame, num_time_steps: int = 125) -> pd.DataFrame:
    """Add timing columns to an event DataFrame and drop non-note events.

    The passed DataFrame is left untouched; a new DataFrame is returned.

    Args:
        df: Events with at least the columns ``time`` and ``note``.
        num_time_steps: Resolution of the time grid, i.e. ``time`` is floored
            to multiples of ``1 / num_time_steps``.

    Returns:
        A DataFrame with the additional columns

        * ``time_delta`` - absolute difference of ``time`` to the *next* row
          (``1.0`` for the last row, which has no successor)
        * ``quantized_time`` - ``time`` floored to the time grid

        containing only rows with a non-negative ``note``.
    """
    # work on a copy so the caller's DataFrame does not silently gain columns
    df = df.copy()

    # The delta is computed before filtering, so it is the distance to the
    # next event of any kind, including the events that are removed below.
    df["time_delta"] = df["time"].diff(-1).abs().fillna(1.0)
    df["quantized_time"] = np.floor(df["time"] * num_time_steps) / num_time_steps

    # remove pedal and empty messages (note == -1) but keep MIDI note 0
    return df[df["note"] >= 0]


def df_quantized_iterator(df: pd.DataFrame):
    """Iterate over the events grouped by their ``quantized_time``.

    Yields ``(quantized_time, group)`` tuples exactly as produced by
    ``DataFrame.groupby``, where ``group`` holds all rows sharing that
    quantized time.
    """
    yield from df.groupby("quantized_time")

In [None]:
from typing import Dict

from tqdm import tqdm

# Maps the file name of a MIDI file to its matrix of event vectors
# (one row per quantized time step).
vectors: Dict[str, np.ndarray] = {}

for midi_path in tqdm(dataset):
    # `midi_path` is a dataset record; the actual path is stored under the
    # key "file_path". Use the record of the current iteration here, so the
    # data matches the key it is stored under below.
    piano_roll = PianoRoll(midi_path["file_path"])
    df = piano_roll.events()

    vector = [transformer(x) for _, x in df_quantized_iterator(prepare_df(df))]

    vectors[midi_path["file_path"].name] = np.array(vector)
    # Only the first file is processed for now; remove the `break` to
    # vectorize the whole dataset.
    break

  0%|          | 0/1276 [00:07<?, ?it/s]


In [None]:
vector = vectors["ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_2013_wav--1.midi"]

In [None]:
vector.shape

(3667, 413)

In [None]:
piano_roll

PianoRoll(/Users/scheiba/github/ki-ueben-klavier-trainieren/book/chapters/data/maestro-v3.0.0/maestro-v3.0.0/2013/ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_11_R1_2013_wav--1.midi)

In [None]:
vector[-1:, :].shape

(1, 413)

In [None]:
class LSTM(L.LightningModule):
    """LSTM which predicts the next event vector from a sequence of vectors.

    The last dimension of every input/output vector consists of four sections,
    in this order:

    1. note-on   (``midi_size`` entries)
    2. note-off  (``midi_size`` entries)
    3. velocity  (``velocity_steps`` entries)
    4. time      (``time_steps`` entries)

    ``note_on``, ``note_off``, ``velocity`` and ``time`` slice the respective
    section out of such a vector.
    """

    def __init__(
        self,
        hidden_size: int = 512,
        midi_size: int = 128,
        velocity_steps: int = 32,
        time_steps: int = 125,
    ):
        """Build the stacked LSTM and the linear output projection.

        Args:
            hidden_size: Size of the hidden state of each LSTM layer.
            midi_size: Number of entries of the note-on / note-off sections.
            velocity_steps: Number of entries of the velocity section.
            time_steps: Number of entries of the time section.
        """
        super().__init__()

        self.velocity_steps = velocity_steps
        self.time_steps = time_steps
        self.midi_size = midi_size

        self.input_dim = (2 * self.midi_size) + self.velocity_steps + self.time_steps
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size=self.input_dim,
            hidden_size=hidden_size,
            num_layers=3,
            dropout=0.3,
            batch_first=True,
        )
        # project the hidden state back to the size of an event vector
        self.linear = nn.Linear(
            in_features=self.hidden_size,
            out_features=self.input_dim,
        )

    def forward(self, batch):
        """Predict the vector following the sequences in ``batch``.

        ``batch`` is expected to be of shape ``(batch, sequence, input_dim)``
        as the LSTM is created with ``batch_first=True``. The result is of
        shape ``(batch, 1, input_dim)``.
        """
        lstm_out, _ = self.lstm(batch)
        # Only the output of the last sequence step is used. Slicing with
        # `-1:` instead of indexing with `-1` keeps the sequence dimension.
        lstm_out = lstm_out[:, -1:, :]
        return self.linear(lstm_out)

    # The section accessors index the last dimension only (`...`), so they
    # work for batched (batch, sequence, input_dim) tensors as well as for a
    # single vector of shape (input_dim,).

    def note_on(self, t: torch.Tensor) -> torch.Tensor:
        """Return the note-on section of ``t``."""
        return t[..., 0 : self.midi_size]

    def note_off(self, t: torch.Tensor) -> torch.Tensor:
        """Return the note-off section of ``t``."""
        return t[..., self.midi_size : self.midi_size * 2]

    def velocity(self, t: torch.Tensor) -> torch.Tensor:
        """Return the velocity section of ``t``."""
        start = self.midi_size * 2
        return t[..., start : start + self.velocity_steps]

    def time(self, t: torch.Tensor) -> torch.Tensor:
        """Return the time section of ``t``."""
        start = (self.midi_size * 2) + self.velocity_steps
        return t[..., start : start + self.time_steps]

    def training_step(self, batch, batch_idx):
        """Compute and log the loss for one batch of ``(x, y)`` pairs.

        The loss is the sum of an MSE loss on the note-on section and a
        cross-entropy loss on the time section. The note-off and velocity
        sections currently do not contribute to the loss.
        """
        x, y = batch
        out = self(x)

        note_on_loss = nn.functional.mse_loss(self.note_on(out), self.note_on(y))

        # `cross_entropy` treats dimension 1 as the class dimension. Passing
        # the (batch, 1, time_steps) tensors directly makes it see a single
        # class, which always results in a loss of 0 - so flatten to
        # (batch, time_steps) first. The one-hot target is used as class
        # probabilities and therefore needs to be a float tensor.
        time_logits = self.time(out).reshape(-1, self.time_steps)
        time_target = self.time(y).reshape(-1, self.time_steps)
        time_loss = nn.functional.cross_entropy(time_logits, time_target)

        self.log("train_note_on_loss", note_on_loss)
        self.log("train_time_loss", time_loss)

        return note_on_loss + time_loss

    def configure_optimizers(self):
        """Use Adam with a learning rate of 1e-3 for all parameters."""
        return optim.Adam(self.parameters(), lr=1e-3)


lstm = LSTM()

print(lstm)

LSTM(
  (lstm): LSTM(413, 512, num_layers=3, batch_first=True, dropout=0.3)
  (linear): Linear(in_features=512, out_features=413, bias=True)
)


In [None]:
lstm.note_on(out).shape

torch.Size([1, 1, 128])

In [None]:
from typing import Iterator

import torch.utils.data


class TimeSeriesDataset(torch.utils.data.IterableDataset):
    """Sliding-window dataset over a sequence of vectors.

    Every sample is a tuple ``(pre, post)`` where ``pre`` holds ``num_pre``
    consecutive rows of ``vectors`` and ``post`` the ``num_post`` rows
    directly following them. The window is moved forward one row at a time.
    """

    def __init__(self, vectors: np.ndarray, num_pre: int, num_post: int = 1):
        """Store the data and the window sizes.

        Args:
            vectors: The sequence to slide over, indexed along its first
                dimension. Anything with a ``shape`` which supports slicing
                works, e.g. a NumPy array or a torch tensor.
            num_pre: Number of rows used as input of a sample.
            num_post: Number of rows used as target of a sample.
        """
        self.vectors = vectors
        self.num_pre = num_pre
        self.num_post = num_post

    def __iter__(self) -> Iterator:
        """Yield all complete ``(pre, post)`` windows in order."""
        window = self.num_pre + self.num_post
        # Only offsets at which a complete window fits into the data are
        # valid; everything beyond yields truncated or empty slices which
        # can not be stacked into a batch.
        num_windows = max(self.vectors.shape[0] - window + 1, 0)
        for offset in range(num_windows):
            split = offset + self.num_pre
            yield self.vectors[offset:split], self.vectors[split : offset + window]


# Samples consist of 100 consecutive vectors as input and the single vector
# following them as target. Note that this rebinds the name `dataset`.
float_vectors = torch.Tensor(vector.astype(np.float32))
dataset = TimeSeriesDataset(vectors=float_vectors, num_pre=100, num_post=1)

In [None]:
dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=28)

In [None]:
trainer = L.Trainer(limit_train_batches=100, max_epochs=2, log_every_n_steps=1)
trainer.fit(model=lstm, train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name   | Type   | Params
----------------------------------
0 | lstm   | LSTM   | 6.1 M 
1 | linear | Linear | 211 K 
----------------------------------
6.3 M     Trainable params
0         Non-trainable params
6.3 M     Total params
25.251    Total estimated model params size (MB)
/Users/scheiba/github/ki-ueben-klavier-trainieren/venv/lib/python3.10/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Training:   0%|          | 0/100 [00:00<?, ?it/s]

Epoch 1: 100%|██████████| 100/100 [00:47<00:00,  2.10it/s, v_num=42]

`Trainer.fit` stopped: `max_epochs=2` reached.


Epoch 1: 100%|██████████| 100/100 [00:47<00:00,  2.09it/s, v_num=42]


In [None]:
trainer.logged_metrics

{'train_note_on_loss': tensor(0.0041), 'train_time_loss': tensor(-0.)}

In [None]:
# Predict the vector following the first 100 vectors of the piece.
# `eval()` disables the dropout of the LSTM so the prediction is
# deterministic, `no_grad()` skips the gradient bookkeeping which is not
# needed for inference. `unsqueeze(0)` adds the batch dimension.
lstm.eval()
with torch.no_grad():
    out = lstm(torch.Tensor(vector[0:100].astype(np.float32)).unsqueeze(0))

In [None]:
out[0, 0, :]

tensor([ 3.8971e-03, -1.2172e-03, -2.4754e-03, -3.7213e-03, -2.7787e-03,
        -2.5138e-04,  1.7865e-03, -2.6892e-04,  1.8892e-03, -1.5990e-03,
         3.2224e-04,  9.8781e-04, -1.4894e-03, -2.0432e-03,  2.9413e-03,
        -1.7089e-03,  9.7648e-03, -1.2814e-03, -1.8693e-03, -3.8153e-03,
        -5.2325e-03,  2.3664e-03,  8.2530e-04, -1.2044e-03, -2.1420e-03,
        -6.7895e-04, -1.5671e-03,  2.7583e-03,  3.3898e-03,  5.4249e-03,
        -2.0362e-03,  3.2021e-03, -9.8711e-04,  2.5240e-03, -2.1509e-04,
        -1.2053e-04,  1.2406e-03,  3.5846e-04, -2.5151e-04, -5.0175e-03,
        -2.1750e-03, -5.8116e-03,  8.4303e-03,  1.2473e-02,  3.3065e-03,
         4.9784e-04,  3.2720e-03, -1.1198e-03,  5.0466e-03,  9.6616e-03,
         6.7661e-03,  3.2104e-02,  1.1356e-02,  1.5208e-02,  2.1413e-02,
        -1.3461e-03,  1.1828e-02, -4.4166e-03,  3.0715e-02,  1.0709e-02,
         2.2356e-02,  1.2477e-02,  1.4657e-02,  4.9933e-02,  9.8639e-03,
         5.0932e-03,  3.4576e-02,  2.8624e-02,  3.1

In [None]:
torch.argmax(lstm.velocity(out)[0, 0])

tensor(28)

In [None]:
lstm.note_on(out)[0, 0] > 0.04

tensor([False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False,  True, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False])

In [None]:
lstm.velocity_steps

32

In [None]:
lstm.velocity(out[0, 0]).max()

tensor(0.0518, grad_fn=<MaxBackward1>)

In [None]:
np.argmax(lstm.time(out[0, 0]).detach().numpy())

269

In [None]:
lstm.time_steps

125

In [None]:
out.shape

torch.Size([1, 1, 413])

In [None]:
(out > 0.2).sum()

tensor(1)

In [None]:
vectors.shape

(6677, 413)

In [None]:
dataset = torch.utils.data.TensorDataset(
    torch.Tensor(vectors[:-1]), torch.Tensor(vectors[1:])
)

In [None]:
dataset[10][0].shape

torch.Size([413])

In [None]:
output = lstm.forward(
    torch.Tensor(vectors[10:20]).unsqueeze(dim=0),
)

In [None]:
vectors[10:20].shape

(10, 413)

In [None]:
output.shape

torch.Size([1, 333])

In [None]:
class LSTM(L.LightningModule):
    def __init__(
        self, mp 
        n_features=88,
        hidden_size=12,
        seq_len=12,
        batch_size=12,
        num_layers=12,
        dropout=0.2,
        output_size=88,
    ):
        super().__init__()

        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.output_size = output_size
    
        self.lstm = nn.LSTM(
            input_size=self.n_features,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            dropout=self.dropout,
            batch_first=True,
        )
        self.linear = nn.Linear(self.hidden_size, self.output_size)
    