In [1]:
import pandas as pd
import tensorflow as tf
import numpy as np
from music21 import stream, chord
import os

In [2]:
# Load a single training chorale so its layout can be inspected.
first_chorale_path = "chorales/train/chorale_000.csv"
df = pd.read_csv(first_chorale_path)

In [3]:
# Each row of the CSV is one time step of the chorale.
# Each column (note0 .. note3) is one voice.
df

Unnamed: 0,note0,note1,note2,note3
0,74,70,65,58
1,74,70,65,58
2,74,70,65,58
3,74,70,65,58
4,75,70,58,55
...,...,...,...,...
187,70,65,62,46
188,70,65,62,46
189,70,65,62,46
190,70,65,62,46


In [4]:
def _csv_paths(split):
    """Return the sorted paths of all ``.csv`` files in the ``chorales/<split>`` folder."""
    split_dir = os.path.join("chorales", split)
    return sorted(
        os.path.join(split_dir, file_name)
        for file_name in os.listdir(split_dir)
        if file_name.endswith(".csv")
    )


# One sorted path list per dataset split.
train_files = _csv_paths("train")
test_files = _csv_paths("test")
valid_files = _csv_paths("valid")

In [5]:
# Sanity check: show the training-file paths collected above.
train_files

['chorales\\train\\chorale_000.csv',
 'chorales\\train\\chorale_001.csv',
 'chorales\\train\\chorale_002.csv',
 'chorales\\train\\chorale_003.csv',
 'chorales\\train\\chorale_004.csv',
 'chorales\\train\\chorale_005.csv',
 'chorales\\train\\chorale_006.csv',
 'chorales\\train\\chorale_007.csv',
 'chorales\\train\\chorale_008.csv',
 'chorales\\train\\chorale_009.csv',
 'chorales\\train\\chorale_010.csv',
 'chorales\\train\\chorale_011.csv',
 'chorales\\train\\chorale_012.csv',
 'chorales\\train\\chorale_013.csv',
 'chorales\\train\\chorale_014.csv',
 'chorales\\train\\chorale_015.csv',
 'chorales\\train\\chorale_016.csv',
 'chorales\\train\\chorale_017.csv',
 'chorales\\train\\chorale_018.csv',
 'chorales\\train\\chorale_019.csv',
 'chorales\\train\\chorale_020.csv',
 'chorales\\train\\chorale_021.csv',
 'chorales\\train\\chorale_022.csv',
 'chorales\\train\\chorale_023.csv',
 'chorales\\train\\chorale_024.csv',
 'chorales\\train\\chorale_025.csv',
 'chorales\\train\\chorale_026.csv',
 

In [6]:
def _read_chorales(paths):
    """Read every CSV in ``paths`` and return each one as a list of rows (lists of values)."""
    return [pd.read_csv(path).values.tolist() for path in paths]


# Each chorale is a list of time steps; each time step is a list of integers (one per voice).
train_data = _read_chorales(train_files)
test_data = _read_chorales(test_files)
valid_data = _read_chorales(valid_files)

Notes are MIDI pitch numbers (middle C = C4 = 60):

36 = C2, 
81 = A5, 
0 = silence

In [7]:
# Listening example: one chorale taken from the training set.
chorale = train_data[20]

s = stream.Stream()
for time_step in chorale:
    # A value of 0 marks a silent voice, so only the non-zero notes go into the chord.
    sounding_notes = [note for note in time_step if note]
    # Every time step becomes one chord lasting a quarter note, appended to the stream.
    s.append(chord.Chord(sounding_notes, quarterLength=1))

# Show the MIDI player for the stream.
s.show("midi")

### Preprocessing

In [8]:
# Lowest and highest note of the vocabulary; 0 is reserved for silence.
min_note, max_note = 36, 81

# window_size = 32 -> train the model on sequences of 32 time steps
# window_offset = 16 -> how far the window slides between consecutive samples
# batch_size = 32 -> how many windows are trained on at once
window_size, window_offset, batch_size = 32, 16, 32

def make_xy(chorales):
    """Cut chorales into overlapping windows and build next-token training pairs.

    Each chorale is sliced into windows of ``window_size + 1`` time steps,
    starting every ``window_offset`` steps. Notes are re-encoded as tokens
    (0 = silence, 1 = ``min_note`` ... ``max_note - min_note + 1`` = ``max_note``)
    and each window is flattened so the voices of one time step follow each
    other in a single token sequence.

    Parameters
    ----------
    chorales : list of chorales, each a list of time steps, each time step a
        list of integer notes (one per voice, 0 meaning silence).

    Returns
    -------
    (X, Y) : two integer arrays of shape
        ``(num_windows, (window_size + 1) * num_voices - 1)``.
        ``Y`` is ``X`` shifted one token to the left, i.e. ``Y[:, t]`` is the
        token that follows ``X[:, t]``.

    Raises
    ------
    ValueError
        If no chorale has more than ``window_size`` time steps, so that not a
        single window can be built.
    """
    # shape once stacked: (num_windows, window_size + 1, num_voices)
    windows = [chorale[start:start + window_size + 1]
               for chorale in chorales
               for start in range(0, len(chorale) - window_size, window_offset)]
    if not windows:
        # Without this guard the reshape below fails with an obscure message.
        raise ValueError(
            f"no chorale has more than {window_size} time steps, "
            "so no training window can be built"
        )

    data = np.array(windows, dtype=int)

    # Clamp sounding notes into [min_note, max_note] *before* shifting them to
    # 1-based tokens. Clipping after the shift would turn every note below
    # min_note into 0, i.e. silently replace it with the silence token.
    tokens = np.where(data == 0, 0, np.clip(data, min_note, max_note) - min_note + 1)

    # new shape (num_windows, (window_size + 1) * num_voices):
    # the rows of each window are joined into one long token sequence
    flat = tokens.reshape(tokens.shape[0], -1)

    # inputs drop the last token, targets drop the first one
    return flat[:, :-1], flat[:, 1:]

# Build the (input, target) arrays for the train, test and validation splits, in that order.
(X_train, Y_train), (X_test, Y_test), (X_valid, Y_valid) = (
    make_xy(split) for split in (train_data, test_data, valid_data)
)

In [9]:
# Expected layout: (num_windows, (window_size + 1) * num_voices - 1)
X_train.shape

(3111, 131)

### Training Model

In [11]:
from tensorflow.keras.models import Sequential  # linear stack of layers
from tensorflow.keras.layers import Conv1D, Dense, Embedding, LSTM, Dropout, BatchNormalization
# Conv1D → learns local sequence patterns (sliding window)
# Dense → fully connected layer
# Embedding → maps integer tokens (notes) to dense vectors
# LSTM → recurrent layer for long-term sequence dependencies
# Dropout → randomly drop neurons to reduce overfitting
# BatchNormalization → normalizes activations for stable training

from tensorflow.keras.optimizers import Nadam  # Adam + Nesterov momentum optimizer

In [13]:
# Vocabulary size: one token per note in [min_note, max_note] plus token 0 for silence.
# With min_note = 36 and max_note = 81 this is 47.
n_tokens = max_note - min_note + 2

model = Sequential()

# shape=[None] -> the model accepts token sequences of any length.
# Declaring the input with an explicit Input layer is the documented way to do
# this in a Sequential model; newer Keras versions warn when `input_shape` is
# passed to a layer instead.
model.add(tf.keras.Input(shape=[None]))

# output_dim = 5 maps each token to a vector of length 5 instead of a one-hot
# vector of length n_tokens: smaller, more trainable, and able to capture
# similarity between notes, e.g. octaves or fifths.
model.add(Embedding(input_dim=n_tokens, output_dim=5))

# Stack of causal convolutions with growing dilation.
# kernel_size=2 -> each filter combines two inputs: position t and position t - dilation
# padding="causal" -> the convolution only uses past and current positions
# Stacking dilations 1, 2, 4, 8, 16 grows the receptive field to
# 2, 4, 8, 16 and finally 32 positions of the token sequence.
# The number of filters is the number of output channels per position.
for n_filters, dilation in [(32, 1), (48, 2), (64, 4), (96, 8), (128, 16)]:
    model.add(Conv1D(n_filters, kernel_size=2, padding="causal",
                     activation="relu", dilation_rate=dilation))
    # normalize the activations -> helps speed up convergence and stabilise training
    model.add(BatchNormalization())

# drop 5% of the units during each training step
model.add(Dropout(0.05))
# 256 units -> size of the hidden state
# return_sequences=True -> returns an output for every position, not just the last
model.add(LSTM(256, return_sequences=True))
# one softmax probability per token of the vocabulary
model.add(Dense(n_tokens, activation="softmax"))
# final shape is (batch_size, seq_length, n_tokens)

model.summary()

In [None]:
# ran this in google colab with a T4 GPU
optimizer = Nadam(learning_rate=1e-3)
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
# validation_data is documented as a tuple (x_val, y_val)
history = model.fit(X_train, Y_train, epochs=10, validation_data=(X_valid, Y_valid), batch_size=batch_size)

# Save the trained network: a later cell loads "music_model.keras", so without
# this step the notebook cannot be re-run from a fresh kernel.
model.save("music_model.keras")

In [21]:
from tensorflow.keras.models import load_model

# Restore the network that was trained on Google Colab.
model = load_model(filepath="music_model.keras")

In [33]:
def sample_next_note(probs):
    """Draw the index of the next note from a probability vector.

    Parameters
    ----------
    probs : 1-D array-like of non-negative weights, one per token. They do
        not have to sum to exactly 1; they are renormalised here. The input
        is never modified.

    Returns
    -------
    int
        The sampled index. If the weights cannot be normalised (their sum is
        zero, negative, NaN or infinite) the index of the largest non-NaN
        entry is returned instead of sampling.
    """
    # np.array always copies (np.asarray would not for a float64 array), so the
    # in-place normalisation below can never change the caller's array.
    probabilities = np.array(probs, dtype=float)

    prob_sum = probabilities.sum()

    if prob_sum <= 0 or not np.isfinite(prob_sum):
        # Something went wrong upstream: fall back to the most likely note.
        # NaN entries are masked first, because argmax would otherwise return
        # the position of the first NaN rather than the largest probability.
        without_nan = np.where(np.isnan(probabilities), -np.inf, probabilities)
        return int(np.argmax(without_nan))

    # normalize so the probabilities sum to exactly 1, as np.random.choice requires
    probabilities /= prob_sum

    # random draw according to the probabilities -> generates variation
    return int(np.random.choice(len(probabilities), p=probabilities))

In [34]:
def generate_chorale(model, seed_chords, length, num_voices=None):
    """Extend a seed with chords generated by the model, one note at a time.

    Parameters
    ----------
    model : trained network; ``model.predict`` must map a token array of shape
        (1, seq_length) to probabilities of shape (1, seq_length, num_tokens).
    seed_chords : small initial sequence of chords (list of lists of notes,
        0 = silence) used to prime the network.
    length : number of chords to generate after the seed.
    num_voices : notes per chord. Defaults to the number of columns of a 2-D
        ``seed_chords``, or to 4 if the seed is not 2-D.

    Returns
    -------
    Integer array of shape (num_chords, num_voices) holding the seed followed
    by the generated chords, as notes (0 = silence).

    Raises
    ------
    ValueError
        If ``seed_chords`` is empty.
    """
    seed = np.array(seed_chords, dtype=int)
    if seed.size == 0:
        raise ValueError("seed_chords must contain at least one chord")
    if num_voices is None:
        num_voices = seed.shape[1] if seed.ndim == 2 else 4

    # Same encoding as the training data: 0 stays silence, sounding notes are
    # clamped to [min_note, max_note] and shifted to start at 1, so an
    # out-of-range seed note can never produce a token outside the vocabulary.
    token_sequence = np.where(seed == 0, 0, np.clip(seed, min_note, max_note) - min_note + 1)
    # flatten into one batch of one sequence:
    # (num_seed_chords, num_voices) -> (1, num_seed_chords * num_voices)
    token_sequence = token_sequence.reshape(1, -1)

    # one prediction per note: `length` chords of `num_voices` notes each
    for _ in range(length * num_voices):
        # input: (1, seq_length)
        # output: (1, seq_length, num_tokens) -> probabilities of the next token at every position
        # [0, -1] -> the only batch entry, last position: the distribution of the next note
        next_token_probabilities = model.predict(token_sequence, verbose=0)[0, -1]
        # sample the next note
        next_token = sample_next_note(next_token_probabilities)
        # [[next_token]] has shape (1, 1) to match the batch dimension;
        # axis=1 appends the new note at the end of the sequence
        token_sequence = np.concatenate([token_sequence, [[next_token]]], axis=1)

    # convert the tokens back to the original note numbers
    token_sequence = np.where(token_sequence == 0, 0, token_sequence + min_note - 1)

    # one row per chord, one column per voice
    return token_sequence.reshape(-1, num_voices)

In [36]:
# The third chorale in the test set, played as a reference.
chorale = test_data[2]

s = stream.Stream()
for time_step in chorale:
    # Keep only the non-zero notes (0 means silence) and add them as one quarter-note chord.
    sounding_notes = [note for note in time_step if note]
    s.append(chord.Chord(sounding_notes, quarterLength=1))

s.show("midi")

In [37]:
# Prime the model with the first 8 time steps of the third test chorale,
# then let it generate 56 new chords.
seed_chords = test_data[2][:8]
new_chorale = generate_chorale(model, seed_chords=seed_chords, length=56)
new_chorale

array([[73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [68, 64, 59, 52],
       [68, 64, 59, 52],
       [68, 62, 59, 52],
       [68, 62, 59, 52],
       [64, 61, 61, 49],
       [64, 61, 61, 49],
       [64, 61, 61, 49],
       [64, 61, 61, 49],
       [69, 61, 59, 51],
       [69, 61, 59, 51],
       [69, 61, 59, 54],
       [69, 61, 59, 54],
       [68, 61, 59, 52],
       [68, 61, 59, 52],
       [68, 61, 59, 52],
       [68, 61, 59, 52],
       [66, 61, 57, 57],
       [66, 61, 57, 57],
       [66, 61, 57, 54],
       [66, 61, 57, 54],
       [73, 64, 59, 56],
       [73, 64, 59, 56],
       [73, 64, 61, 56],
       [73, 64, 61, 56],
       [74, 66, 59, 53],
       [74, 66, 59, 53],
       [74, 66, 59, 53],
       [74, 66, 59, 53],
       [73, 64, 61, 52],
       [73, 64, 61, 52],
       [73, 64, 59, 52],
       [73, 64, 59, 52],


In [38]:
# Play the generated chorale: convert the array to nested lists, then build the stream.
chorale = new_chorale.tolist()

s = stream.Stream()
for time_step in chorale:
    # Keep only the non-zero notes (0 means silence) and add them as one quarter-note chord.
    sounding_notes = [note for note in time_step if note]
    s.append(chord.Chord(sounding_notes, quarterLength=1))

s.show("midi")