# Symbolic Music Generation with Variational Autoencoders
### CSE 153 - Assignment 2
#### Edgar Guzman

# Description

**Task**: *Make some beautiful music!*

This project develops deep learning frameworks to generate symbolic music in both conditioned and unconditioned modes. Trained on thousands of copyright-free MIDI files, the goal is to develop structurally sound and audibly appealing piano compositions. The datasets used in this project are available as zip files from [PDMX](https://pnlong.github.io/PDMX.demo/), [MAESTRO](https://magenta.tensorflow.org/datasets/maestro#v300), and [MIDI Chords](https://github.com/ldrolez/free-midi-chords).

# Task 1a - 1D Variational Autoencoder

### Introduction 
Our first model aims to generate music using symbolic, unconditioned generation. We train a model on roughly 2,500 single-instrument, double-staff MIDI files to learn both note and duration distributions. All of these MIDI files were sampled from PDMX: A Large-Scale Public Domain MusicXML Dataset for Symbolic Music Processing. This dataset contains over 250,000 public domain files for use in ML training.

### Data Cleaning
Processing the data required multiple music packages. Initial attempts used <code>MidiUtil</code> and <code>Mido</code>, but since the model requires a piano roll, <code>pretty_midi</code> was used for accessibility. While most MIDI files were clean and ready for use, many had errors that <code>pretty_midi</code> was unable to handle. For example, a few files had a time signature of 128/4 that appeared as zeros. Any file that raised an error on loading contributes no notes to training: <code>midi_to_sequence</code> logs the error and returns an all-zero piano roll in its place.

In [1]:
import os
import time
import numpy as np
import pretty_midi
import torch
import torch.nn as nn
import torch.optim as optim

from glob import glob
from torch.utils.data import Dataset, DataLoader

### Model Description
The algorithm I chose to employ was a Variational Autoencoder, which is best described as an hourglass-shaped neural network. The input is a 2-channel piano roll covering 128 possible pitches across `seq_length` time steps (the number of notes we want to generate). Each time step, pitch, and channel combination is called a node. The encoder passes the input through successively smaller layers, whose nodes capture key features that would be difficult to discover through manual feature extraction. The layers keep shrinking until they reach the latent dimension, the smallest layer, which holds the most important features. The decoder's layers then grow again until the output has the same shape as the input. In short, **VAEs describe a probability distribution over the latent variables: the encoder outputs the mean and variance of that distribution, which is then sampled to generate new notes.**
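
The sampling step at the bottleneck can be illustrated outside the network. The following is a minimal NumPy sketch, not the training code: `mu` and `logvar` stand in for encoder outputs, and `sample_latent` is a hypothetical helper name introduced here.

```python
import numpy as np

def sample_latent(mu, logvar, rng):
    """Draw z = mu + sigma * eps with eps ~ N(0, I) (reparameterization trick)."""
    std = np.exp(0.5 * logvar)            # log-variance -> standard deviation
    eps = rng.standard_normal(mu.shape)   # noise that does not depend on mu or logvar
    return mu + eps * std

rng = np.random.default_rng(0)
mu = np.zeros((4, 128))                   # batch of 4, latent_dim = 128
logvar = np.full((4, 128), -40.0)         # tiny variance: samples collapse onto mu
z = sample_latent(mu, logvar, rng)
print(z.shape)
```

Because the randomness is isolated in `eps`, gradients can flow through `mu` and `logvar` during training, which is why the model samples this way rather than drawing from the distribution directly.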

The goal of using a VAE is to learn patterns shared across many music samples. The result of training is a set of features, or model weights, that best represent the sample files as a whole. In each iteration, a loss function measures how well the model reconstructs a batch of randomly sampled training files. Ideally, the loss decreases with each epoch. Realistically, the loss will remain very high, because musical pieces differ greatly across many features. A low loss usually means that the model will predict dissonant chords in an attempt to fill the most frequently used note positions. We will discuss the methods used to prevent this. We want to optimize our weights to produce a structurally sound song, not one that sounds slightly similar to every song it was trained on.
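
Written out, the objective minimized in this notebook is the sum of three terms. The notation is introduced here for illustration only: $x^{p}$ and $x^{d}$ are the pitch and normalized-duration channels of the input, $\hat{p}$ and $\hat{d}$ are the two decoder outputs, the index $i$ runs over every batch, pitch, and time position, and $\mu_j$, $\sigma_j^2$ are the encoder outputs for latent dimension $j$.

$$
\mathcal{L} =
\underbrace{-\sum_{i}\left[x^{p}_{i}\log\hat{p}_{i} + \left(1-x^{p}_{i}\right)\log\left(1-\hat{p}_{i}\right)\right]}_{\text{BCE}}
+ \underbrace{\sum_{i}\left(\hat{d}_{i}-x^{d}_{i}\right)^{2}}_{\text{MSE}}
\underbrace{-\,\frac{1}{2}\sum_{j}\left(1+\log\sigma_{j}^{2}-\mu_{j}^{2}-\sigma_{j}^{2}\right)}_{\text{KLD}}
$$

All three terms use a sum rather than a mean, so the reported loss scales with the batch size and with `seq_length`.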

### Parameters, Arguments, and Variables
The following parameters and their selected arguments are described below:

| Parameter | Description | Argument |
| --------- | ----------- | -------- |
| `seq_length` | Number of generated notes / Number of samples | 400 |
| `input_dim` | Input dimension shape | 2 * 128 * `seq_length` |
| `hidden_dim` | Encoder hidden dimension shape | 2048 |
| `latent_dim` | Encoder latent dimension shape | 128 |
| `output_dim` | Output dimension shape | `input_dim` |
| `root_pitch` | Default pitch for generating notes | 60 |
| `batch_size` | Number of samples processed in one iteration | 250 |
| `num_samples` | Number of MIDI files to generate | 1 `or` 3 |
| `max_notes_per_time` | Maximum chord size | 3 |

This model also uses the following variable values:

| Variable | Description | Value |
| -------- | ----------- | ----- |
| `max_duration` | Maximum note duration allowed | 50 |
| `num_epochs` | Number of training epochs | 2 |
| `threshold` | Probability threshold to qualify for candidacy | 0.035 |
| `max_history` | Maximum number of repeated notes | 4 |
| `offset` | Pitch shift change | 0 `or` 2 |
| `note_duration` | Default note duration | .25 (eighth note) |

In [2]:
# 1. Data Preparation: Convert MIDI to a sequence representation with note durations
def midi_to_sequence(midi_path, seq_length):
    try:
        pm = pretty_midi.PrettyMIDI(midi_path)
    except Exception as e:
        print(f"Error loading {midi_path}: {e}")
        # Return empty or dummy data if load fails
        piano_roll = np.zeros((128, seq_length))
        duration_array = np.zeros((128, seq_length))
        return piano_roll, duration_array
    piano_roll = pm.get_piano_roll(fs=20)  # 20 frames per second
    # Binarize piano roll
    piano_roll = (piano_roll > 0).astype(np.float32)

    # Generate note duration information
    duration_array = np.zeros_like(piano_roll)

    for pitch in range(128):
        pitch_vector = piano_roll[pitch, :]
        diff = np.diff(pitch_vector, prepend=0)
        onsets = np.where(diff == 1)[0]
        offsets = np.where(diff == -1)[0]
        if len(offsets) < len(onsets):
            offsets = np.append(offsets, len(pitch_vector))
        for onset, offset in zip(onsets, offsets):
            duration = offset - onset
            duration_array[pitch, onset:offset] = duration
            
    # Pad or truncate to fixed length
    if piano_roll.shape[1] < seq_length:
        pad_width = seq_length - piano_roll.shape[1]
        piano_roll = np.pad(piano_roll, ((0, 0), (0, pad_width)), mode='constant')
        duration_array = np.pad(duration_array, ((0, 0), (0, pad_width)), mode='constant')
    else:
        piano_roll = piano_roll[:, :seq_length]
        duration_array = duration_array[:, :seq_length]

    return piano_roll, duration_array

In [3]:
# 2. Dataset class
class MidiDataset(Dataset):
    def __init__(self, midi_files, seq_length=400):
        self.files = midi_files
        self.seq_length = seq_length

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        midi_path = self.files[idx]
        piano_roll, duration_array = midi_to_sequence(midi_path, self.seq_length)
        max_duration = 50
        duration_norm = np.clip(duration_array / max_duration, 0, 1)
        # Stack piano roll and duration as channels
        # Shape: (2, pitch, time)
        sample = np.stack([piano_roll, duration_norm], axis=0)
        return torch.tensor(sample, dtype=torch.float32)

In [4]:
# 3. Define VAE components with dual outputs
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        h = self.relu(self.fc1(x))
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim, seq_length=400):
        super().__init__()
        self.seq_length = seq_length
        self.fc1 = nn.Linear(latent_dim, hidden_dim)
        self.output_dim = output_dim
        # Split into two heads: one for pitch, one for duration
        self.fc_pitch = nn.Linear(hidden_dim, 128 * seq_length)  # pitch output
        self.fc_duration = nn.Linear(hidden_dim, 128 * seq_length)  # duration output
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, z):
        h = self.relu(self.fc1(z))
        pitch_logits = self.fc_pitch(h)
        duration_logits = self.fc_duration(h)
        # reshape each head to (batch, pitch=128, time=seq_length)
        pitch_logits = pitch_logits.view(-1, 128, self.seq_length)
        duration_logits = duration_logits.view(-1, 128, self.seq_length)
        # Sigmoid for pitch (binary presence)
        pitch_probs = self.sigmoid(pitch_logits)
        return pitch_probs, duration_logits

class MusicVAE(nn.Module):
    def __init__(self, input_dim, hidden_dim, latent_dim, seq_length):
        super().__init__()
        self.seq_length = seq_length
        self.encoder = Encoder(input_dim, hidden_dim, latent_dim)
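        # Pass seq_length through so the decoder does not silently fall back to its default of 400
        self.decoder = Decoder(latent_dim, hidden_dim, input_dim, seq_length=seq_length)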

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        pitch_probs, duration_logits = self.decoder(z)
        return pitch_probs, duration_logits, mu, logvar

In [5]:
# 4. Loss function with dual components
def loss_function(pitch_probs, duration_logits, x, mu, logvar):
    # Split x into target pitch and duration
    target_pitch = x[:, 0, :, :]  # shape: (batch, 128, time)
    target_duration = x[:, 1, :, :]  # shape: (batch, 128, time)
    max_duration = 50

    # Compute binary cross-entropy for pitch
    BCE = nn.functional.binary_cross_entropy(pitch_probs, target_pitch, reduction='sum')
    # Compute MSE for durations (regression)
    duration_pred = duration_logits
    duration_target = target_duration
    # Denormalize durations for loss calculation if desired
    MSE = nn.functional.mse_loss(duration_pred, duration_target, reduction='sum')
    # KLD for VAE
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return BCE + MSE + KLD

C_MAJOR_SCALE = [0, 2, 4, 5, 7, 9, 11]
def pitch_shift_to_c_major(pitch, base_pitch=60):
    pitch_in_scale = pitch - base_pitch
    # Find the closest scale step
    distances = [abs(pitch_in_scale - interval) for interval in C_MAJOR_SCALE]
    min_index = np.argmin(distances)
    shifted_pitch = base_pitch + C_MAJOR_SCALE[min_index]
    return shifted_pitch

CHORD_PATTERNS = [
    [0, 4, 7],       # Major triad
    [-3, 0, 4],      # Minor triad1
    [-7, -3, 0],     # Minor triad2
    [-5, -1, 2]      # Other chord
]
def get_custom_chord(root_pitch):
    # Randomly select one of your custom chord patterns
    pattern = CHORD_PATTERNS[np.random.randint(len(CHORD_PATTERNS))]
    # Transpose pattern to the root pitch
    chord_pitches = [root_pitch + interval for interval in pattern]
    # Keep within MIDI pitch range
    chord_pitches = [p for p in chord_pitches if 0 <= p <= 127]
    return chord_pitches

In [240]:
# 5. Setup training
midi_files = (glob("mid/0/0/*.mid") +
              glob("mid/0/1/*.mid") + 
              glob("mid/0/2/*.mid") + 
              glob("mid/0/3/*.mid") + 
              glob("mid/0/4/*.mid"))
dataset = MidiDataset(midi_files, seq_length=400)
dataloader = DataLoader(dataset, batch_size=250, shuffle=True, num_workers=0)

input_dim = 2 * 128 * 400  # 2 channels: piano roll + duration
hidden_dim = 2048
latent_dim = 128

model = MusicVAE(input_dim, hidden_dim, latent_dim, seq_length=400)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

print("Start Training")
t1 = time.time()
# 6. Training loop
num_epochs = 2
for epoch in range(num_epochs):
    total_loss = 0
    for batch in dataloader:
        batch = batch.view(batch.size(0), -1)  # flatten to (batch, input_dim)
        optimizer.zero_grad()
        pitch_probs, duration_logits, mu, logvar = model(batch)
        loss = loss_function(pitch_probs, duration_logits, batch.view(batch.size(0), 2, 128, -1), mu, logvar)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader)}")
t2 = time.time()
print(f"Finished Training in {round(t2 - t1, 2)}s")

Start Training
Epoch 1/2, Loss: 21945.138613672698
Epoch 2/2, Loss: 19212.82993596717
Finished Training in 371.63s
Done


### Music Generation
Our model then generates notes, either single notes or chords, based on the probability of each note being played. For every time step, the VAE produces a probability for each possible pitch, and the algorithm discards any pitch whose normalized probability falls below the threshold value. It then checks how many candidates remain. If there are more than three, playing all of them would likely produce a dissonant chord, so only the three most probable notes are kept. If there are one or two, one is selected at random. If no note clears the threshold, the previous note is repeated. These rules reduce randomness, improve harmonic consistency, and produce a more audibly pleasing composition. Other constraints, such as limiting repetition to 4 consecutive identical notes, further reduce noise and improve generation quality.
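
The selection rule described above can be sketched for a single time step as follows. This is an illustrative sketch rather than the notebook's generation function: `select_pitches` is a hypothetical helper, it assumes the probabilities are positive, and it treats exactly three candidates as a chord, a case the description leaves open.

```python
import numpy as np

def select_pitches(probs, previous_pitch, threshold=0.035, max_notes=3, rng=None):
    """Pick the pitches to play at one time step from 128 pitch probabilities."""
    if rng is None:
        rng = np.random.default_rng()
    probs = np.asarray(probs, dtype=float)
    probs = probs / probs.sum()                      # normalize so the values sum to 1
    candidates = np.flatnonzero(probs > threshold)   # pitches that clear the threshold
    if candidates.size == 0:
        return [previous_pitch]                      # nothing qualifies: repeat last note
    if candidates.size >= max_notes:
        order = np.argsort(probs[candidates])[::-1]  # most probable candidates first
        return sorted(int(p) for p in candidates[order[:max_notes]])
    return [int(rng.choice(candidates))]             # one or two candidates: single note

probs = np.full(128, 1e-4)
probs[[60, 64, 67, 72]] = [0.9, 0.8, 0.7, 0.6]       # four strong candidates
chord = select_pitches(probs, previous_pitch=60)
print(chord)
```

Normalizing before thresholding means the threshold is relative to the other pitches at that time step, so a uniformly confident or uniformly uncertain output yields no candidates at all.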

One advantage of implementing the model this way is that it limits bias toward the most common notes. By limiting how often the most popular or frequent note is played, our model has a reduced risk of predicting the same note for all possible positions. A disadvantage is that more advanced models that benefit from larger datasets are available; because of limited time and memory, a simpler model was chosen. Additionally, this model strongly favors chords over single notes, for two reasons. First, a low threshold value admits more candidate notes, so chords are more likely to be played. Second, the training dataset contains a large number of chords, making single-note generation more difficult.
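
The repetition limit can be sketched with a fixed-length history. The helper below is hypothetical and is a simplified variant of the notebook's check: it waits until the history holds four entries before it transposes, whereas the generation loop compares against however many entries it currently has.

```python
from collections import deque

def avoid_repetition(selected, history, shift=2):
    """Transpose `selected` if it equals every entry of a full `history`."""
    full = len(history) == history.maxlen
    if full and all(selected == past for past in history):
        selected = [min(p + shift, 127) for p in selected]   # stay in MIDI range
    history.append(selected)          # the deque drops its oldest entry when full
    return selected

history = deque(maxlen=4)             # remembers only the last four time steps
played = [avoid_repetition([60], history) for _ in range(6)]
print(played)
```

Here the fifth identical note is shifted up by two semitones, after which the history is no longer uniform and the original pitch is allowed again.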

In [6]:
# 7. Generate new music with note durations
def generate_music_with_repetition_control(model, latent_dim, num_samples=1, seq_length=400, max_notes_per_time=3):
    threshold= 0.035
    max_history = 4  # number of previous notes to compare
    with torch.no_grad():
        z = torch.randn(num_samples, latent_dim)
        pitch_probs, duration_logits = model.decoder(z)
        pitch_probs = pitch_probs.cpu().numpy()
        duration_logits = duration_logits.cpu().numpy()
    
        for i in range(num_samples):
            pm = pretty_midi.PrettyMIDI()
            instrument = pretty_midi.Instrument(program=0)
            dur_pred = duration_logits[i]
            dur_pred = np.clip(dur_pred, 0, 1)
    
            previous_notes = [[60]]

            start_time = 0
            for t in range(seq_length):
                selected_pitches = []
                pitch_probs_t = pitch_probs[i, :, t]
                pitch_probs_t = np.array(pitch_probs_t) / sum(pitch_probs_t)
                candidate_pitches = np.where(pitch_probs_t > threshold)[0].tolist()
                if len(candidate_pitches) == 0:
                    last_pitch = previous_notes[-1][-1]
                    shifted_pitch = pitch_shift_to_c_major(last_pitch)
                    selected_pitches.append(shifted_pitch)
                elif len(candidate_pitches) > max_notes_per_time:
                    # Pick a root pitch from candidates
                    root_pitch = np.random.choice(candidate_pitches)
                    # Generate the major chord pitches
                    chord_pitches = get_custom_chord(root_pitch)
                    # Assign the chord pitches as the selected notes
                    selected_pitches.extend(chord_pitches)
                else:
                    choose = np.random.choice(candidate_pitches)
                    shifted_pitch = pitch_shift_to_c_major(choose)
                    selected_pitches.append(shifted_pitch)

                end_time = start_time  # fallback so end_time is defined even if no note is emitted
                offset = 0
                if all(selected_pitches == i for i in previous_notes):
                    offset = 2
                for pitch_lst in selected_pitches:
                    if type(pitch_lst) != list:
                        pitch_lst = [pitch_lst]
                    for p in pitch_lst:
                        note_duration = dur_pred[p, t]
                        if note_duration == 0:
                            note_duration = .25
                        end_time = start_time + note_duration 
                        note = pretty_midi.Note(velocity=100, pitch=p + offset, start=start_time, end=end_time)
                        instrument.notes.append(note)
                        
                if all(selected_pitches == i for i in previous_notes):
                    selected_pitches = np.array(selected_pitches)
                    selected_pitches += offset
                    selected_pitches = list(selected_pitches)
                previous_notes.append(selected_pitches)
                
                if len(previous_notes) > max_history:
                    previous_notes = previous_notes[-max_history:]
                start_time = end_time

            pm.instruments.append(instrument)
            pm.write(f"music/1d_VAE_{i}.mid")

In [23]:
# Generate and save new MIDI with note durations
generate_music_with_repetition_control(model, latent_dim, num_samples=3, seq_length=400)
print("Done")

Done


### Results
While our generated music won't sound like any single piece of training data, our baseline would have to be a sample of that data. To evaluate what a "good" output is, we would need to perform a Subjective Listening Test, for example using a mean opinion score. Listeners would rate the quality of the composition in comparison to the training data or a related composition.
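
A mean opinion score is simple to compute once ratings are collected. The sketch below uses made-up ratings purely to show the calculation; no listening test was run for this project, and `mean_opinion_score` is a hypothetical helper.

```python
import math
import statistics

def mean_opinion_score(ratings):
    """Return (mean, half-width of an approximate 95% confidence interval)."""
    mean = statistics.mean(ratings)
    if len(ratings) < 2:
        return mean, float("nan")     # spread is undefined for a single rating
    sem = statistics.stdev(ratings) / math.sqrt(len(ratings))
    return mean, 1.96 * sem           # normal approximation; small panels need a t-quantile

# Hypothetical 1-5 ratings from eight listeners, for illustration only
generated = [3, 4, 2, 3, 4, 3, 3, 2]
reference = [5, 4, 4, 5, 4, 5, 4, 4]
gen_mean, gen_ci = mean_opinion_score(generated)
ref_mean, ref_ci = mean_opinion_score(reference)
print(gen_mean, ref_mean)
```

Comparing the generated score against the score of a training excerpt, rated by the same listeners, gives the baseline comparison described above.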

# Task 1b - 2D Variational Autoencoder

### Introduction
One major disadvantage of the previous model is its dimensionality. Attempting to generate 400 notes results in an input size of 102,400 nodes! This can be mitigated by using a 2-D VAE. Since our piano rolls are 2-dimensional (rows are pitches and columns are time intervals), we can instead build the encoder from <code>Conv2d</code> layers. The flattened size fed to the linear layers is halved (256 * 8 * 25 = 51,200), and runtimes improve. For our first model, an epoch would take roughly 3 minutes to complete. Our new model takes roughly 37 seconds per epoch.
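
The size reduction can be checked with the standard convolution output formula. This sketch assumes four convolutions with kernel size 3, stride 2, and padding 1, matching the encoder used in this task; `conv_out` is a hypothetical helper.

```python
def conv_out(size, kernel=3, stride=2, padding=1):
    """Output length of one convolution along a single axis."""
    return (size + 2 * padding - kernel) // stride + 1

pitch, time = 128, 400
for _ in range(4):                       # four stride-2 layers
    pitch, time = conv_out(pitch), conv_out(time)

flat_1d = 2 * 128 * 400                  # input to the fully connected encoder
flat_2d = 256 * pitch * time             # input to fc_mu / fc_logvar
print(pitch, time, flat_1d, flat_2d)     # 8 25 102400 51200
```

Each stride-2 layer halves both axes, so the 128 x 400 roll shrinks to 8 x 25 while the channel count grows from 2 to 256.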

### Parameters, Arguments, and Variables
The following parameters and their selected arguments have changed compared to the first model:

| Parameter | Description | Argument |
| --------- | ----------- | -------- |
| `input_dim` | Deprecated, is now `self._to_linear` | 256 * 8 * 25 |

This model also changed the following variable values:

| Variable | Description | Value |
| -------- | ----------- | ----- |
| `num_epochs` | Number of training epochs | 35 `or` early stopping |
| `threshold` | Probability threshold to qualify for candidacy | 0.04 |

In [7]:
# Steps 1, 2, and 4 are unmodified and are not reproduced here for simplicity; step 7 is redefined further below

# 3. Define 2D Convolutional Encoder with BatchNorm and Dropout
class ConvEncoder(nn.Module):
    def __init__(self, latent_dim, dropout_prob=0.3):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(2, 32, kernel_size=(3, 3), stride=2, padding=1),  # Output: (32, 64, 200)
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Dropout(dropout_prob),

            nn.Conv2d(32, 64, kernel_size=(3, 3), stride=2, padding=1), # (64, 32, 100)
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(dropout_prob),

            nn.Conv2d(64, 128, kernel_size=(3, 3), stride=2, padding=1), # (128, 16, 50)
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(dropout_prob),

            nn.Conv2d(128, 256, kernel_size=(3, 3), stride=2, padding=1), # (256, 8, 25)
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(dropout_prob),
        )

        # Compute the flattened size after convolutions
        self._to_linear = 256 * 8 * 25  # based on above dimensions

        self.fc_mu = nn.Linear(self._to_linear, latent_dim)
        self.fc_logvar = nn.Linear(self._to_linear, latent_dim)

    def forward(self, x):
        batch_size = x.shape[0]
        x = self.conv_layers(x)  # shape: (batch, 256, 8, 25)
        x = x.view(batch_size, -1)  # flatten
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self, latent_dim, hidden_dim, output_dim, seq_length=400):
        super().__init__()
        self.seq_length = seq_length
        self.fc1 = nn.Linear(latent_dim, hidden_dim)
        self.output_dim = output_dim
        # Split into two heads: one for pitch, one for duration
        self.fc_pitch = nn.Linear(hidden_dim, 128 * seq_length)  # pitch output
        self.fc_duration = nn.Linear(hidden_dim, 128 * seq_length)  # duration output
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, z):
        h = self.relu(self.fc1(z))
        pitch_logits = self.fc_pitch(h)
        duration_logits = self.fc_duration(h)
        # reshape each head to (batch, pitch=128, time=seq_length)
        pitch_logits = pitch_logits.view(-1, 128, self.seq_length)
        duration_logits = duration_logits.view(-1, 128, self.seq_length)
        # Sigmoid for pitch (binary presence)
        pitch_probs = self.sigmoid(pitch_logits)
        return pitch_probs, duration_logits

class MusicVAE(nn.Module):
    def __init__(self, hidden_dim, latent_dim, seq_length):
        super().__init__()
        self.seq_length = seq_length
        self.encoder = ConvEncoder(latent_dim)
        # The fully connected decoder is the same as in Task 1a
        self.decoder = Decoder(latent_dim, hidden_dim, output_dim=2*128*seq_length, seq_length=seq_length)
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        pitch_probs, duration_logits = self.decoder(z)
        return pitch_probs, duration_logits, mu, logvar

# 5. Setup training
midi_files = (glob("mid/0/0/*.mid") +
              glob("mid/0/1/*.mid") + 
              glob("mid/0/2/*.mid") + 
              glob("mid/0/3/*.mid") + 
              glob("mid/0/4/*.mid"))
dataset = MidiDataset(midi_files, seq_length=400)
dataloader = DataLoader(dataset, batch_size=250, shuffle=True, num_workers=0)

input_dim = 2 * 128 * 400  # 2 channels: piano roll + duration
hidden_dim = 2048
latent_dim = 128

model = MusicVAE(hidden_dim, latent_dim, seq_length=400)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

print("Start Training")
t1 = time.time()
best_loss = float('inf')
loss_increased = 0

# To store the best model
best_model_state = None

num_epochs = 35
for epoch in range(num_epochs):
    total_loss = 0
    for batch in dataloader:
        optimizer.zero_grad()
        pitch_probs, duration_logits, mu, logvar = model(batch)
        loss = loss_function(pitch_probs, duration_logits, batch.view(batch.size(0), 2, 128, -1), mu, logvar)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss}")

    if avg_loss < best_loss:
        best_loss = avg_loss
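        # Clone the tensors: state_dict() returns live references that later optimizer steps overwrite
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}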
    else:
        loss_increased = 1
    if loss_increased:
        print(f"Early stopping triggered after epoch {epoch+1}")
        break

t2 = time.time()
print(f"Finished Training in {round(t2 - t1, 2)}s")

# Load the best model after training
if best_model_state is not None:
    model.load_state_dict(best_model_state)

Start Training




Epoch 1/35, Loss: 70979369.0
Epoch 2/35, Loss: 11610682.6
Epoch 3/35, Loss: 4469826.95
Epoch 4/35, Loss: 4029529.2
Epoch 5/35, Loss: 12916944.1
Early stopping triggered after epoch 5
Finished Training in 222.45s


### Results
To improve compositional clarity, sampled single notes are evaluated in the C-major scale.
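
Snapping a pitch to C major can be sketched as below. This is a variant for illustration and not the notebook's `pitch_shift_to_c_major`: that function measures distance relative to a base pitch of 60 and therefore returns pitches between 60 and 71, whereas this hypothetical `snap_to_c_major` keeps the note in its original octave.

```python
C_MAJOR_PITCH_CLASSES = (0, 2, 4, 5, 7, 9, 11)

def snap_to_c_major(pitch):
    """Move a MIDI pitch to the nearest C-major pitch, choosing the lower one on ties."""
    if not 0 <= pitch <= 127:
        raise ValueError(f"pitch {pitch} is outside the MIDI range 0-127")
    if pitch % 12 in C_MAJOR_PITCH_CLASSES:
        return pitch
    # Every out-of-scale pitch class (1, 3, 6, 8, 10) sits one semitone above a scale tone
    return pitch - 1

snapped = [snap_to_c_major(p) for p in (60, 61, 66, 73, 127)]
print(snapped)
```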

Although the loss is higher than the previous model's, the generated MIDI files sound clearer and more structurally sound. One way to interpret this is that the model overfits the training dataset less and generalizes better to unseen music.

In [12]:
import numpy as np
import pretty_midi

def generate_music_with_repetition_control(model, latent_dim, num_samples=1, seq_length=400, max_notes_per_time=3):
    threshold= 0.04
    max_history = 4  # number of previous notes to compare
    with torch.no_grad():
        z = torch.randn(num_samples, latent_dim)
        pitch_probs, duration_logits = model.decoder(z)
        pitch_probs = pitch_probs.cpu().numpy()
        duration_logits = duration_logits.cpu().numpy()

        for i in range(num_samples):
            pm = pretty_midi.PrettyMIDI()
            instrument = pretty_midi.Instrument(program=0)
            dur_pred = duration_logits[i]
            dur_pred = np.clip(dur_pred, 0, 1)

            previous_notes = [[60]]

            start_time = 0
            for t in range(seq_length):
                # Retrieve pitch probabilities at time t
                pitch_probs_t = pitch_probs[i, :, t]
                total_prob = np.sum(pitch_probs_t)
                pitch_probs_t = pitch_probs_t / total_prob
                # Candidate pitches above threshold
                candidate_pitches = np.where(pitch_probs_t > threshold)[0].tolist()
                candidate_probs = pitch_probs_t[candidate_pitches]
                # If no candidate pitches, shift last pitch
                if len(candidate_pitches) == 0:
                    last_pitch = previous_notes[-1][-1]
                    shifted_pitch = pitch_shift_to_c_major(last_pitch)
                    selected_pitches = [shifted_pitch]
                elif len(candidate_pitches) >= max_notes_per_time:
                    # Select top 'max_notes_per_time' pitches based on probability
                    # Get indices of top probabilities
                    top_indices = np.argsort(candidate_probs)[-max_notes_per_time:][::-1]
                    selected_pitches = [candidate_pitches[idx] for idx in top_indices]
                else:
                    # Random choice among candidates
                    choose_idx = np.random.choice(len(candidate_pitches))
                    choose_pitch = candidate_pitches[choose_idx]
                    # shifted_pitch = pitch_shift_to_c_major(choose_pitch)
                    selected_pitches = [choose_pitch]

                offset = 0
                # Check for repetition
                if all(selected_pitches == i for i in previous_notes):
                    offset = 2

                # Append notes to MIDI
                end_time = start_time
                for p in selected_pitches:
                    note_duration = dur_pred[p, t]
                    if note_duration == 0:
                        note_duration = 0.25
                    end_time = start_time + note_duration
                    note = pretty_midi.Note(velocity=100, pitch=p + offset, start=start_time, end=end_time)
                    instrument.notes.append(note)

                # Prepare for next iteration
                if all(selected_pitches == i for i in previous_notes):
                    selected_pitches = np.array(selected_pitches) + offset
                    selected_pitches = list(selected_pitches)

                previous_notes.append(selected_pitches)
                if len(previous_notes) > max_history:
                    previous_notes = previous_notes[-max_history:]
                start_time = end_time

            pm.instruments.append(instrument)
            pm.write(f"music/2d_VAE_{i}.mid")

# Usage (assuming model and functions are defined)
generate_music_with_repetition_control(model, latent_dim, num_samples=3, seq_length=400)
print("Done")

Done


# Task 2 - Markov Chains

### Introduction
Our second model continues our use of MIDI files, this time for symbolic conditioned generation. The model generates music given a musical scale and a keyword that determines the chord progression to use. The goal is to generate music that harmonizes, with notes and chords that follow a scale pattern. For example, the model can generate a "vi IV I V F Major Hopeful" composition. We are also able to train our model on the entirety of the PDMX dataset, as well as another publicly available dataset: MAESTRO. MAESTRO contains over 1,000 MIDI files that are longer and more robust than those of PDMX. Most importantly, these compositions contain very few chords, giving us more features related to single-note generation.
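
To make the conditioning concrete, a progression such as "vi IV I V" in F major can be expanded into diatonic triads. The sketch below is an assumption about how such a mapping might look, not the notebook's implementation; `progression_to_triads` is a hypothetical helper, and it ignores numeral case because the major scale already fixes each triad's quality.

```python
MAJOR_SCALE = (0, 2, 4, 5, 7, 9, 11)
DEGREES = {"i": 0, "ii": 1, "iii": 2, "iv": 3, "v": 4, "vi": 5, "vii": 6}

def progression_to_triads(progression, tonic):
    """Map Roman numerals to diatonic triads (MIDI pitches) in a major key."""
    chords = []
    for numeral in progression.split():
        degree = DEGREES[numeral.lower()]
        chord = []
        for step in (0, 2, 4):                        # root, third, fifth
            octave, index = divmod(degree + step, 7)  # wrap past the seventh degree
            chord.append(tonic + 12 * octave + MAJOR_SCALE[index])
        chords.append(chord)
    return chords

triads = progression_to_triads("vi IV I V", tonic=65)  # 65 = F4
print(triads)
```

With the tonic at MIDI pitch 65, this yields D minor, B-flat major, F major, and C major triads in that order.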

### Data Cleaning
Given that over 250,000 files were used to train our model, caching results was critical to improving runtimes on subsequent runs. 14 different JSON files are used, each taking over half an hour to create but only seconds to load. For this model, MIDITok and symusic were the primary packages used to process the data. No files were left out, as these packages were able to properly extract all the necessary features.
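
The caching pattern used throughout this task amounts to "load the file if it exists, otherwise compute and save". A minimal sketch of that pattern follows; `load_or_compute` and the file name are hypothetical and chosen for illustration.

```python
import json
import os
import tempfile

def load_or_compute(cache_path, compute):
    """Return cached JSON if present; otherwise run `compute` and cache its result."""
    if os.path.exists(cache_path):
        with open(cache_path, "r") as f:
            return json.load(f)
    result = compute()
    os.makedirs(os.path.dirname(cache_path) or ".", exist_ok=True)
    with open(cache_path, "w") as f:
        json.dump(result, f)
    return result

calls = []
def expensive():
    calls.append(1)               # record how often the slow path runs
    return {"60": 3, "62": 1}     # JSON object keys are always strings

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "cache", "note_frequencies.json")
    first = load_or_compute(path, expensive)    # computes and writes the cache
    second = load_or_compute(path, expensive)   # served from disk
print(first == second, len(calls))
```

One caveat of JSON caches is that integer dictionary keys come back as strings, so pitch counts keyed by MIDI number need converting after loading.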

In [None]:
# import required packages
import os
import time

import json
import ast
import random
from glob import glob
from collections import defaultdict

import numpy as np
from numpy.random import choice

from symusic import Score
from miditok import REMI, TokenizerConfig
from midiutil import MIDIFile

In [None]:
random.seed(42)
start = time.time()

### Model Description
The algorithm I chose to implement for this model was an expanded Markov chain, using n-gram probabilities to generate both pitch and duration for each note. The model chooses the next note at random, weighted by the trigram probabilities. Simply put, this algorithm predicts the next musical element based on the current state and its transition probabilities. The goal of a Markov chain is to increase stylistic coherence. This model differs from the VAE used in Tasks 1a and 1b in that there is no loss function. This becomes a problem because Markov chains can follow a pattern that diverges from the original composition. To prevent this, a few constraints are included. First, since most of the music is in 4/4 time, we constrain the generations to beats that conform to this time signature. Additionally, notes not belonging to the given input scale are not used. Finally, keywords determine the chord progression that plays on every first beat of a scale. All of these features improve the structural coherence of the composition. An advantage of this model is its ability to train on larger datasets with minimal memory usage, while a disadvantage is that its simplicity limits how accurate, coherent, or audibly appealing a composition can become.
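
The trigram idea can be sketched on pitches alone. The example below is a toy illustration under stated assumptions, not the notebook's model: the two training sequences are invented, `train_trigrams` and `sample_next` are hypothetical helpers, and durations, beats, and chord progressions are left out.

```python
import random
from collections import Counter, defaultdict

def train_trigrams(sequences):
    """Count how often each note follows every pair of preceding notes."""
    counts = defaultdict(Counter)
    for seq in sequences:
        for a, b, c in zip(seq, seq[1:], seq[2:]):
            counts[(a, b)][c] += 1
    return counts

def sample_next(counts, context, allowed, rng):
    """Sample the next note given the last two notes, restricted to `allowed` pitches."""
    options = {n: c for n, c in counts.get(context, {}).items() if n in allowed}
    if not options:
        return context[-1]                   # unseen context: repeat the last note
    notes, weights = zip(*options.items())
    return rng.choices(notes, weights=weights, k=1)[0]

rng = random.Random(42)
counts = train_trigrams([[60, 62, 64, 65, 67], [60, 62, 64, 62, 60]])
c_major = {60, 62, 64, 65, 67, 69, 71}       # scale constraint on generated notes
melody = [60, 62]
for _ in range(6):
    melody.append(sample_next(counts, (melody[-2], melody[-1]), c_major, rng))
print(melody)
```

Filtering by `allowed` before sampling is one way to enforce the scale constraint without retraining, since the counts themselves stay untouched.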

As with the previous task, our generated music will be evaluated through Subjective Listening Tests. Additionally, since this model is simpler and can possibly generate more coherent music by following more basic conditions, we can also measure audio diversity and compute an Inception Score against our training data or an outside composition. During comparison, features such as chord similarity would improve our score. Baselines for this task would include whether the generated composition effectively follows a sequence of notes, or whether its distribution of chords is similar to that of the training set.

**Warning:** Due to GitHub's file size restrictions, <code>beat_extractions.json</code> (466 MB) and <code>note_extractions.json</code> (231 MB) are not pushed to the project page. Additionally, <code>mid</code> (1.79 GB), the folder that contains all 250,000+ MIDI files, is available for download at https://github.com/pnlong/PDMX/.

In [None]:
# Collect MIDI files
midi_files = []
for i in range(18):
    for j in range(58):
        mids = glob(f"mid/{i}/{j}/*.mid")
        midi_files += mids
for i in range(10):
    mids = glob(f"maestro-v3.0.0/{i}/*.midi")
    midi_files += mids
    
config = TokenizerConfig(num_velocities=1, use_chords=False, use_programs=False)
tokenizer = REMI(config)
    
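# Load the trained tokenizer from the cache if it exists; otherwise train and save it
if os.path.exists("cache/tokenizer.json"):
    # Rebuild the tokenizer from the saved parameters. Calling from_pretrained on the
    # instance returns a new tokenizer and would leave this one untrained.
    tokenizer = REMI(params="cache/tokenizer.json")
else:
    tokenizer.train(vocab_size=1000, files_paths=midi_files)
    tokenizer.save("cache/tokenizer.json")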
    
tok_time = time.time()
print(f"Task finished in {round(tok_time - start, 2)}s")

In [None]:
# e.g.:
midi = Score(midi_files[0])
tokens = tokenizer(midi)[0].tokens
tokens[:10]

### Q1

In [None]:
#Q1
def note_extraction(midi_file):
    # Q1a: Your code goes here
    midi = Score(midi_file)
    tokens = tokenizer(midi)[0].tokens
    valid = [i for i in tokens if "Pitch" in i]
    nums = [int(j.split("_")[-1]) for j in valid]

    return nums

def note_frequency(midi_files):
    x = 0
    CACHE_FILE = 'cache/note_frequencies.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    if x:
        return x
        
    # Q1b: Your code goes here
    freqs = defaultdict(int)
    for i in midi_files:
        nums = note_extraction(i)
        for j in nums:
            freqs[j] += 1
            
    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(freqs)))
            
    return dict(freqs)

CACHE_FILE_3 = 'cache/note_extractions.json'
if os.path.exists(CACHE_FILE_3):
    with open(CACHE_FILE_3, 'r') as f:
        note_extractions = json.load(f)
else:
    note_extractions = [note_extraction(i) for i in midi_files]
    with open(CACHE_FILE_3, 'w') as f:
        json.dump(note_extractions, f)

### Q2

In [None]:
#Q2
def note_unigram_probability(midi_files):
    note_counts = note_frequency(midi_files)
    unigramProbabilities = note_counts.copy()
    vals = 0

    # Q2: Your code goes here
    for i in unigramProbabilities.values():
        vals += i
    for i in unigramProbabilities:
        unigramProbabilities[i] /= vals
    
    return unigramProbabilities

### Q3

In [None]:
#Q3
def note_bigram_probability(midi_files):    
    x, y = 0, 0
    CACHE_FILE = 'cache/bigram_transitions.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    
    CACHE_FILE_2 = 'cache/bigram_transition_probabilities.json'
    if os.path.exists(CACHE_FILE_2):
        with open(CACHE_FILE_2, 'r') as f:
            content = f.read()
        y = ast.literal_eval(content)
        
    if x and y:
        return x, y

    bigramTransitions = defaultdict(list)
    bigramTransitionProbabilities = defaultdict(list)
    

    for files in note_extractions:
        for i in range(len(files) - 1):
            first = files[i]
            last = files[i + 1]
            bigramTransitions[first].append(last)

    for key, vals in bigramTransitions.items():
        counts = defaultdict(int)
        for note in vals:
            counts[note] += 1
        total = sum(counts.values())
        probs = []
        uniques = []
        for note, count in counts.items():
            uniques.append(note)
            probs.append(count / total)
        bigramTransitionProbabilities[key] = probs
        bigramTransitions[key] = uniques
        
    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(bigramTransitions)))
    with open(CACHE_FILE_2, 'w') as f:
        f.write(repr(dict(bigramTransitionProbabilities)))

    return bigramTransitions, bigramTransitionProbabilities
        
x, y = note_bigram_probability(midi_files)
def sample_next_note(note):
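    # Q3b: Your code goes here
    # Draw the next note at random, weighted by the bigram transition probabilities
    # (the previous version always returned the single most likely note)
    return np.random.choice(x[note], p=y[note])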
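
The tests further down call perplexity functions for Q4 that are not shown in this section. As a rough illustration only, bigram perplexity could be computed as below, assuming the same parallel-list format returned by `note_bigram_probability`. The function name, the `floor` smoothing constant, and the toy tables are hypothetical.

```python
import math

def bigram_perplexity(notes, transitions, probabilities, floor=1e-8):
    """Perplexity of a note sequence (at least two notes) under a bigram model.

    transitions[a] lists the notes seen after a, and probabilities[a] holds the
    matching probabilities. Unseen transitions get `floor`, an assumed constant.
    """
    log_sum = 0.0
    for prev, curr in zip(notes, notes[1:]):
        nexts = transitions.get(prev, [])
        p = probabilities[prev][nexts.index(curr)] if curr in nexts else floor
        log_sum += math.log(p)
    steps = len(notes) - 1
    # Perplexity is the exponential of the average negative log-probability
    return math.exp(-log_sum / steps)

toy_transitions = {60: [62, 64], 62: [60]}
toy_probabilities = {60: [0.5, 0.5], 62: [1.0]}
ppl = bigram_perplexity([60, 62, 60, 64], toy_transitions, toy_probabilities)
print(ppl)
```

Lower perplexity means the model found the sequence less surprising; a sequence made only of certain transitions would score exactly 1.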

### Q5

In [None]:
#Q5
def note_trigram_probability(midi_files):
    x, y = 0, 0
    CACHE_FILE = 'cache/trigram_transitions.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    
    CACHE_FILE_2 = 'cache/trigram_transition_probabilities.json'
    if os.path.exists(CACHE_FILE_2):
        with open(CACHE_FILE_2, 'r') as f:
            content = f.read()
        y = ast.literal_eval(content)
        
    if x and y:
        return x, y

    trigramTransitions = defaultdict(list)
    trigramTransitionProbabilities = defaultdict(list)

    # Q5a: Your code goes here
    trigramTransitionCounts = defaultdict(lambda: defaultdict(int))
    totalCounts = defaultdict(int)

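    for files in note_extractions:
        # Keep plain Python ints: NumPy scalars in the keys would make the repr()-based
        # cache unreadable by ast.literal_eval on recent NumPy versions
        if len(files) < 3:
            continue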
        for i in range(2, len(files)):
            prev2 = files[i - 2]
            prev1 = files[i - 1]
            curr = files[i]
            key = (prev2, prev1)
            trigramTransitionCounts[key][curr] += 1
            totalCounts[key] += 1

    for key in trigramTransitionCounts:
        total = totalCounts[key]
        next_notes = list(trigramTransitionCounts[key].keys())
        probs = [trigramTransitionCounts[key][n] / total for n in next_notes]
        trigramTransitions[key] = next_notes
        trigramTransitionProbabilities[key] = probs

    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(trigramTransitions)))
    with open(CACHE_FILE_2, 'w') as f:
        f.write(repr(dict(trigramTransitionProbabilities)))
    
    return trigramTransitions, trigramTransitionProbabilities

### Q6

In [None]:
#Q6
duration2length = {
    "0.1.8": 1, 
    "0.2.8": 2, # sixteenth note, 0.25 beat in 4/4 time signature
    "0.3.8": 3,
    "0.4.8": 4, # eighth note, 0.5 beat in 4/4 time signature
    "0.5.8": 5,
    "0.6.8": 6,
    "0.7.8": 7, 
    "1.0.8": 8, # quarter note, 1 beat in 4/4 time signature
    "1.1.8": 9, 
    "1.2.8": 10, 
    "1.3.8": 11, 
    "1.4.8": 12, 
    "1.5.8": 13, 
    "1.6.8": 14, 
    "1.7.8": 15, 
    "2.0.8": 16, # half note, 2 beats in 4/4 time signature
    "2.1.8": 17, 
    "2.2.8": 18, 
    "2.3.8": 19, 
    "2.4.8": 20, 
    "2.5.8": 21, 
    "2.6.8": 22, 
    "2.7.8": 23, 
    "3.0.8": 24, 
    "3.1.8": 25, # 3 beats and 1/8 of a beat
    "3.2.8": 26, 
    "3.3.8": 27, 
    "3.4.8": 28, 
    "3.5.8": 29, 
    "3.6.8": 30, 
    "3.7.8": 31, 
    "4.0.4": 32, # whole note, 4 beats in 4/4 time signature
    "4.1.4": 34, 
    "4.2.4": 36, 
    "4.3.4": 38, # 4 beats and 3/4 of a beat
    "5.0.4": 40, 
    "5.1.4": 42, 
    "5.2.4": 44, 
    "5.3.4": 46, 
    "6.0.4": 48, 
    "6.1.4": 50, 
    "6.2.4": 52, 
    "6.3.4": 54, 
    "7.0.4": 56, 
    "7.1.4": 58, 
    "7.2.4": 60, 
    "7.3.4": 62, 
    "8.0.4": 64, 
    "8.1.4": 66, 
    "8.2.4": 68, 
    "8.3.4": 70, 
    "9.0.4": 72, 
    "9.1.4": 74, 
    "9.2.4": 76, 
    "9.3.4": 78, 
    "10.0.4": 80, 
    "10.1.4": 82, 
    "10.2.4": 84, 
    "10.3.4": 86, 
    "11.0.4": 88, 
    "11.1.4": 90, 
    "11.2.4": 92, 
    "11.3.4": 94, 
    "12.0.4": 96, 
}

def beat_extraction(midi_file):
    # Q6: Your code goes here
    midi = Score(midi_file)
    tokens = tokenizer(midi)[0].tokens
    pos_dur = [i.split("_")[-1] for i in tokens if "Position" in i or "Duration" in i]
    pos_len = [duration2length[i] if i in duration2length.keys() else int(i) for i in pos_dur]
    beats = list(zip(pos_len[::2], pos_len[1::2]))
    return beats

CACHE_FILE = "cache/beat_extractions.json"
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, 'r') as f:
        content = json.load(f)
    beat_extractions = []
    for i in content:
        beats = []
        for j, k in i:
            beats.append(tuple([int(j), int(k)]))
        beat_extractions.append(beats)
    content = 0
else:
    beat_extractions = [beat_extraction(i) for i in midi_files]
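
The `duration2length` table above appears to follow a simple rule: a duration string `beats.numerator.denominator` is converted to eighths of a beat. The sketch below reproduces that arithmetic; `duration_to_ticks` is a hypothetical helper and is not used elsewhere in this notebook.

```python
def duration_to_ticks(token, ticks_per_beat=8):
    """Convert a 'beats.numerator.denominator' duration string to eighths of a beat."""
    beats, numerator, denominator = (int(part) for part in token.split("."))
    # Whole beats plus the fractional part, both expressed in ticks
    return beats * ticks_per_beat + numerator * ticks_per_beat // denominator

for token in ["0.2.8", "1.4.8", "4.1.4", "12.0.4"]:
    print(token, duration_to_ticks(token))
```

For example, `"4.1.4"` is 4 beats plus 1/4 of a beat, which is 32 + 2 = 34 ticks and matches the table entry.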

### Q7

In [None]:
#Q7
def beat_bigram_probability(midi_files):
    x, y = 0, 0
    CACHE_FILE = 'cache/bigram_beat_transitions.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    
    CACHE_FILE_2 = 'cache/bigram_beat_transition_probabilities.json'
    if os.path.exists(CACHE_FILE_2):
        with open(CACHE_FILE_2, 'r') as f:
            content = f.read()
        y = ast.literal_eval(content)
        
    if x and y:
        return x, y
        
    bigramBeatTransitions = defaultdict(list)
    bigramBeatTransitionProbabilities = defaultdict(list)

    # Q7: Your code goes here
    transition_counts = defaultdict(lambda: defaultdict(int))

    for beats in beat_extractions:
        beat_lengths = [length for (_, length) in beats]

        for i in range(len(beat_lengths) - 1):
            prev_length = beat_lengths[i]
            curr_length = beat_lengths[i + 1]
            transition_counts[prev_length][curr_length] += 1

    for prev_length, next_lengths_counts in transition_counts.items():
        total = sum(next_lengths_counts.values())
        probabilities = []
        next_lengths = []
        for length, count in next_lengths_counts.items():
            next_lengths.append(length)
            probabilities.append(count / total)
        bigramBeatTransitions[prev_length] = next_lengths
        bigramBeatTransitionProbabilities[prev_length] = probabilities

    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(bigramBeatTransitions)))
    with open(CACHE_FILE_2, 'w') as f:
        f.write(repr(dict(bigramBeatTransitionProbabilities)))

    return bigramBeatTransitions, bigramBeatTransitionProbabilities

### Q8

In [None]:
#Q8
def beat_pos_bigram_probability(midi_files):
    x, y = 0, 0
    CACHE_FILE = 'cache/bigram_beat_pos_transitions.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    
    CACHE_FILE_2 = 'cache/bigram_beat_pos_transition_probabilities.json'
    if os.path.exists(CACHE_FILE_2):
        with open(CACHE_FILE_2, 'r') as f:
            content = f.read()
        y = ast.literal_eval(content)
        
    if x and y:
        return x, y
        
    bigramBeatPosTransitions = defaultdict(list)
    bigramBeatPosTransitionProbabilities = defaultdict(list)

    # Q8a: Your code goes here
    transition_counts = defaultdict(lambda: defaultdict(int))

    for beats in beat_extractions:
        for i in range(len(beats)):
            curr_pos, curr_length = beats[i]
            transition_counts[curr_pos][curr_length] += 1
    
    for pos, next_lengths_counts in transition_counts.items():
        total = sum(next_lengths_counts.values())
        next_lengths = []
        probs = []
        for length, count in next_lengths_counts.items():
            next_lengths.append(length)
            probs.append(count / total)
        bigramBeatPosTransitions[pos] = next_lengths
        bigramBeatPosTransitionProbabilities[pos] = probs

    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(bigramBeatPosTransitions)))
    with open(CACHE_FILE_2, 'w') as f:
        f.write(repr(dict(bigramBeatPosTransitionProbabilities)))
    
    return bigramBeatPosTransitions, bigramBeatPosTransitionProbabilities

### Q9

In [None]:
#Q9
def beat_trigram_probability(midi_files):
    x, y = 0, 0
    CACHE_FILE = 'cache/trigram_beat_transitions.json'
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, 'r') as f:
            content = f.read()
        x = ast.literal_eval(content)
    
    CACHE_FILE_2 = 'cache/trigram_beat_transition_probabilities.json'
    if os.path.exists(CACHE_FILE_2):
        with open(CACHE_FILE_2, 'r') as f:
            content = f.read()
        y = ast.literal_eval(content)
        
    if x and y:
        return x, y
    
    trigramBeatTransitions = defaultdict(list)
    trigramBeatTransitionProbabilities = defaultdict(list)
    trigramTransitionCounts = defaultdict(lambda: defaultdict(int))
    totalCounts = defaultdict(int)

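    # Reuse the cached beat_extractions (as in Q7 and Q8) instead of re-tokenizing every file
    for beats in beat_extractions:
        # Each key pairs the previous beat length with the current position, so two beats suffice
        for i in range(1, len(beats)):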
            prev1_pos, prev1_len = beats[i - 1]
            curr_pos, curr_len = beats[i]
            key = (prev1_len, curr_pos)
            trigramTransitionCounts[key][curr_len] += 1
            totalCounts[key] += 1

    # Convert counts to probabilities
    for key in trigramTransitionCounts:
        total = totalCounts[key]
        next_lengths = list(trigramTransitionCounts[key].keys())
        probs = [trigramTransitionCounts[key][n] / total for n in next_lengths]
        trigramBeatTransitions[key] = next_lengths
        trigramBeatTransitionProbabilities[key] = probs

    with open(CACHE_FILE, 'w') as f:
        f.write(repr(dict(trigramBeatTransitions)))
    with open(CACHE_FILE_2, 'w') as f:
        f.write(repr(dict(trigramBeatTransitionProbabilities)))

    return trigramBeatTransitions, trigramBeatTransitionProbabilities

In [None]:
test_beats = [2,4,8]
your_transition, your_probability = beat_trigram_probability(midi_files)
yours = []
for note in test_beats:
    index = your_transition[(4, 0)].index(note)
    yours.append(your_probability[(4, 0)][index])

### Q10

In [None]:
def music_generate(length= 500, scale= "C", mode= "Major", keyword= "Hopeful"):
    # Map scale name to root note MIDI number (assuming C4 = 60)
    root_notes = {
        "C": 60, "C#": 61,
        "Db": 61, "D": 62, "D#": 63,
        "Eb": 63, "E": 64,
        "F": 65, "F#": 66,
        "Gb": 66, "G": 67, "G#": 68,
        "Ab": 68, "A": 69, "A#": 70,
        "Bb": 70, "B": 71
    }
    # Major and minor scale intervals in semitones
    scale_intervals = {
        "Major": [0, 2, 4, 5, 7, 9, 11],
        "Minor": [0, 2, 3, 5, 7, 8, 10]
    }

    # Get the root MIDI note number
    root_note = root_notes.get(scale, 60)  # Default to C if not found
    # Get the intervals for the specified mode
    intervals = scale_intervals.get(mode, scale_intervals["Major"])
    # Calculate the scale notes (one octave)
    scale_notes = {(root_note + interval) % 12 for interval in intervals}
    
    # Map for specific chords for Hopeful
    chord_map = {
    0: [33, 57, 60, 64],
    1: [41, 57, 60, 65],
    2: [36, 55, 60, 64],
    3: [31, 55, 59, 62],
    }
    
    # Shift chord notes by the scale root offset
    def shift_chord(chord_notes, offset):
        return [(note + offset) for note in chord_notes]

    # Compute the offset
    scale_root_mod = root_note % 12
    # Apply shift to each chord
    shifted_chords = {key: shift_chord(notes, scale_root_mod) for key, notes in chord_map.items()}

    # List of chords for progression
    chords = [shifted_chords[name] for name in [0, 1, 2, 3]]
    allowed_notes = scale_notes

    # sample notes
    unigramProbabilities = note_unigram_probability(midi_files)
    bigramTransitions, bigramTransitionProbabilities = note_bigram_probability(midi_files)
    trigramTransitions, trigramTransitionProbabilities = note_trigram_probability(midi_files)

    # Q10: Your code goes here ...
    sampled_notes = []

    # sample beats
    sampled_beats = []

    bigramBeatTransitions, bigramBeatTransitionProbabilities = beat_bigram_probability(midi_files)
    bigramBeatPosTransitions, bigramBeatPosTransitionProbabilities = beat_pos_bigram_probability(midi_files)
    trigramBeatTransitions, trigramBeatTransitionProbabilities = beat_trigram_probability(midi_files)

    notes_list = list(unigramProbabilities.keys())
    probs = list(unigramProbabilities.values())

    # Function to filter notes based on scale
    def filter_notes(note_candidates):
        if allowed_notes is None:
            return note_candidates
        else:
            return [n for n in note_candidates if n % 12 in allowed_notes]

    # Initialize current note
    current_note = np.random.choice(notes_list, p=probs)
    current_note_candidates = filter_notes([current_note])
    if current_note_candidates:
        current_note = np.random.choice(current_note_candidates)
    else:
        # fallback if no notes in scale
        current_note = np.random.choice(notes_list, p=probs)

    prev_note1 = current_note  # for bigram
    prev_note2 = None  # for trigram
    prev_beat2 = None
    prev_beat1 = None

    # Initialize beat length
    beat_length = np.random.choice(list(duration2length.values()))
    current_beat_pos = 0

    count_notes = 0
    while count_notes < length:
        # Generate next note using trigram model if possible
        if prev_note2 is not None:
            key = (prev_note2, prev_note1)
            if key in trigramTransitions:
                next_notes = trigramTransitions[key]
                probs_trigram = trigramTransitionProbabilities[key]
                # Filter next notes based on scale
                filtered_next_notes = filter_notes(next_notes)
                if filtered_next_notes:
                    probs_filtered = [p for n, p in zip(next_notes, probs_trigram) if n in filtered_next_notes]
                    next_note = np.random.choice(filtered_next_notes, p=np.array(probs_filtered)/sum(probs_filtered))
                else:
                    # fallback to unigram if no notes in scale
                    next_note = np.random.choice(notes_list, p=list(unigramProbabilities.values()))
            else:
                # Fallback to unigram
                next_note = np.random.choice(notes_list, p=list(unigramProbabilities.values()))
        else:
            # No trigram context yet (first generated note): sample from the unigram distribution
            next_note = np.random.choice(notes_list, p=list(unigramProbabilities.values()))
            
        # Generate beat length conditioned on the previous beat length and the current bar position,
        # matching the (previous length, current position) keys built in beat_trigram_probability
        if prev_beat1 is not None:
            key = (prev_beat1, current_beat_pos)
            if key in trigramBeatTransitions:
                next_lengths = list(trigramBeatTransitions[key])
                length_probs = trigramBeatTransitionProbabilities[key]
                beat_length = np.random.choice(next_lengths, p=length_probs)
            elif prev_beat1 in bigramBeatTransitions:
                # fallback to bigram
                next_lengths = list(bigramBeatTransitions[prev_beat1])
                length_probs = bigramBeatTransitionProbabilities[prev_beat1]
                beat_length = np.random.choice(next_lengths, p=length_probs)
            else:
                # fallback to a uniformly random length
                beat_length = np.random.choice(list(duration2length.values()))
        else:
            # no previous beat yet: pick a uniformly random length
            beat_length = np.random.choice(list(duration2length.values()))
            
        # Update previous two beats
        prev_beat2 = prev_beat1
        prev_beat1 = beat_length

        # Append note info
        sampled_notes.append(next_note)
        sampled_beats.append(beat_length)

        # Update previous notes for trigram model
        prev_note2 = prev_note1
        prev_note1 = next_note

        # Update beat position
        current_beat_pos += beat_length
        if current_beat_pos >= 32:
            # Reset to start of next bar
            current_beat_pos = 0

        count_notes += 1

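    # Snap beat lengths to a coarser grid: bump lengths of 0 or 1 up by one, then round down
    # to a multiple of 2, 4, 8, and 16 once the length is at least that large
    sampled_beats = [i if i > 1 else i + 1 for i in sampled_beats]
    sampled_beats = [i if (i == 1 or i % 2 == 0) else i-1 for i in sampled_beats]
    sampled_beats = [i if (i < 4 or i % 4 == 0)  else i-2 for i in sampled_beats]
    sampled_beats = [i if (i < 8 or i % 8 == 0)  else i-4 for i in sampled_beats]
    # At this point every length >= 16 is a multiple of 8, so subtract 8 to reach a multiple of 16
    sampled_beats = [i if (i < 16 or i % 16 == 0)  else i-8 for i in sampled_beats]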

    # Convert sampled notes and beats into MIDI
    midi = MIDIFile(1)
    track = 0
    start_time = 0  # start at beat 0 (named to avoid shadowing the time module)
    midi.addTrackName(track, start_time, "Generated Music")
    midi.addTempo(track, start_time, 120)

    current_time = 0  # running onset time in beats

    for note, beat_len in zip(sampled_notes, sampled_beats):
        # Beat lengths are in eighths of a beat, so divide by 8 to get beats
        duration = beat_len / 8.0
        # Add note: (track, channel, pitch, time, duration, volume)
        midi.addNote(track, 0, note, current_time, duration, 100)
        # The next note starts when this one ends
        current_time += duration

    # Insert chords at the start, each as a whole note (duration=4 beats)
    chord_time = 0
    chord_duration = 4  # whole note
    while (current_time - chord_time) // 16 >= 1 :  
        for chord_notes in chords:
            for note in chord_notes:
                midi.addNote(track, 0, note, chord_time, chord_duration, 100)
            # Move time forward by chord duration
            chord_time += chord_duration
            
    # Save MIDI file, creating the output directory if it does not exist yet
    os.makedirs("music/test", exist_ok=True)
    with open(f"music/test/{scale}{mode}.mid", "wb") as f:
        midi.writeFile(f)

    print(f"Generated music in {scale} {mode} scale")
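
The scale constraint used in `music_generate` can be looked at in isolation: keep only candidates whose pitch class lies in the scale, then sample from the remaining probability mass. The sketch below is a simplified stand-in, not the function above; `sample_in_scale` and its arguments are hypothetical, and it uses the standard library's `random.choices`, which accepts unnormalized weights.

```python
import random

MAJOR_INTERVALS = [0, 2, 4, 5, 7, 9, 11]

def sample_in_scale(candidates, probabilities, root_pitch_class, rng,
                    intervals=MAJOR_INTERVALS):
    """Sample one candidate whose pitch class is in the scale, or None if none qualifies."""
    allowed = {(root_pitch_class + step) % 12 for step in intervals}
    kept = [(n, p) for n, p in zip(candidates, probabilities) if n % 12 in allowed]
    if not kept:
        return None  # the caller decides how to fall back
    notes, weights = zip(*kept)
    # choices() treats weights as relative, so no explicit renormalization is needed
    return rng.choices(notes, weights=weights, k=1)[0]

rng = random.Random(0)
# F major has root pitch class 5; of these candidates only 60 (C) and 65 (F) are in the scale
note = sample_in_scale([60, 61, 65, 66], [0.4, 0.3, 0.2, 0.1], root_pitch_class=5, rng=rng)
print(note)
```

Returning `None` when nothing qualifies makes the fallback explicit, so a caller could retry with a lower-order model instead of silently leaving the scale.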

## Tests

In [None]:
def testQ1a():
    yours = note_extraction(midi_files[0])
    print(yours)

In [None]:
t1_start = time.time()
testQ1a()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ1b():
    yours = note_frequency(midi_files)
    print(yours)

In [None]:
t1_start = time.time()
testQ1b()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ2():
    yours = note_unigram_probability(midi_files)
    print(yours)

In [None]:
t1_start = time.time()
testQ2()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ3a():
    your_transition, your_probability = note_bigram_probability(midi_files)
    print(your_transition[74]) # Example
    print(your_probability[74])

In [None]:
t1_start = time.time()
testQ3a()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ3b():
    test_notes = [92, 35, 54] # some notes that have only one possible next note
    yours = []
    correct = []
    for note in test_notes:
        yours.append(sample_next_note(note))

    print(yours)

In [None]:
t1_start = time.time()
testQ3b()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ4():
    test_file = midi_files[0]
    yours = [note_bigram_perplexity(test_file)]
    print(yours)

In [None]:
t1_start = time.time()
testQ4()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ5a():
    test_notes = [71,72,73]
    your_transition, your_probability = note_trigram_probability(midi_files)
    return your_transition, your_probability
    # print(your_transition)
    # print(your_probability)

In [None]:
t1_start = time.time()
t1, t2 = testQ5a()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ5b():
    test_file = midi_files[0]
    yours = [note_trigram_perplexity(test_file)]
    print(yours)

In [None]:
t1_start = time.time()
testQ5b()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ6():
    test_files = midi_files[:5]
    yours = []
    for file in test_files:
        beats = beat_extraction(file)
        yours += [beat[0] for beat in beats]
        yours += [beat[1] for beat in beats]

    print(yours)

In [None]:
t1_start = time.time()
testQ6()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
# CACHE_FILE_3 = 'cache/beat_extractions.json'
# with open(CACHE_FILE_3, 'w') as f:
#     json.dump(beat_extractions, f)

In [None]:
def testQ7():
    test_beats = [2,4,8]
    your_transition, your_probability = beat_bigram_probability(midi_files)
    yours = []
    correct = []
    for note in test_beats:
        index = your_transition[4].index(note)
        yours.append(your_probability[4][index])

    print(yours)

In [None]:
t1_start = time.time()
testQ7()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ8a():
    test_beats = [2,4,8]
    your_transition, your_probability = beat_pos_bigram_probability(midi_files)
    # print(your_transition, your_probability)
    yours = []
    for note in test_beats:
        print(your_transition[0])
        index = your_transition[0].index(note)
        print(index)
        yours.append(your_probability[0][index])

    print(yours)

In [None]:
t1_start = time.time()
testQ8a()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ8b():
    test_file = midi_files[0]
    yours = list(beat_bigram_perplexity(test_file))
    print(yours)

In [None]:
t1_start = time.time()
testQ8b()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ9a():
    test_beats = [2,4,8]
    your_transition, your_probability = beat_trigram_probability(midi_files)
    yours = []
    for note in test_beats:
        index = your_transition[(4, 0)].index(note)
        yours.append(your_probability[(4, 0)][index])

    print(yours)

In [None]:
t1_start = time.time()
testQ9a()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

In [None]:
def testQ9b():
    test_file = midi_files[0]
    yours = [beat_trigram_perplexity(test_file)]

    print(yours)

In [None]:
t1_start = time.time()
testQ9b()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

## Generate Music

In [None]:
# def testQ10():
#     for mode in ["Major", "Minor"]:
#         for scale in ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]:
#             finished = 0
#             while not finished:
#                 try:
#                     music_generate(500, scale, mode)
#                     finished = 1
#                 except Exception as e:
#                     print("Trying again")
def testQ10():
    music_generate(500, "F", "Major")

In [None]:
t1_start = time.time()
testQ10()
t1_end = time.time()
print(f"Test finished in {round(t1_end - t1_start, 2)}s")

# Analysis and Final Remarks

Based on the audio outputs from these models, the 2-dimensional Variational Autoencoder produced the most audibly appealing MIDI compositions. This model is also the fastest to generate music (given that caching is not used), making it the most effective model this project has developed. While the model has only been trained to generate symbolic musical compositions, VAEs are also well suited to continuous audio generation, such as .wav output. The implementation is similar in style to our models: take a mel spectrogram or audio file as input, encode it into a set of parameters, decode back to a mel spectrogram, and convert that to an audio format of choice. Perhaps in the near future this project will be updated to include samples of music generated in this manner.

Additional neural networks and deep learning models to try for symbolic music generation include Generative Adversarial Networks (GANs) and Long Short-Term Memory networks (LSTMs).

Finally, we should discuss how our datasets have been used in the past. With growing calls to protect artists against AI copyright infringement, there is a need for copyright-free training data that still reflects modern music trends.

PDMX has been used to train music generation models and for MIDI conversion applications. While VAEs and Markov Chains are useful for symbolic generation, others have opted to use LSTMs, CNNs, GANs, and HMMs. Results are difficult to compare from a mathematical perspective, but our music generation differs from others in that either chords or single notes were prevalent in most of the music; a combination of both was rarely ever observed.

MAESTRO is a dataset composed of roughly 200 hours of piano performances. The data comes in both MIDI and WAV formats. This dataset has been used for many ML algorithms related to remote piano composition judging.

**While this method of generating music is no longer the state of the art, the knowledge acquired from this project has vastly improved my understanding of machine learning concepts, and the implementation of Convolutional Neural Networks shows that I can effectively apply these skills to real-world situations.**