## Task 2 (NEW)

## Overview

We extended Google's Piano Genie project to generate music from words typed on a full QWERTY keyboard. Our version also feeds velocity into the model and outputs it, which the original project did not do.

The original GitHub repository for the project can be found here: https://github.com/chrisdonahue/piano-genie

### Inspiration

Monkeytype is a popular typing-speed website that some of our members frequent. We were curious to see if we could generate music dynamically based on what the user was typing.
https://monkeytype.com/

We saw the Piano Genie demo in class, and saw an opportunity to extend it to explore this idea. The original demo can be found here:
https://www.i-am.ai/piano-genie.html


## Usage

Use our utility HTML script to open a window where you can type out a paragraph. The script encodes your typing as a combination of the keys pressed, their timestamps, and your words per minute.


The utility script will output these stats to a `.csv` file that can be passed into the model to generate new music.

## QWERTY Input

We experimented with a few different ideas for how the QWERTY keyboard could augment what music is played.

### Velocity per row

We tested assigning each row of a keyboard to different velocities to be encoded during typing. For instance, the row with the keys "QWERTYUIOP" would be assigned a higher velocity value than the row with the keys "ZXCVBNM".

We kept 8 note bins during our initial experiments; keys past the 8th in a row (counting left to right) are assigned to the 8th bin.


## Notes

We left some of the original documentation in place to provide context and to show reviewers what work was done before. All original documentation in pure markdown blocks has been wrapped in code blocks, which can be removed for your viewing convenience. Additionally, inline markdown is most likely partially or fully from the original repository. We mark documentation written entirely by us with the tag (NEW). Credit also goes to Copilot for assistance with the project.

## Performance Encoding (NEW)

We created a new system to encode a user's musical performance on a QWERTY keyboard into a format that a machine learning model can interpret.

### User UI

<center><img src="https://raw.githubusercontent.com/AnniePhan02/CSE253-Assignment2/main/task2/images/NewInputSystem.png" width=600px/></center>

The new UI for adding text is a simple text input box, with the ability to click in and type text. The user can then download their performance as a CSV file using the `Download CSV` button.

### CSV File Format

The first row of the CSV file is a header, containing the following column names:
* Key: This column stores the key pressed by the user on the QWERTY keyboard.
* Seconds: This column stores the time elapsed in seconds since the very first key press. The time is recorded with three decimal places.
* WPM: This column stores the words per minute calculated at the moment the key was pressed. The WPM is recorded with one decimal place.

Each subsequent row after the first in the CSV file represents a single key press event. The values in these rows correspond to the columns defined in the header: the specific key pressed, the time of the key press, and the calculated words per minute at that moment.

For example, the row `I,3.496,10.3` indicates that the key "I" was pressed at 3.496 seconds after the initial key press, and at that time, the user's typing speed was 10.3 words per minute.
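
The WPM column counts five key presses as one word, and the input script counts every `keydown` event, including `Shift` and `Backspace`. As a minimal sketch in Python (the function name `wpm_at` is ours and not part of the project), the value in the example row can be reproduced like this:

```python
import math


def wpm_at(key_count: int, elapsed_seconds: float) -> float:
    """Words per minute after `key_count` key presses, counting 5 keys as one word."""
    if elapsed_seconds == 0:
        # The first key press has zero elapsed time; the browser script
        # divides by zero at this point and logs "Infinity".
        return math.inf
    elapsed_minutes = elapsed_seconds / 60
    return key_count / 5 / elapsed_minutes


# "I" is the third logged key press (after two Shift events) at 3.496 s.
print(round(wpm_at(3, 3.496), 1))  # 10.3
```

Because the count starts at the first key press, the very first row always has an elapsed time of zero, so its WPM value is not meaningful.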

### Example CSV of Short Performance

In the performance below, the phrase `I am going to pass` was typed into the text box.

```
Key,Seconds,WPM
Shift,0.000,Infinity
Shift,3.373,7.1
I,3.496,10.3
 ,3.622,13.3
w,3.697,16.2
Backspace,3.957,18.2
a,4.023,20.9
m,4.121,23.3
 ,4.229,25.5
g,4.298,27.9
o,4.399,30.0
i,4.538,31.7
n,4.591,34.0
g,4.665,36.0
 ,4.731,38.0
t,4.798,40.0
o,4.837,42.2
 ,4.923,43.9
p,5.040,45.2
a,5.102,47.0
s,5.246,48.0
s,5.367,49.2

```
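
A file like the one above can be read back in Python before it is handed to the model. The sketch below is one possible reader, not the loader used in our pipeline: `KeyEvent` and `parse_typing_csv` are hypothetical names. It splits each line from the right so that a `,` key press, which the script writes unquoted, is still recovered, and it relies on Python's `float` accepting the string `Infinity`.

```python
from typing import List, NamedTuple


class KeyEvent(NamedTuple):
    key: str
    seconds: float
    wpm: float


def parse_typing_csv(text: str) -> List[KeyEvent]:
    """Parse the Key,Seconds,WPM rows written by the input script."""
    events = []
    for line in text.splitlines()[1:]:  # skip the header row
        if not line:
            continue  # ignore blank lines
        # Split from the right: the last two fields are always numbers,
        # so whatever remains (even "," or " ") is the key.
        key, seconds, wpm = line.rsplit(",", 2)
        events.append(KeyEvent(key, float(seconds), float(wpm)))
    return events


sample = "Key,Seconds,WPM\nShift,0.000,Infinity\nI,3.496,10.3\n ,3.622,13.3\n"
events = parse_typing_csv(sample)
print(events[0].wpm)        # inf
print(repr(events[2].key))  # ' '
```

Downstream code will likely want to drop or clamp the infinite WPM of the first row before using it as a feature.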


### Full Script For User Input

```HTML
<!DOCTYPE html>
<html>
  <body>
    <textarea id="t" rows="10" cols="60" placeholder="Start typing…"></textarea>
    <br />
    <button id="download">Download CSV</button>

    <script>
      let startTime = null;
      let charCount = 0;
      // Header: Key, Seconds since first press, WPM at that moment
      const rows = [["Key", "Seconds", "WPM"]];

      const log = (key, secs, wpm) => {
        rows.push([key, secs.toFixed(3), wpm.toFixed(1)]);
      };

      document.getElementById("t").addEventListener("keydown", (e) => {
        if (startTime === null) {
          startTime = performance.now();
        }
        charCount++;
        const now = performance.now();
        const elapsedMs = now - startTime;
        const elapsedSecs = elapsedMs / 1000;
        const elapsedMins = elapsedMs / 60000;
        // WPM = (chars ÷ 5) ÷ elapsedMinutes
        const wpm = charCount / 5 / elapsedMins;
        log(e.key, elapsedSecs, wpm);
      });

      document.getElementById("download").addEventListener("click", () => {
        const csvContent = rows.map((r) => r.join(",")).join("\n");
        const blob = new Blob([csvContent], { type: "text/csv" });
        const url = URL.createObjectURL(blob);
        const a = document.createElement("a");
        a.href = url;
        a.download = "typing_wpm_timestamps.csv";
        document.body.appendChild(a);
        a.click();
        document.body.removeChild(a);
        URL.revokeObjectURL(url);
      });
    </script>
  </body>
</html>
```

## Initial tests:

During our initial tests of whether we could get the pipeline to run, we converted our performance into 8 bins using the following script:

In [None]:
# version 1
def letter_to_button_keyboard(letter):
    # Map a letter to (button index, velocity) based on its keyboard row.
    # Buttons are 0-indexed, so with 8 bins the largest valid index is 7.
    top = "qwertyuiop"
    middle = "asdfghjkl"
    bottom = "zxcvbnm"
    if letter in top:
        return min(top.index(letter), 7), 40
    elif letter in middle:
        return min(middle.index(letter), 7), 80
    elif letter in bottom:
        return min(bottom.index(letter), 7), 120
    else:
        # Anything that is not a lowercase letter falls back to button 0, velocity 0
        return 0, 0

After working through the missing dependencies and various setup quirks, we were able to generate this with our input script and new music:

@ STEVEN INSERT HERE

## Pipeline (NEW):

Below are our modifications to the original pipeline. The pipeline has been modified in order to use the following inputs and outputs:

*   Input: our new CSV file format, which records the keys users pressed while typing
*   Output: a MIDI file in which the general music "bins" (the keys the user pressed) have been converted to proper notes. In other words, the pipeline converts the user's keyboard input into a musical piece.

We have modified the pipeline to accept **26 bins**, and also output **velocity**.
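
This section does not show how keys are assigned to the 26 bins. As an illustration only, the sketch below assumes one bin per letter in alphabetical order (`a` is bin 0, `z` is bin 25) and skips non-letter keys such as `Shift` or `Backspace`. `letter_to_bin` is a hypothetical helper, and the project may order the bins differently (for example, by keyboard position).

```python
import string
from typing import Optional

NUM_BINS = 26  # matches "num_buttons" in the training config


def letter_to_bin(key: str) -> Optional[int]:
    """Return a bin index in [0, 25] for a single letter, else None."""
    if len(key) != 1:
        return None  # named keys such as "Shift" or "Backspace"
    # ASSUMPTION: bins follow alphabetical order; this is not confirmed by the project.
    index = string.ascii_lowercase.find(key.lower())
    return index if index >= 0 else None


keys = ["Shift", "p", "a", "s", "s", " "]
print([letter_to_bin(k) for k in keys])  # [None, 15, 0, 18, 18, None]
```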



<!-- # Music Co-creation Tutorial Part 1: Training a generative model of music
### [Chris Donahue](https://chrisdonahue.com), [Anna Huang](https://research.google/people/105787/), [Jon Gillick](https://www.jongillick.com/)

This is the first part of a two-part tutorial entitled [*Interactive music co-creation with PyTorch and TensorFlow.js*](https://github.com/chrisdonahue/music-cocreation-tutorial/), prepared as part of the ISMIR 2021 tutorial *Designing generative models for interactive co-creation*. This part of the tutorial will demonstrate how to **train a generative model of music in PyTorch**, and **port its weights to TensorFlow.js** format for interaction. The [final result is here](https://chrisdonahue.com/music-cocreation-tutorial)—see our [GitHub repo](https://github.com/chrisdonahue/music-cocreation-tutorial/) for part 2. -->


```
## Primer on Piano Genie

The generative model we will train is called [Piano Genie](https://magenta.tensorflow.org/pianogenie) (Donahue et al. 2019). Piano Genie is a system which maps amateur improvisations on a miniature 8-button keyboard ([video](https://www.youtube.com/watch?v=YRb0XAnUpIk), [demo](https://piano-genie.glitch.me)) into realistic performances on a full 88-key piano.

To achieve this, Piano Genie adopts an _autoencoder_ approach. First, an _encoder_ maps professional piano performances into this 8-button space. Then, a _decoder_ attempts to reconstruct the original piano performance from the 8-button version. The entire system is trained end-to-end to minimize the decoder's reconstruction error. At performance time, we replace the encoder with a user improvising on an 8-button controller, and use the pre-trained decoder to generate a corresponding piano performance.

<center><img src="https://raw.githubusercontent.com/chrisdonahue/music-cocreation-tutorial/main/part-1-py-training/figures/overview.png" width=600px/></center>

At a low-level, both the encoder and the decoder for Piano Genie are lightweight recurrent neural networks, which are suitable for real-time performance even on mobile CPUs. The discrete bottleneck is achieved using a technique called _integer-quantized autoencoding_ (IQAE), which was also proposed in the Piano Genie paper.
```

In [None]:
#@title **(Step 1)** Parse MIDI piano performances into simple lists of notes

# @markdown *Note*: Check this box to rebuild the dataset from scratch.
REBUILD_DATASET = False  # @param{type:"boolean"}

# @markdown To train Piano Genie, we will use a dataset of professional piano performances called [MAESTRO](https://magenta.tensorflow.org/datasets/maestro) (Hawthorne et al. 2019).
# @markdown Each performance in this dataset was captured by a Disklavier, a computerized piano which can record human performances in MIDI format, i.e., as timestamped sequences of notes.

PIANO_LOWEST_KEY_MIDI_PITCH = 21
PIANO_NUM_KEYS = 88

import gzip
import json
from collections import defaultdict

from tqdm.notebook import tqdm


def download_and_parse_maestro():
    # Install pretty_midi
    !!pip install pretty_midi
    import pretty_midi

    # Download MAESTRO dataset (Hawthorne+ 2018)
    !!wget -nc https://storage.googleapis.com/magentadata/datasets/maestro/v2.0.0/maestro-v2.0.0-midi.zip
    !!unzip maestro-v2.0.0-midi.zip

    # Parse MAESTRO dataset
    dataset = defaultdict(list)
    with open("maestro-v2.0.0/maestro-v2.0.0.json", "r") as f:
        for attrs in tqdm(json.load(f)):
            split = attrs["split"]
            midi = pretty_midi.PrettyMIDI("maestro-v2.0.0/" + attrs["midi_filename"])
            assert len(midi.instruments) == 1
            # @markdown Formally, a piano performance is a sequence of notes: $\mathbf{x} = (x_1, \ldots, x_N)$, where each $x_i = (t_i, d_i, k_i, v_i)$, signifying:
            notes = [
                (
                    # @markdown 1. (When the key was pressed) An _onset_ time $t_i \in \mathbb{T}$, where $\mathbb{T} = \{ t \in \mathbb{R} \mid 0 \leq t \leq T \}$
                    float(n.start),
                    # @markdown 2. (How long the key was held) A _duration_ $d_i \in \mathbb{R}_{>0}$
                    float(n.end) - float(n.start),
                    # @markdown 3. (Which key was pressed) A _key_ index $k_i \in \mathbb{K}$, where $\mathbb{K} = \{\text{A0}, \ldots, \text{C8}\}$ and $|\mathbb{K}| = 88$
                    int(n.pitch - PIANO_LOWEST_KEY_MIDI_PITCH),
                    # @markdown 4. (How hard the key was pressed) A _velocity_ $v_i \in \mathbb{V}$, where $\mathbb{V} = \{1, \ldots, 127\}$
                    int(n.velocity),
                )
                for n in midi.instruments[0].notes
            ]

            # This list is in sorted order of onset time, i.e., $t_{i-1} \leq t_i ~\forall~i \in \{2, \ldots, N\}$.
            notes = sorted(notes, key=lambda n: (n[0], n[2]))
            assert all(
                [
                    all(
                        [
                            # Start times should be non-negative
                            n[0] >= 0,
                            # Note durations should be strictly positive, i.e., $d_i > 0$
                            n[1] > 0,
                            # Key index should be in range of the piano
                            0 <= n[2] and n[2] < PIANO_NUM_KEYS,
                            # Velocity should be valid
                            1 <= n[3] and n[3] < 128,
                        ]
                    )
                    for n in notes
                ]
            )
            dataset[split].append(notes)

        return dataset


if REBUILD_DATASET:
    DATASET = download_and_parse_maestro()
    with gzip.open("maestro-v2.0.0-simple.json.gz", "w") as f:
        f.write(json.dumps(DATASET).encode("utf-8"))
else:
    !!wget -nc https://github.com/chrisdonahue/music-cocreation-tutorial/raw/main/part-1-py-training/data/maestro-v2.0.0-simple.json.gz
    with gzip.open("maestro-v2.0.0-simple.json.gz", "rb") as f:
        DATASET = json.load(f)

print([(s, len(DATASET[s])) for s in ["train", "validation", "test"]])

In [None]:
# Step 1: Inspect the data structure and content
print("Dataset splits and number of performances:")
for split, performances in DATASET.items():
    print(f"- {split}: {len(performances)} performances")

# Inspect the first performance in the training set
if DATASET['train']:
    first_performance = DATASET['train'][0]
    print("\nStructure of the first performance (first 5 notes):")
    print(first_performance[:5])
    print("\nData types of the first note:")
    if first_performance:
        first_note = first_performance[0]
        for i, value in enumerate(first_note):
            print(f"- Element {i}: {type(value)}")

In [None]:
# Install necessary libraries
!pip install pandas matplotlib seaborn numpy

In [None]:


import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Step 2: Analyze note properties

# Combine all notes from all splits into a single list for analysis
all_notes = []
for split, performances in DATASET.items():
    for performance in performances:
        all_notes.extend(performance)

# Create a pandas DataFrame for easier analysis
notes_df = pd.DataFrame(all_notes, columns=['onset_time', 'duration', 'key_index', 'velocity'])

# Distribution of key indices
plt.figure(figsize=(12, 6))
sns.histplot(notes_df['key_index'], bins=PIANO_NUM_KEYS, kde=False)
plt.title('Distribution of Piano Key Indices')
plt.xlabel('Key Index (0-87)')
plt.ylabel('Frequency')
plt.show()

# Distribution of velocities
plt.figure(figsize=(12, 6))
sns.histplot(notes_df['velocity'], bins=127, kde=False)
plt.title('Distribution of Velocities')
plt.xlabel('Velocity (1-127)')
plt.ylabel('Frequency')
plt.show()

# Distribution of durations
plt.figure(figsize=(12, 6))
sns.histplot(notes_df['duration'], bins=50, kde=True)
plt.title('Distribution of Note Durations')
plt.xlabel('Duration (seconds)')
plt.ylabel('Frequency')
plt.xlim(0, 5) # Limit x-axis for better visualization of common durations
plt.show()

# Distribution of inter-onset intervals
# Calculate inter-onset intervals for each performance
all_iois = []
for split, performances in DATASET.items():
    for performance in performances:
        onsets = [n[0] for n in performance]
        iois = np.diff(onsets)
        all_iois.extend(iois)

plt.figure(figsize=(12, 6))
sns.histplot(all_iois, bins=50, kde=True)
plt.title('Distribution of Inter-Onset Intervals')
plt.xlabel('Inter-Onset Interval (seconds)')
plt.ylabel('Frequency')
plt.xlim(0, 2) # Limit x-axis for better visualization of common intervals
plt.show()

In [None]:
# Step 3: Analyze performance properties

# Number of notes per performance
num_notes_per_performance = [len(p) for split, performances in DATASET.items() for p in performances]
plt.figure(figsize=(12, 6))
sns.histplot(num_notes_per_performance, bins=50, kde=True)
plt.title('Distribution of Number of Notes per Performance')
plt.xlabel('Number of Notes')
plt.ylabel('Frequency')
plt.show()

# Total duration of performances
total_duration_per_performance = [p[-1][0] - p[0][0] if p else 0 for split, performances in DATASET.items() for p in performances]
plt.figure(figsize=(12, 6))
sns.histplot(total_duration_per_performance, bins=50, kde=True)
plt.title('Distribution of Total Performance Duration')
plt.xlabel('Duration (seconds)')
plt.ylabel('Frequency')
plt.show()

## Reintroducing Velocity to the autoencoder (NEW)

As noted in the original documentation, the original version of Piano Genie did not utilize velocity to make its predictions. Since we use the entire QWERTY keyboard for our project, we saw an opportunity to test if the model performance would differ by adding velocity to the autoencoder.

In the `PianoGenieDecoder` class, we needed to reintroduce the velocity input that the original model dropped, as shown below:

```python
inputs = [
    F.one_hot(k_m1, PIANO_NUM_KEYS + 1),
    t.unsqueeze(dim=2),
    b.unsqueeze(dim=2),
    v.unsqueeze(dim=2),
]
```

We made a similar change to the encoder, which also required changing its `forward` signature:

```python
def forward(self, k, t, v):
    inputs = [
        F.one_hot(k, PIANO_NUM_KEYS),
        t.unsqueeze(dim=2),
        v.unsqueeze(dim=2),
    ]
```
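
Adding velocity widens the feature vector fed to each model's input projection by one. The arithmetic below is a sanity check of the `nn.Linear` input sizes used in the model definitions (`PIANO_NUM_KEYS + 4` for the decoder, `PIANO_NUM_KEYS + 2` for the encoder); the variable names `decoder_width` and `encoder_width` are ours.

```python
PIANO_NUM_KEYS = 88

# Decoder: one-hot previous key (88 keys + 1 start-of-sequence slot),
# then one scalar each for delta time, button, and velocity.
decoder_width = (PIANO_NUM_KEYS + 1) + 1 + 1 + 1

# Encoder: one-hot current key, then one scalar each for delta time and velocity.
encoder_width = PIANO_NUM_KEYS + 1 + 1

print(decoder_width, encoder_width)  # 92 90
```

If either width does not match the concatenated input, PyTorch raises a shape mismatch error in the first linear layer, so this is a quick thing to check when adding or removing features.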

### Location in original docs mentioning lack of velocity:

```
we anticipate that it will be frustrating for users if the model predicts dynamics on their behalf, so we remove velocity terms  𝐯 :
```

In [None]:
# @title **(Step 2)** Define Piano Genie autoencoder

# @markdown Our intended interaction for Piano Genie is to have users perform on a miniature 8-button keyboard and automatically map each of their button presses to a key on a piano.
# @markdown Similarly to our formalization of piano performances, we will formalize a "button performance" as a sequence of "notes", where piano keys $k_i$ are replaced with buttons $b_i$, and we remove velocity since our button controller is not velocity sensitive.
# @markdown So a button performance $\mathbf{c}$ is:

# @markdown - $\mathbf{c} = (c_1, \ldots, c_N)$, where $c_i = (t_i, d_i, b_i \in \mathbb{B})$, i.e., (onsets, durations, buttons), and $\mathbb{B} = \{ \color{#EE2B29}\blacksquare, \color{#ff9800}\blacksquare, \color{#ffff00}\blacksquare, \color{#c6ff00}\blacksquare, \color{#00e5ff}\blacksquare, \color{#2979ff}\blacksquare, \color{#651fff}\blacksquare, \color{#d500f9}\blacksquare \}$

# @markdown And a corresponding piano performance is:

# @markdown - $\mathbf{x} = (x_1, \ldots, x_N)$, where $x_i = (t_i, d_i, k_i, v_i)$, i.e., (onsets, durations, keys, velocities)

# @markdown To map button performances into piano performances, we will train a generative model $P(\mathbf{x} \mid \mathbf{c})$.
# @markdown In practice, we will factorize this joint distribution over note sequences $\mathbf{x}$ into the product of conditional probabilities of individual notes: $P(\mathbf{x} \mid \mathbf{c}) = \prod_{i=1}^{N} P(x_i \mid \mathbf{x}_{< i}, \mathbf{c})$.

# @markdown Hence, our **overall goal is to learn** $P(x_i \mid \mathbf{x}_{< i}, \mathbf{c})$,
# @markdown which we will **approximate by modeling**:

# @markdown <center>$P(k_i \mid \mathbf{k}_{<i}, \mathbf{t}_{\leq i}, \mathbf{b}_{\leq i})$.</center>

# @markdown We arrived at this approximation by working through constraints imposed by the interaction (details at the end).

import torch
import torch.nn as nn
import torch.nn.functional as F

# @markdown #### **Decoder**

# @markdown <center><img src="https://raw.githubusercontent.com/chrisdonahue/music-cocreation-tutorial/main/part-1-py-training/figures/decoder.png" width=600px/></center>
# @markdown <center><b>Piano Genie decoder processing $N=4$ notes</b></center>

# @markdown The approximation $P(k_i \mid \mathbf{k}_{<i}, \mathbf{t}_{\leq i}, \mathbf{b}_{\leq i})$ constitutes the decoder of Piano Genie, which we will parameterize using an RNN.
# @markdown This is the portion of the model that users will interact with.
# @markdown To achieve our intended real-time interaction, we will compute and sample from this RNN at the instant the user presses a button, passing as input the key from the previous timestep, the current time, the button the user pressed, and a vector which summarizes the ongoing history.

# @markdown Formally, the decoder is a function:
# @markdown $D_{\theta}: k_{i-1}, t_i, b_i, \mathbf{h}_{i-1} \mapsto \mathbf{\hat{k}}_i, \mathbf{h}_i$, where:

# @markdown - $k_0$ is a special start-of-sequence token $<\text{S}>$

# @markdown - $\mathbf{h}_i$ is a vector summarizing timesteps $1, \ldots, i$

# @markdown - $\mathbf{h}_0$ is some initial value (zeros) for that vector

# @markdown - $\mathbf{\hat{k}}_i \in \mathbb{R}^{88}$ are the output logits for timestep $i$

SOS = PIANO_NUM_KEYS

class PianoGenieDecoder(nn.Module):
    def __init__(self, rnn_dim=128, rnn_num_layers=2):
        super().__init__()
        self.rnn_dim = rnn_dim
        self.rnn_num_layers = rnn_num_layers
        # +4 input features: one extra one-hot slot for the <S> token, plus t, b, and v
        self.input = nn.Linear(PIANO_NUM_KEYS + 4, rnn_dim)
        self.lstm = nn.LSTM(
            rnn_dim,
            rnn_dim,
            rnn_num_layers,
            batch_first=True,
            bidirectional=False,
        )
        self.output = nn.Linear(rnn_dim, 88)

    def init_hidden(self, batch_size, device=None):
        h = torch.zeros(self.rnn_num_layers, batch_size, self.rnn_dim, device=device)
        c = torch.zeros(self.rnn_num_layers, batch_size, self.rnn_dim, device=device)
        return (h, c)

    def forward(self, k, t, b, v, h_0=None):
        # Prepend <S> token to shift k_i to k_{i-1}
        k_m1 = torch.cat([torch.full_like(k[:, :1], SOS), k[:, :-1]], dim=1)

        # Encode input
        inputs = [
            F.one_hot(k_m1, PIANO_NUM_KEYS + 1),
            t.unsqueeze(dim=2),
            b.unsqueeze(dim=2),
            v.unsqueeze(dim=2),
        ]
        x = torch.cat(inputs, dim=2)

        # Project encoded inputs
        x = self.input(x)

        # Run RNN
        if h_0 is None:
            h_0 = self.init_hidden(k.shape[0], device=k.device)
        x, h_N = self.lstm(x, h_0)

        # Compute logits
        hat_k = self.output(x)

        return hat_k, h_N


# @markdown #### **Encoder**

# @markdown <center><img src="https://i.imgur.com/P3bQFsC.png" width=600px/></center>
# @markdown <center><b>Piano Genie encoder processing $N=4$ notes</b></center>

# @markdown Because we lack examples of human button performances, we use an encoder to automatically learn to map piano performances into synthetic button performances.
# @markdown The encoder takes as input a sequence of keys and onset times and produces an equal-length sequence of buttons.
# @markdown Formally, the encoder is a function: $E_{\varphi} : \mathbf{k}, \mathbf{t} \mapsto \mathbf{b}$.

# @markdown Note the conceptual difference between the decoder and the encoder: the decoder processes one sequence item at a time, while the encoder maps an entire input sequence to an output sequence.
# @markdown This is because the decoder (which we will use during inference) needs to process information as it becomes available in real time, whereas the encoder (which we only use during training) can observe the entire piano sequence before translating it into buttons.
# @markdown Despite this conceptual difference, in practice the encoder is also an RNN (though a bidirectional one) under the hood.

class PianoGenieEncoder(nn.Module):
    def __init__(self, rnn_dim=128, rnn_num_layers=2):
        super().__init__()
        self.rnn_dim = rnn_dim
        self.rnn_num_layers = rnn_num_layers
        self.input = nn.Linear(PIANO_NUM_KEYS + 2, rnn_dim)
        self.lstm = nn.LSTM(
            rnn_dim,
            rnn_dim,
            rnn_num_layers,
            batch_first=True,
            bidirectional=True,
        )
        self.output = nn.Linear(rnn_dim * 2, 1)

    def forward(self, k, t, v):
        inputs = [
            F.one_hot(k, PIANO_NUM_KEYS),
            t.unsqueeze(dim=2),
            v.unsqueeze(dim=2),
        ]
        x = self.input(torch.cat(inputs, dim=2))
        # NOTE: PyTorch uses zeros automatically if h is None
        x, _ = self.lstm(x, None)
        x = self.output(x)
        return x[:, :, 0]


# @markdown #### **Quantizing encoder output to discrete buttons**

# @markdown <center><img src="https://raw.githubusercontent.com/chrisdonahue/music-cocreation-tutorial/main/part-1-py-training/figures/quantization.png" width=600px/></center>
# @markdown <center><b>Quantizing continuous encoder output (grey line) to eight discrete values (colorful line segments)</b></center>

# @markdown You may have noticed in the code that the encoder outputs a real-valued scalar (let's call it $e_i \in \mathbb{R}$) at each timestep, but our goal is to output one of eight discrete buttons, i.e., $b_i \in \mathbb{B}$.
# @markdown To achieve this, we will quantize this real-valued scalar as the centroid of the nearest of eight bins between $[-1, 1]$ (see figure above):

# @markdown <center>$b_i = 2 \cdot \frac{\tilde{b}_i - 1}{B - 1} - 1$, where $\tilde{b}_i = \text{round} \left( 1 + (B - 1) \cdot \min \left( \max \left( \frac{e_i  + 1}{2}, 0 \right), 1 \right) \right)$</center>

class IntegerQuantizer(nn.Module):
    def __init__(self, num_bins):
        super().__init__()
        self.num_bins = num_bins

    def real_to_discrete(self, x, eps=1e-6):
        x = (x + 1) / 2
        x = torch.clamp(x, 0, 1)
        x *= self.num_bins - 1
        x = (torch.round(x) + eps).long()
        return x

    def discrete_to_real(self, x):
        x = x.float()
        x /= self.num_bins - 1
        x = (x * 2) - 1
        return x

    def forward(self, x):
        # Quantize and compute delta (used for straight-through estimator)
        with torch.no_grad():
            x_disc = self.real_to_discrete(x)
            x_quant = self.discrete_to_real(x_disc)
            x_quant_delta = x_quant - x

        # @markdown In the backwards pass, we will use the straight-through estimator (Bengio et al. 2013), i.e., pretend that this discretization did not happen when computing gradients.
        # Quantize w/ straight-through estimator
        x = x + x_quant_delta

        return x


# @markdown #### **Defining the autoencoder**

# @markdown Finally, the Piano Genie autoencoder is simply the composition of the encoder, quantizer, and decoder.

class PianoGenieAutoencoder(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.enc = PianoGenieEncoder(
            rnn_dim=cfg["model_rnn_dim"],
            rnn_num_layers=cfg["model_rnn_num_layers"],
        )
        self.quant = IntegerQuantizer(cfg["num_buttons"])
        self.dec = PianoGenieDecoder(
            rnn_dim=cfg["model_rnn_dim"],
            rnn_num_layers=cfg["model_rnn_num_layers"],
        )

    def forward(self, k, t, v):
        e = self.enc(k, t, v)
        b = self.quant(e)
        hat_k, _ = self.dec(k, t, b, v)
        return hat_k, e


# @markdown #### **Approximating $P(x_i \mid \mathbf{x}_{< i}, \mathbf{c})$**

# @markdown This section walks through how we designed an approximation to $P(x_i \mid \mathbf{x}_{< i}, \mathbf{c})$ which would be appropriate for our intended interaction. You probably don't need to understand this, but some may find it helpful as an illustration of how to design a generative model around constraints imposed by interaction.

# @markdown First, we expand the terms, treating the onsets $\mathbf{t}$ and durations $\mathbf{d}$ as part of the button performance $\mathbf{c}$:

# @markdown <center>$P(x_i \mid \mathbf{x}_{< i}, \mathbf{c}) = P(k_i, v_i \mid \mathbf{k}_{<i}, \mathbf{v}_{<i}, \mathbf{t}, \mathbf{d}, \mathbf{b})$</center>

# @markdown Because we want this interaction to be real-time, we must remove any information that might not be available at time $t_i$ (the moment the user presses a button), which includes future onsets $\mathbf{t}_{>i}$, future buttons $\mathbf{b}_{>i}$, and all durations $\mathbf{d}$, since notes can be held indefinitely:

# @markdown <center>$\approx P(k_i, v_i \mid \mathbf{k}_{<i}, \mathbf{v}_{<i}, \mathbf{t}_{\leq i}, \mathbf{b}_{\leq i})$</center>

# @markdown Finally, we anticipate that it will be frustrating for users if the model predicts dynamics on their behalf, so we remove velocity terms $\mathbf{v}$:

# @markdown <center>$\approx P(k_i \mid \mathbf{k}_{<i}, \mathbf{t}_{\leq i}, \mathbf{b}_{\leq i})$</center>
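
To make the quantization concrete for our 26-button setting, here is a plain-Python sketch of the two mappings in `IntegerQuantizer`, operating on single floats instead of tensors. The function names mirror the class methods, but this is an illustration rather than the code used in training.

```python
def real_to_discrete(e: float, num_bins: int) -> int:
    """Clamp e to [-1, 1] and round to the nearest of num_bins bin indices."""
    x = (e + 1) / 2                # [-1, 1] -> [0, 1]
    x = min(max(x, 0.0), 1.0)      # clamp out-of-range encoder outputs
    return round(x * (num_bins - 1))


def discrete_to_real(b: int, num_bins: int) -> float:
    """Map a bin index back to its centroid in [-1, 1]."""
    return 2 * b / (num_bins - 1) - 1


NUM_BINS = 26
for e in (-1.7, -1.0, 0.3, 1.0):
    b = real_to_discrete(e, NUM_BINS)
    print(e, b, round(discrete_to_real(b, NUM_BINS), 3))
```

Note that the decoder receives the centroid value in $[-1, 1]$ rather than the integer index, so a button pressed at inference time should go through the equivalent of `discrete_to_real` first.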

## (STEP 3) Modifying training pipeline (NEW)

After modifying the autoencoder, we needed to update the function calls in the training pipeline so that the encoder receives the velocity values and the new output is interpreted correctly.

We added a new `batch_v` list, which stores the velocity values for each note in a minibatch of piano performances. `n[3]` is the velocity of a note, as can be seen from how the `download_and_parse_maestro` function structures each note tuple.

The changes are shown in context below.

```python
        # Key features
        batch_k.append([n[2] for n in subsample])
        batch_v.append([n[3] for n in subsample])

        # Onset features
        # NOTE: For stability, we pass delta time to Piano Genie instead of time.
        t = np.diff([n[0] for n in subsample])
        t = np.concatenate([[1e8], t])
        t = np.clip(t, 0, CFG["data_delta_time_max"])
        batch_t.append(t)

    return (torch.tensor(batch_k).long(), torch.tensor(batch_t).float(), torch.tensor(batch_v).float())
```
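
To illustrate the feature extraction on a tiny input, the sketch below reproduces the same steps in plain Python (without NumPy or PyTorch) for a single four-note excerpt. `notes_to_features` is a hypothetical helper written for this example, and the note values are made up; the tuples follow the `(onset, duration, key, velocity)` layout produced by `download_and_parse_maestro`.

```python
from typing import List, Tuple

Note = Tuple[float, float, int, int]  # (onset, duration, key, velocity)


def notes_to_features(notes: List[Note], delta_time_max: float = 1.0):
    """Return (keys, clipped delta times, velocities) for one excerpt."""
    keys = [n[2] for n in notes]
    velocities = [float(n[3]) for n in notes]
    onsets = [n[0] for n in notes]
    # Delta time between consecutive onsets; the first note has no
    # predecessor, so it gets a huge value that is clipped to the maximum.
    deltas = [1e8] + [b - a for a, b in zip(onsets, onsets[1:])]
    deltas = [min(max(d, 0.0), delta_time_max) for d in deltas]
    return keys, deltas, velocities


# Made-up notes for illustration only
notes = [(0.0, 0.4, 39, 64), (0.5, 0.2, 43, 70), (0.75, 0.3, 46, 58), (3.0, 1.0, 51, 90)]
keys, deltas, velocities = notes_to_features(notes)
print(keys)        # [39, 43, 46, 51]
print(deltas)      # [1.0, 0.5, 0.25, 1.0]
print(velocities)  # [64.0, 70.0, 58.0, 90.0]
```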

In [None]:
# @title **(Step 3)** Train Piano Genie

# @markdown *Note*: Check this box to log training curves to [Weights & Biases](https://wandb.ai/) (which will prompt you to log in).
USE_WANDB = False  # @param{type:"boolean"}

# @markdown Now that we've defined the autoencoder, we need to train it.
# @markdown We will train the entire autoencoder end-to-end to minimize the reconstruction loss of the decoder.

# @markdown <center>$\mathcal{L}_{\text{recons}} = \frac{1}{N} \sum_{i=1}^{N} \text{CrossEntropy}(\text{Softmax}(\mathbf{\hat{k}}_i), k_i)$</center>

# @markdown This loss alone does not encourage the encoder to produce button sequences with any particular structure, so the behavior of the decoder will likely be fairly unpredictable at interaction time.
# @markdown We think it might be intuitive to users if the decoder respected the _contour_ of their performance, i.e., if higher buttons produced higher notes and lower buttons produced lower notes.
# @markdown Hence, we include a loss term which encourages the encoder to produce button sequences which align with the contour of the piano key sequences.

# @markdown <center>$\mathcal{L}_{\text{contour}} = \frac{1}{N - 1} \sum_{i=2}^{N} \max (0, 1 - (k_i - k_{i-1}) \cdot (e_i - e_{i-1}))^2$</center>

# @markdown Finally, we find empirically that the encoder often outputs values outside of the $[-1, 1]$ range used for discretization.
# @markdown Hence, we add a loss term which explicitly discourages this behavior:

# @markdown <center>$\mathcal{L}_{\text{margin}} = \frac{1}{N} \sum_{i=1}^{N} \max(0, |e_i| - 1)^2$</center>

# @markdown Thus, our final loss function is:
# @markdown <center>$\mathcal{L} = \mathcal{L}_{\text{recons}} + \mathcal{L}_{\text{contour}} + \mathcal{L}_{\text{margin}}$</center>


CFG = {
    "seed": 0,
    # Number of buttons in interface
    "num_buttons": 26,
    # Onset delta times will be clipped to this maximum
    "data_delta_time_max": 1.0,
    # Max time stretch for data augmentation (+- 5%)
    "data_augment_time_stretch_max": 0.05,
    # Max transposition for data augmentation (+- tritone)
    "data_augment_transpose_max": 6,
    # RNN dimensionality
    "model_rnn_dim": 128,
    # RNN num layers
    "model_rnn_num_layers": 2,
    # Training hyperparameters
    "batch_size": 32,
    "seq_len": 128,
    "lr": 3e-4,
    "loss_margin_multiplier": 1.0,
    "loss_contour_multiplier": 1.0,
    "summarize_frequency": 128,
    "eval_frequency": 128,
    "max_num_steps": 50000
}

import pathlib
import random

import numpy as np

if USE_WANDB:
    try:
        import wandb
    except ModuleNotFoundError:
        !!pip install wandb
        import wandb

# Init
run_dir = pathlib.Path("piano_genie")
run_dir.mkdir(exist_ok=True)
with open(pathlib.Path(run_dir, "cfg.json"), "w") as f:
    f.write(json.dumps(CFG, indent=2))
if USE_WANDB:
    wandb.init(project="music-cocreation-tutorial", config=CFG, reinit=True)

# Set seed
if CFG["seed"] is not None:
    random.seed(CFG["seed"])
    np.random.seed(CFG["seed"])
    torch.manual_seed(CFG["seed"])
    torch.cuda.manual_seed_all(CFG["seed"])

# Create model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PianoGenieAutoencoder(CFG)
model.train()
model.to(device)
print("-" * 80)
for n, p in model.named_parameters():
    print(f"{n}, {p.shape}")

# Create optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=CFG["lr"])

# Subsamples performances to create a minibatch
def performances_to_batch(performances, device, train=True):
    batch_k = []
    batch_t = []
    batch_v = []
    for p in performances:
        # Subsample seq_len notes from performance
        assert len(p) >= CFG["seq_len"]
        if train:
            # +1 so that a performance of exactly seq_len notes is still valid
            subsample_offset = random.randrange(0, len(p) - CFG["seq_len"] + 1)
        else:
            subsample_offset = 0
        subsample = p[subsample_offset : subsample_offset + CFG["seq_len"]]
        assert len(subsample) == CFG["seq_len"]

        # Data augmentation
        if train:
            stretch_factor = random.random() * CFG["data_augment_time_stretch_max"] * 2
            stretch_factor += 1 - CFG["data_augment_time_stretch_max"]
            transposition_factor = random.randint(
                -CFG["data_augment_transpose_max"], CFG["data_augment_transpose_max"]
            )
            subsample = [
                (
                    n[0] * stretch_factor,
                    n[1] * stretch_factor,
                    max(0, min(n[2] + transposition_factor, PIANO_NUM_KEYS - 1)),
                    n[3],
                )
                for n in subsample
            ]

        # Key features
        batch_k.append([n[2] for n in subsample])
        batch_v.append([n[3] for n in subsample])

        # Onset features
        # NOTE: For stability, we pass delta time to Piano Genie instead of time.
        t = np.diff([n[0] for n in subsample])
        t = np.concatenate([[1e8], t])
        t = np.clip(t, 0, CFG["data_delta_time_max"])
        batch_t.append(t)

    return (torch.tensor(batch_k).long(), torch.tensor(batch_t).float(), torch.tensor(batch_v).float())


# Train
step = 0
best_eval_loss = float("inf")
while CFG["max_num_steps"] is None or step < CFG["max_num_steps"]:
    if step % CFG["eval_frequency"] == 0:
        model.eval()

        with torch.no_grad():
            eval_losses_recons = []
            eval_violates_contour = []
            for i in range(0, len(DATASET["validation"]), CFG["batch_size"]):
                eval_batch = performances_to_batch(
                    DATASET["validation"][i : i + CFG["batch_size"]],
                    device,
                    train=False,
                )
                eval_k, eval_t, eval_v = tuple(t.to(device) for t in eval_batch)
                eval_hat_k, eval_e = model(eval_k, eval_t, eval_v)
                eval_b = model.quant.real_to_discrete(eval_e)
                eval_loss_recons = F.cross_entropy(
                    eval_hat_k.view(-1, PIANO_NUM_KEYS),
                    eval_k.view(-1),
                    reduction="none",
                )
                eval_violates = torch.logical_not(
                    torch.sign(torch.diff(eval_k, dim=1))
                    == torch.sign(torch.diff(eval_b, dim=1)),
                ).float()
                eval_violates_contour.extend(eval_violates.cpu().numpy().tolist())
                eval_losses_recons.extend(eval_loss_recons.cpu().numpy().tolist())

            eval_loss_recons = np.mean(eval_losses_recons)
            if eval_loss_recons < best_eval_loss:
                torch.save(model.state_dict(), pathlib.Path(run_dir, "model.pt"))
                best_eval_loss = eval_loss_recons

        eval_metrics = {
            "eval_loss_recons": eval_loss_recons,
            "eval_contour_violation_ratio": np.mean(eval_violates_contour),
        }
        if USE_WANDB:
            wandb.log(eval_metrics, step=step)
        print(step, "eval", eval_metrics)

        model.train()

    # Create minibatch
    batch = performances_to_batch(
        random.sample(DATASET["train"], CFG["batch_size"]), device, train=True
    )
    k, t, v = tuple(t.to(device) for t in batch)

    # Run model
    optimizer.zero_grad()
    k_hat, e = model(k, t, v)

    # Compute losses and update params
    loss_recons = F.cross_entropy(k_hat.view(-1, PIANO_NUM_KEYS), k.view(-1))
    loss_margin = torch.square(
        torch.maximum(torch.abs(e) - 1, torch.zeros_like(e))
    ).mean()
    loss_contour = torch.square(
        torch.maximum(
            1 - torch.diff(k, dim=1) * torch.diff(e, dim=1),
            torch.zeros_like(e[:, 1:]),
        )
    ).mean()
    loss = torch.zeros_like(loss_recons)
    loss += loss_recons
    if CFG["loss_margin_multiplier"] > 0:
        loss += CFG["loss_margin_multiplier"] * loss_margin
    if CFG["loss_contour_multiplier"] > 0:
        loss += CFG["loss_contour_multiplier"] * loss_contour
    loss.backward()
    optimizer.step()
    step += 1

    if step % CFG["summarize_frequency"] == 0:
        metrics = {
            "loss_recons": loss_recons.item(),
            "loss_margin": loss_margin.item(),
            "loss_contour": loss_contour.item(),
            "loss": loss.item(),
        }
        if USE_WANDB:
            wandb.log(metrics, step=step)
        print(step, "train", metrics)

# Download the trained model so we don't lose it!
from google.colab import files

files.download('piano_genie/model.pt')
files.download('piano_genie/cfg.json')
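
As a small worked illustration of the contour and margin terms from the training cell above, the sketch below recomputes both on a single toy sequence in plain Python. The function names and the example numbers are hypothetical and chosen only for illustration; the training loop computes the same quantities with PyTorch over a whole batch.

```python
def contour_loss(keys, enc):
    """Mean of max(0, 1 - (k_i - k_{i-1}) * (e_i - e_{i-1}))^2 over one sequence."""
    terms = [
        max(0.0, 1.0 - (keys[i] - keys[i - 1]) * (enc[i] - enc[i - 1])) ** 2
        for i in range(1, len(keys))
    ]
    return sum(terms) / len(terms)


def margin_loss(enc):
    """Mean of max(0, |e_i| - 1)^2 over one sequence."""
    terms = [max(0.0, abs(e) - 1.0) ** 2 for e in enc]
    return sum(terms) / len(terms)


# Toy piano keys: up 5 keys, then down 2 keys
keys = [40, 45, 43]
# Encoder outputs that move in the same direction as the keys
aligned = [-0.5, 0.5, 0.0]
# Encoder outputs that move in the opposite direction
opposed = [0.5, -0.5, 0.0]

loss_aligned = contour_loss(keys, aligned)
loss_opposed = contour_loss(keys, opposed)
# Only the two values outside [-1, 1] contribute to the margin term
loss_out_of_range = margin_loss([0.5, -1.5, 2.0])
print(loss_aligned, loss_opposed, loss_out_of_range)
```

The aligned sequence incurs no contour penalty, while the opposed one is penalized on both transitions.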

## Ignoring Steps 4 and 5 (NEW)

We chose to skip Steps 4 and 5, as the old interface is not compatible with a model that expects 26 bins. The original cells are kept below, commented out, for reference.

After the commented-out Step 5 cell, we describe our new pipeline for encoding what the user types and interpreting the resulting output file.


# # @title **(Step 4)** Port trained decoder parameters to Tensorflow.js format

# # @markdown In this step, we will use the TensorFlow.js Python library to export our model's parameters in a binary format, to be loaded later by the JavaScript client.

# !!pip install tensorflowjs

# from tensorflowjs.write_weights import write_weights

# # Load saved model dict
# d = torch.load("piano_genie/model.pt", map_location=torch.device("cpu"))
# d = {k: v.numpy() for k, v in d.items()}

# # Convert to tensorflow-js format
# pathlib.Path("piano_genie/dec_tfjs").mkdir(exist_ok=True)
# write_weights(
#     [[{"name": k, "data": v} for k, v in d.items() if k.startswith("dec")]],
#     "piano_genie/dec_tfjs",
# )

# # @title **(Step 5)** Create test case to check correctness of JavaScript port

# # @markdown Finally, we will serialize a sequence of inputs to and outputs from our trained model to create a test case for our JavaScript reimplementation.
# # @markdown This is critically important—I have ported many models from Python to JavaScript and have yet to get it right on the first try.
# # @markdown Porting models from PyTorch to TensorFlow.js is additionally tricky because parameters of the same shape are often used differently by the two APIs.

# # Restore model from saved checkpoint
# device = torch.device("cpu")
# with open("piano_genie/cfg.json", "r") as f:
#     cfg = json.load(f)
# model = PianoGenieAutoencoder(cfg)
# model.load_state_dict(torch.load("piano_genie/model.pt", map_location=device))
# model.eval()
# model.to(device)

# # Serialize a batch of inputs/outputs as JSON
# with torch.no_grad():
#     ground_truth_keys, input_dts = performances_to_batch(
#         [DATASET["validation"][0]], device, train=False
#     )
#     output_logits, input_buttons = model(ground_truth_keys, input_dts)
#     input_buttons = model.quant.real_to_discrete(input_buttons)

#     input_dts = input_dts[0].cpu().numpy().tolist()
#     ground_truth_keys = ground_truth_keys[0].cpu().numpy().tolist()
#     input_keys = [PIANO_NUM_KEYS] + ground_truth_keys[:-1]
#     input_buttons = input_buttons[0].cpu().numpy().tolist()
#     output_logits = output_logits[0].cpu().numpy().tolist()

#     test = {
#         n: eval(n)
#         for n in ["input_dts", "input_keys", "input_buttons", "output_logits"]
#     }
#     with open(pathlib.Path("piano_genie", "test.json"), "w") as f:
#         f.write(json.dumps(test))

## New Step 4 and 5 (NEW)

### Taking User Input

We created a new HTML page that records what the user types and exports it as a CSV file. The page can be found on our GitHub here: https://github.com/AnniePhan02/CSE253-Assignment2/blob/main/task2/index.html
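
To make the shape of that file concrete, here is a minimal parsing sketch. It assumes one header row followed by rows of letter, timestamp in seconds, and words per minute, in that order, which is how the full script further down reads the file. The header names and the sample rows are hypothetical.

```python
import csv
import io

# Hypothetical sample of the exported file; the real header names may differ.
SAMPLE_CSV = """key,timestamp,wpm
h,0.00,0
i,0.25,48
,,
!,0.60,50
"""


def parse_typing_csv(text):
    """Return a list of (letter, onset_seconds, wpm) tuples, skipping empty rows."""
    events = []
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row
    for row in reader:
        if len(row) < 3 or not row[0] or not row[1]:
            continue  # incomplete row, nothing to encode
        events.append((row[0].lower(), float(row[1]), float(row[2])))
    return events


events = parse_typing_csv(SAMPLE_CSV)
print(events)
```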

### Feeding user input file into the model

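Running the training steps above (everything before the skipped Steps 4 and 5) produces two files: `cfg.json` and `model.pt`.

The `step()` function, defined in the full script below, is the key function for generating output from the model. It runs the decoder for a single keypress and returns the chosen piano key.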
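
The last part of a decoder step turns a vector of logits into one key index, either by sampling or by taking the most likely key. The sketch below shows that choice without any framework, using a toy 4-value logit vector in place of the model's 88 piano keys. The helper names `softmax` and `pick_key` are hypothetical and not part of the project code.

```python
import math
import random


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def pick_key(logits, sample=True, rng=random):
    """Pick an index by sampling from the softmax, or greedily if sample=False."""
    probs = softmax(logits)
    if sample:
        return rng.choices(range(len(probs)), weights=probs, k=1)[0]
    return max(range(len(probs)), key=probs.__getitem__)


toy_logits = [0.1, 2.0, -1.0, 0.5]
greedy_key = pick_key(toy_logits, sample=False)
sampled_key = pick_key(toy_logits, sample=True, rng=random.Random(0))
print(greedy_key, sampled_key)
```

Sampling gives more varied output for the same typing, while the greedy choice is deterministic.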



Example `cfg.json`

```json
{
  "seed": 0,
  "num_buttons": 26,
  "data_delta_time_max": 1.0,
  "data_augment_time_stretch_max": 0.05,
  "data_augment_transpose_max": 6,
  "model_rnn_dim": 128,
  "model_rnn_num_layers": 2,
  "batch_size": 32,
  "seq_len": 128,
  "lr": 0.0003,
  "loss_margin_multiplier": 1.0,
  "loss_contour_multiplier": 1.0,
  "summarize_frequency": 128,
  "eval_frequency": 128,
  "max_num_steps": 50000
}
```

## Running the Newly Generated Model

The repository we pulled from had base code for training the model, but not for running it. With the assistance of Copilot, we created the following script to feed input into the model and interpret its output.
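
One detail to keep in mind when feeding input: the training code passes delta times (time since the previous note) clipped to `data_delta_time_max`, not absolute timestamps. A minimal sketch of that conversion for a list of absolute onsets is shown here. The helper name `to_delta_times` is hypothetical, and the default of 1.0 mirrors the value in the example config.

```python
def to_delta_times(onsets, max_delta=1.0):
    """Convert absolute onset times (seconds) to clipped delta times.

    The first event has no predecessor, so it gets the maximum delta,
    matching how the training batch function treats the first note.
    """
    deltas = []
    prev = None
    for t in onsets:
        raw = max_delta if prev is None else t - prev
        deltas.append(min(max(raw, 0.0), max_delta))
        prev = t
    return deltas


deltas = to_delta_times([0.0, 0.25, 0.6, 2.4])
print(deltas)
```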

### Defining Bins Based On Keys

Since there are 26 letter keys, we use 26 bins, one per letter. Characters outside `a`-`z` fall back to bin 0.

```python
def letter_to_button_26(letter):
    # Map letters to button indices for a 26-letter keyboard layout
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    if letter in alphabet:
        return alphabet.index(letter)
    else:
        return 0  # Default case for unsupported characters

```
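
Before a button index reaches the decoder, it is converted to a real value in $[-1, 1]$. The sketch below shows one way that mapping can work, assuming the quantizer spaces its `num_buttons` centroids evenly over $[-1, 1]$ as in the original Piano Genie implementation. These standalone functions are illustrative stand-ins, not the project's `model.quant` methods, which may differ.

```python
def discrete_to_real(button, num_buttons=26):
    """Map a button index in 0..num_buttons-1 to an evenly spaced value in [-1, 1]."""
    return -1.0 + 2.0 * button / (num_buttons - 1)


def real_to_discrete(value, num_buttons=26):
    """Map a real value to the nearest button index, clamping to [-1, 1] first."""
    clamped = max(-1.0, min(1.0, value))
    return int(round((clamped + 1.0) / 2.0 * (num_buttons - 1)))


# 'a' is button 0 and 'z' is button 25 under the alphabetical mapping
lowest = discrete_to_real(0)
highest = discrete_to_real(25)
round_trip_ok = all(real_to_discrete(discrete_to_real(b)) == b for b in range(26))
print(lowest, highest, round_trip_ok)
```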

### Full Script

import torch
import json
from genie import PianoGenieAutoencoder, SOS


# 1) Load the saved config and trained weights
with open("cfg.json") as f:
    cfg = json.load(f)
model = PianoGenieAutoencoder(cfg)
model.load_state_dict(torch.load("model.pt", map_location="cpu"))
model.eval()

# 2) Prepare decoder state for a single stream
#    We’ll run batch_size=1 since we’re in interactive mode
h = model.dec.init_hidden(batch_size=1)
# k_prev holds the last output key index; start with SOS
k_prev = torch.full((1, 1), SOS, dtype=torch.long)

# 3) Now, each time the user presses a button, you have:
#    b_i: integer 0…25   (which button; one per letter when num_buttons is 26)
#    t_i: float          (time in seconds; training used the delta since the
#                         previous note, clipped to data_delta_time_max)
#    v_i: int 1…127      (velocity)
#
# You’ll convert these into tensors, call the decoder, sample/argmax,
# and then feed that key back in as the next k_prev.


def step(b_i, t_i, v_i, k_prev, h):
    # 3a) button needs to be the *real-valued* centroid in [-1, 1]
    b_real = model.quant.discrete_to_real(torch.tensor([[b_i]]))  # → shape (1,1)
    # 3b) wrap time & velocity
    t = torch.tensor([[t_i]], dtype=torch.float)
    v = torch.tensor([[v_i]], dtype=torch.float)

    # 3c) run decoder for one timestep
    with torch.no_grad():
        logits, h = model.dec(k_prev, t, b_real, v, h)
        # logits: (1,1,88)
        probs = torch.softmax(logits[0, 0], dim=-1)
        k_i = torch.multinomial(probs, num_samples=1)  # or .argmax()

    return k_i.reshape(1, 1), h, probs


# 4) Example usage:
#    Suppose the user hits button 3 at time=0.57s with velocity=90:
# k1, h, p = step(b_i=3, t_i=0.57, v_i=90, k_prev=k_prev, h=h)


# version 1 (unused below): 8 bins per row, with a fixed velocity per row
def letter_to_button_keyboard(letter):
    # Map a letter to (button index, velocity) based on its keyboard row.
    # Keys past the 8th in a row share the 8th bin, i.e. index 7.
    top = "qwertyuiop"
    middle = "asdfghjkl"
    bottom = "zxcvbnm"
    if letter in top:
        return min(top.index(letter), 7), 40
    elif letter in middle:
        return min(middle.index(letter), 7), 80
    elif letter in bottom:
        return min(bottom.index(letter), 7), 120
    else:
        return 0, 0  # Default case for unsupported characters


def letter_to_button_26(letter):
    # Map letters to button indices for a 26-letter keyboard layout
    alphabet = "abcdefghijklmnopqrstuvwxyz"
    if letter in alphabet:
        return alphabet.index(letter)
    else:
        return 0  # Default case for unsupported characters


# Read the typing log exported by the HTML utility
import csv

notes = []

filename = "typing_wpm_timestamps.csv"
filename_no_ext = filename.split(".")[0]

with open(filename, "r") as f:
    reader = csv.reader(f)
    # skip header
    next(reader)
    for row in reader:
        print(row)
        if len(row) < 3 or not row[0] or not row[1]:
            print("Skipping empty row")
            continue

        letter = row[0].lower()
        try:
            # `onset` avoids shadowing the `time` module imported further down
            onset = float(row[1])
            wpm = float(row[2])
        except ValueError:
            print(f"Skipping non-numeric row: {row}")
            continue
        print(letter, onset, wpm)

        # One button per letter; velocity is derived from typing speed
        button = letter_to_button_26(letter)
        # Clamp velocity to the valid MIDI range 1-127
        velocity = max(1, min(127, int(wpm * 2)))

        # Training fed the model delta times clipped to data_delta_time_max
        # (with the first note at the maximum), so do the same here instead of
        # passing the absolute timestamp.
        max_dt = cfg["data_delta_time_max"]
        delta = max_dt if not notes else min(max(onset - notes[-1][1], 0.0), max_dt)

        k_prev, h, probs = step(b_i=button, t_i=delta, v_i=velocity, k_prev=k_prev, h=h)
        notes.append((k_prev.item(), onset, velocity))
print(notes)
# generate the midi file
import pretty_midi
import time

pm = pretty_midi.PrettyMIDI()
instr = pretty_midi.Instrument(program=0)

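# Assumption: key indices are offsets from the lowest piano key (MIDI pitch 21),
# consistent with the 0..PIANO_NUM_KEYS-1 range the model was trained on.
LOWEST_KEY_MIDI_PITCH = 21

for i, (note, onset, vel) in enumerate(notes):
    # Hold each note until the next onset; the last note lasts 0.5 s
    if i + 1 < len(notes):
        end = notes[i + 1][1]
    else:
        end = onset + 0.5
    pm_note = pretty_midi.Note(
        velocity=vel, pitch=note + LOWEST_KEY_MIDI_PITCH, start=onset, end=end
    )
    print(pm_note)
    instr.notes.append(pm_note)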

pm.instruments.append(instr)
filename = f"output_{time.time()}_{filename_no_ext}.mid"
pm.write(filename)
