In [1]:
import pandas as pd

In [3]:
# Load a single chorale for inspection. The path is relative to the working directory,
# matching the 'Dataset/<split>' paths used by the file-listing cell further down.
df = pd.read_csv('Dataset/train/chorale_000.csv')

In [4]:
# Display the loaded chorale (columns note0..note3, as shown in the output below).
df

Unnamed: 0,note0,note1,note2,note3
0,74,70,65,58
1,74,70,65,58
2,74,70,65,58
3,74,70,65,58
4,75,70,58,55
...,...,...,...,...
187,70,65,62,46
188,70,65,62,46
189,70,65,62,46
190,70,65,62,46


In [6]:
import os

# Collect the CSV paths of every split; sorting makes the file order deterministic.
train_files, valid_files, test_files = (
    sorted(
        os.path.join('Dataset', split, name)
        for name in os.listdir(os.path.join('Dataset', split))
        if name.endswith('.csv')
    )
    for split in ('train', 'valid', 'test')
)


In [7]:
# Read every CSV of a split and convert it to a plain nested list (one inner list per CSV row).
train_data, valid_data, test_data = (
    [pd.read_csv(path).values.tolist() for path in paths]
    for paths in (train_files, valid_files, test_files)
)

In [8]:
# Show the first training chorale as nested lists: one inner list of four values per row.
train_data[0]

[[74, 70, 65, 58],
 [74, 70, 65, 58],
 [74, 70, 65, 58],
 [74, 70, 65, 58],
 [75, 70, 58, 55],
 [75, 70, 58, 55],
 [75, 70, 60, 55],
 [75, 70, 60, 55],
 [77, 69, 62, 50],
 [77, 69, 62, 50],
 [77, 69, 62, 50],
 [77, 69, 62, 50],
 [77, 70, 62, 55],
 [77, 70, 62, 55],
 [77, 69, 62, 55],
 [77, 69, 62, 55],
 [75, 67, 63, 48],
 [75, 67, 63, 48],
 [75, 69, 63, 48],
 [75, 69, 63, 48],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [72, 69, 65, 53],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [74, 70, 65, 46],
 [75, 69, 63, 48],
 [75, 69, 63, 48],
 [75, 67, 63, 48],
 [75, 67, 63, 48],
 [77, 65, 62, 50],
 [77, 65, 62, 50],
 [77, 65, 60, 50],
 [77, 65, 60, 50],
 [74, 67, 58, 55],
 [74, 67, 58, 55],
 [74, 67, 58, 53],
 [74, 67, 58, 53],
 [72, 67, 58, 51],
 [72, 67, 58, 51],
 [72, 67, 58, 51],
 [72, 67, 58, 51],
 [72, 65, 57

In [9]:
from music21 import stream, chord

# Build a music21 stream from one training chorale and play it back.
chorale = train_data[20]
s = stream.Stream()
for row in chorale:
    # Zero entries are falsy and get filtered out, so only non-zero pitches enter the chord.
    s.append(chord.Chord([n for n in row if n], quarterLength=1))
s.show('midi')

In [10]:
import numpy as np

# Pitch range that gets its own token; 0 in the data is kept as token 0.
min_note, max_note = 36, 81
# Windows hold window_size + 1 chords and start every window_offset chords.
window_size, window_offset, batch_size = 32, 16, 32


def make_xy(chorales):
    """Turn chorales into (input, target) note sequences for next-note prediction.

    Every chorale is cut into overlapping windows of ``window_size + 1`` chords
    (33 with the current settings), one window every ``window_offset`` chords.
    Notes are re-encoded as tokens, each window is flattened chord by chord into
    a single note sequence, and the sequence is split into inputs and targets
    that are shifted by exactly one note.

    Args:
        chorales: iterable of chorales; each chorale is a sequence of chords and
            each chord a sequence of integer notes (0 is kept as 0).

    Returns:
        Tuple ``(X, Y)`` of 2-D integer arrays with one row per window.
        ``X`` is the flattened window without its last note, ``Y`` the same
        window without its first note. With 4 notes per chord each row has
        33 * 4 - 1 = 131 entries.

    Raises:
        ValueError: if no window can be extracted, i.e. ``chorales`` is empty or
            every chorale has fewer than ``window_size + 1`` chords.
    """
    # The range stops at len(c) - window_size, so every slice is a full window.
    windows = [c[i:i + window_size + 1] for c in chorales for i in range(0, len(c) - window_size, window_offset)]
    if not windows:
        # Fail with a readable message instead of an opaque reshape error below.
        raise ValueError(
            "no windows extracted: every chorale must contain at least "
            f"window_size + 1 = {window_size + 1} chords"
        )

    data = np.array(windows, dtype=int)
    # Keep 0 as 0; shift all other notes so that min_note becomes token 1.
    data = np.where(data == 0, 0, data - min_note + 1)
    # Force everything into 0..(max_note - min_note + 1): notes below min_note
    # collapse to 0 and notes above max_note collapse to the highest token.
    data = np.clip(data, 0, max_note - min_note + 1)

    # (windows, chords, notes) -> (windows, chords * notes)
    flat = data.reshape(data.shape[0], -1)

    # Inputs drop the last note, targets drop the first note.
    return flat[:, :-1], flat[:, 1:]


# Build the input/target arrays for each split, in train -> valid -> test order.
(X_train, Y_train), (X_valid, Y_valid), (X_test, Y_test) = map(
    make_xy, (train_data, valid_data, test_data)
)

In [11]:
X_train.shape  # (windows, 131): each window is 33 chords x 4 notes = 132 notes, minus the last note

(3111, 131)

In [12]:
Y_train.shape  # same shape as X_train; the targets are the same windows shifted by one note

(3111, 131)

In [13]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding, Conv1D, BatchNormalization, LSTM, Dropout
from tensorflow.keras.optimizers import Nadam

In [14]:
# Distinct raw note values that occur in each split (kept for inspection).
train_notes = set([z for x in train_data for y in x for z in y])
test_notes = set([z for x in test_data for y in x for z in y])
valid_notes = set([z for x in valid_data for y in x for z in y])

# The vocabulary must cover every token make_xy can emit: 0 plus one token per
# pitch in min_note..max_note. Counting the distinct values found in the data
# only gives the same number when every pitch of the range actually occurs;
# otherwise the Embedding / softmax would be too small for the highest tokens.
num_notes = max_note - min_note + 2
num_notes

47

In [15]:
model = Sequential()

# Embedding: integers carry no geometry, so each note token gets a small learned
# vector (5 dimensions are enough for this small vocabulary) in which related
# notes can end up close to each other.
# NOTE(review): the cell output suggests Keras emits a warning for this constructor
# call (possibly about `input_shape`) - confirm against the installed Keras version.
model.add(Embedding(input_dim=num_notes, output_dim=5, input_shape=[None]))

# Causal 1D convolutions extract temporal patterns in parallel (unlike RNN layers)
# and never look ahead. With kernel_size=2, a dilation rate d makes a layer combine
# positions t and t-d; stacking rates 1, 2, 4, 8, 16 reaches back
# 1 + 2 + 4 + 8 + 16 = 31 steps (a 32-token window, like the digits of a binary
# number) with few parameters. The filter count grows with the dilation so the
# wider time span does not become a bottleneck. Each convolution is followed by
# batch normalization to keep activations well-scaled across the stack.
conv_stack = ((32, 1), (48, 2), (64, 4), (96, 8), (128, 16))  # (filters, dilation_rate)
for n_filters, dilation in conv_stack:
    model.add(Conv1D(n_filters, kernel_size=2, padding="causal", activation="relu", dilation_rate=dilation))
    model.add(BatchNormalization())

# Light regularization so the model does not rely too much on individual features.
model.add(Dropout(0.05))

# The convolutions already summarize local and mid-range context, so the LSTM is
# left to track longer musical structure; it returns one output per time step.
model.add(LSTM(256, return_sequences=True))

# Project every time step to a probability distribution over all note tokens.
model.add(Dense(num_notes, activation='softmax'))

model.summary()

  super().__init__(**kwargs)


In [16]:

# Targets are integer token ids, hence the sparse variant of categorical cross-entropy.
optimizer = Nadam(learning_rate=1e-3)
model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# validation_data is passed as an (x_val, y_val) tuple, the form documented by Keras.
model.fit(X_train, Y_train, epochs=20, validation_data=(X_valid, Y_valid), batch_size=batch_size)

Epoch 1/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 34ms/step - accuracy: 0.3233 - loss: 2.6652 - val_accuracy: 0.0856 - val_loss: 3.5088
Epoch 2/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 21ms/step - accuracy: 0.7562 - loss: 0.9177 - val_accuracy: 0.0432 - val_loss: 3.3746
Epoch 3/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 21ms/step - accuracy: 0.7954 - loss: 0.7220 - val_accuracy: 0.1164 - val_loss: 3.1305
Epoch 4/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 21ms/step - accuracy: 0.8122 - loss: 0.6484 - val_accuracy: 0.3233 - val_loss: 2.2708
Epoch 5/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - accuracy: 0.8263 - loss: 0.5863 - val_accuracy: 0.5474 - val_loss: 1.4817
Epoch 6/20
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 30ms/step - accuracy: 0.8363 - loss: 0.5450 - val_accuracy: 0.7397 - val_loss: 0.8774
Epoch 7/20
[1m98/98[0m [32m━━━

<keras.src.callbacks.history.History at 0x7b471e4c9eb0>

**Generate Music with Model**

In [17]:
import numpy as np


def sample_next_note(probs, rng=None):
    """Sample the index of the next note from a vector of (unnormalized) probabilities.

    Args:
        probs: 1-D array-like of non-negative weights, one per note token.
            The caller's array is never modified.
        rng: optional ``numpy.random.Generator`` for reproducible sampling.
            When omitted, NumPy's global random state is used.

    Returns:
        The sampled index as a built-in ``int``. If the weights sum to zero, a
        negative number, NaN or infinity, no sampling is done and the index of
        the largest entry is returned instead.
    """
    # np.array (not asarray) always copies, so the in-place division below
    # cannot leak into an array owned by the caller.
    probabilities = np.array(probs, dtype=float)

    probs_sum = probabilities.sum()

    # Degenerate distribution -> deterministic fallback to the most likely note.
    if probs_sum <= 0 or not np.isfinite(probs_sum):
        return int(np.argmax(probabilities))

    probabilities /= probs_sum  # renormalize so the weights sum to 1

    sampler = np.random if rng is None else rng
    # Cast so both return paths yield the same type.
    return int(sampler.choice(len(probabilities), p=probabilities))


def generate_chorale(model, seed_chords, length, min_note=36, max_note=81):
    """Continue a seed with notes sampled from the model, one note at a time.

    Args:
        model: object whose ``predict(tokens, verbose=0)`` returns, for a
            ``(1, n)`` token array, per-position probabilities over the note
            tokens; only the probabilities at the last position are used.
        seed_chords: starting chords as rows of 4 integer notes (0 is kept as 0).
        length: number of chords to generate (4 notes are sampled per chord).
        min_note: lowest pitch with its own token; must match the training encoding.
        max_note: highest pitch with its own token; must match the training encoding.

    Returns:
        Integer array of shape ``(-1, 4)`` containing the seed followed by the
        generated chords, decoded back to 0 / ``min_note..max_note`` values.
    """
    seed = np.array(seed_chords, dtype=int)
    # Encode the seed exactly like the training data: 0 stays 0, pitches are
    # shifted so min_note -> 1, and everything is clipped into the valid token
    # range so an out-of-range seed note cannot index past the vocabulary.
    token_sequence = np.where(seed == 0, 0, seed - min_note + 1)
    token_sequence = np.clip(token_sequence, 0, max_note - min_note + 1)
    token_sequence = token_sequence.reshape(1, -1)

    # Generation is note by note (4 notes per chord), not chord by chord.
    for _ in range(length * 4):
        # Distribution over the next token, read from the last sequence position.
        next_token_probabilities = model.predict(token_sequence, verbose=0)[0, -1]
        next_token = sample_next_note(next_token_probabilities)
        token_sequence = np.concatenate([token_sequence, [[next_token]]], axis=1)

    # Decode tokens back to pitches; token 0 stays 0.
    token_sequence = np.where(token_sequence == 0, 0, token_sequence + min_note - 1)

    return token_sequence.reshape(-1, 4)

In [18]:
# Initial chords (seed): the first 8 rows of one test chorale, played on their own.
seed_chords = test_data[2][:8]

chorale = seed_chords
s = stream.Stream()
for row in chorale:
    # Zero entries are falsy and get filtered out before the chord is built.
    s.append(chord.Chord([n for n in row if n], quarterLength=1))
s.show('midi')

In [19]:
# Complete actual chorale (ground truth) for comparison with the generated continuation.
# NOTE: despite its name, seed_chords holds the whole chorale in this cell.
seed_chords = test_data[2]

chorale = seed_chords
s = stream.Stream()
for row in chorale:
    # Zero entries are falsy and get filtered out before the chord is built.
    s.append(chord.Chord([n for n in row if n], quarterLength=1))
s.show('midi')

In [20]:
# Generate 56 new chords from the first 8 chords (32 notes) of the test chorale
# Results can be better and more creative by introducing temperature, top_p etc.
seed_chords = test_data[2][:8]
new_chorale = generate_chorale(model, seed_chords, 56)

In [21]:
# Inspect the result: the seed rows come first, followed by the generated chords.
new_chorale

array([[73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [71, 66, 59, 56],
       [71, 66, 59, 56],
       [71, 64, 58, 57],
       [71, 64, 59, 56],
       [66, 64, 58, 57],
       [66, 64, 58, 57],
       [66, 63, 59, 59],
       [66, 63, 59, 59],
       [67, 63, 59, 59],
       [67, 63, 59, 59],
       [69, 63, 58, 59],
       [69, 63, 58, 59],
       [71, 63, 58, 59],
       [71, 63, 58, 59],
       [73, 66, 66, 58],
       [73, 66, 66, 56],
       [73, 66, 66, 54],
       [73, 66, 66, 54],
       [74, 66, 59, 54],
       [74, 66, 59, 54],
       [76, 66, 61, 52],
       [76, 66, 61, 52],
       [78, 69, 61, 54],
       [78, 69, 61, 54],
       [78, 71, 62, 55],
       [78, 71, 62, 55],
       [78, 69, 62, 54],
       [78, 69, 62, 54],
       [79, 71, 62, 56],
       [79, 71, 62, 56],
       [79, 71, 62, 57],
       [79, 71, 62, 57],


In [22]:
# listen to generated piece

# .tolist() turns the NumPy array into nested lists before the rows are iterated.
chorale = new_chorale.tolist()
s = stream.Stream()
for row in chorale:
    # Zero entries are falsy and get filtered out before the chord is built.
    s.append(chord.Chord([n for n in row if n], quarterLength=1))
s.show('midi')

In [23]:
def generate_random_chorale(length, rest_probability=0.2, pitch_low=36, pitch_high=81, seed=None):
    """Create a random 4-voice "chorale" as a baseline to compare the model against.

    Args:
        length: number of chords (rows) to produce.
        rest_probability: chance for each individual note to be replaced by 0.
        pitch_low: lowest pitch that can be drawn (inclusive).
        pitch_high: highest pitch that can be drawn (inclusive).
        seed: optional seed for the random generator, for reproducible output.

    Returns:
        Integer array of shape ``(length, 4)`` holding random pitches and zeros.
    """
    generator = np.random.default_rng(seed)
    shape = (length, 4)

    # Draw order matters for reproducibility: pitches first, then the rest mask.
    pitches = generator.integers(pitch_low, pitch_high + 1, size=shape)
    is_rest = generator.random(shape) < float(rest_probability)

    # Work on an int copy and blank out the masked positions.
    result = pitches.astype(int)
    result[is_rest] = 0
    return result

In [24]:
# listen to completely random music to compare the quality to what our model generated
# (no seed is passed, so every run of this cell produces a different piece)
chorale = generate_random_chorale(56).tolist()
s = stream.Stream()
for row in chorale:
    # Zero entries are falsy and get filtered out before the chord is built.
    s.append(chord.Chord([n for n in row if n], quarterLength=1))
s.show('midi')

In [25]:
# save model to a single .keras file in the current working directory
model.save('bach_generation_conv1d_lstm.keras')

In [26]:
from tensorflow import keras

# Reload the model from the file written by the save cell above.
loaded_model = keras.models.load_model("bach_generation_conv1d_lstm.keras")

In [27]:
# Generate again with the reloaded model; seed_chords is whatever an earlier cell last assigned to it.
# Sampling is random, so the output is expected to differ from the previous generation.
new_chorale = generate_chorale(loaded_model, seed_chords, 56)

In [28]:
# Inspect the chorale produced by the reloaded model.
new_chorale

array([[73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [73, 68, 61, 53],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [69, 66, 61, 54],
       [74, 66, 59, 59],
       [74, 66, 59, 59],
       [74, 66, 59, 59],
       [74, 66, 59, 59],
       [73, 67, 59, 52],
       [73, 67, 59, 52],
       [38, 67, 59, 52],
       [67, 67, 59, 52],
       [67, 66, 59, 50],
       [67, 66, 59, 50],
       [67, 64, 59, 49],
       [64, 64, 59, 49],
       [64, 64, 59, 47],
       [66, 64, 59, 47],
       [66, 64, 59, 49],
       [66, 64, 59, 49],
       [66, 64, 59, 47],
       [66, 63, 59, 47],
       [66, 63, 59, 47],
       [66, 63, 59, 47],
       [66, 63, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [66, 62, 59, 47],
       [71, 66, 62, 47],
