In [4]:
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf

input_path = Path("..\\data\\01_preprocessed\\bach.csv")
all_files = pd.read_csv(input_path, sep=";", dtype={"duration":str})
all_files.head()


Unnamed: 0,measure,composer,corpus,partition,notes,node_id,pitch,duration
0,0,bach,bwv1.6,horn__,"['F4', 1.0]",0,F4,1.0
1,1,bach,bwv1.6,horn__,"['G4', 0.5]",1,G4,0.5
2,1,bach,bwv1.6,horn__,"['C4', 0.5]",2,C4,0.5
3,1,bach,bwv1.6,horn__,"['F4', 0.5]",3,F4,0.5
4,1,bach,bwv1.6,horn__,"['F3', 0.5]",4,F3,0.5


# Define duration vocabulary

In [5]:
print(all_files.duration.value_counts())
duration_vocab = [f"{x:.4f}" for x in np.arange(.0, 8.1, .125)]
duration_vocab[:15]

1.0000    55776
0.5000    39690
2.0000     6925
0.2500     5463
3.0000     2341
1.5000     1456
4.0000     1107
0.7500      150
0.1250       86
6.0000       21
8.0000        4
0.0000        3
Name: duration, dtype: int64


['0.0000',
 '0.1250',
 '0.2500',
 '0.3750',
 '0.5000',
 '0.6250',
 '0.7500',
 '0.8750',
 '1.0000',
 '1.1250',
 '1.2500',
 '1.3750',
 '1.5000',
 '1.6250',
 '1.7500']

# Pitch vocabulary

In [6]:
all_files.pitch.value_counts()

D4      7905
G4      6905
A4      6823
E4      6785
A3      6066
        ... 
C#2        1
D#2        1
G-5        1
F##4       1
F##3       1
Name: pitch, Length: 85, dtype: int64

In [7]:
octaves = range(1, 7, 1)
accents = ["", "#", "##", "-", "--"]
pitches = ["A", "B", "C", "D", "E", "F", "G"]

pitch_vocab = []
for pitch in pitches:
    for accent in accents:
        for octave in octaves:
            pitch_vocab.append(f"{pitch}{accent}{octave}")

pitch_vocab[:15]


['A1',
 'A2',
 'A3',
 'A4',
 'A5',
 'A6',
 'A#1',
 'A#2',
 'A#3',
 'A#4',
 'A#5',
 'A#6',
 'A##1',
 'A##2',
 'A##3']

# Preparing large dataset

In [8]:
COLUMNS = ["corpus","measure", "node_id", "pitch", "duration"]
tenors = all_files[all_files.partition == "tenor"][COLUMNS].\
    sort_values(["corpus","measure","node_id"])

tenors.head()

Unnamed: 0,corpus,measure,node_id,pitch,duration
315,bwv1.6,0,315,A3,1.0
316,bwv1.6,1,316,G3,1.0
317,bwv1.6,1,317,A3,1.0
318,bwv1.6,1,318,A3,1.0
319,bwv1.6,1,319,A3,1.0


In [41]:
from functools import reduce

ids_from_pitches = tf.keras.layers.StringLookup(
    vocabulary=pitch_vocab, mask_token=None
)

pitches_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_pitches.get_vocabulary(), invert=True, mask_token=None,
)

ids_from_durations = tf.keras.layers.StringLookup(
    vocabulary=duration_vocab, mask_token=None
)

durations_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_durations.get_vocabulary(), invert=True, mask_token=None
)

SEQ_LENGTH = 30

combined_pitch_datasets = []
combined_duration_datasets = []

for corpus in tenors.corpus.unique():
    sliced_data = tenors[tenors.corpus == corpus]
    slice_pitch_ids = ids_from_pitches(sliced_data.pitch)
    slice_duration_ids = ids_from_durations(sliced_data.duration)

    pitch_ids_dataset = tf.data.Dataset.from_tensor_slices(slice_pitch_ids).\
        batch(SEQ_LENGTH+1, drop_remainder=True)

    duration_ids_dataset = tf.data.Dataset.from_tensor_slices(slice_duration_ids).\
        batch(SEQ_LENGTH+1, drop_remainder=True)

    combined_pitch_datasets.append(pitch_ids_dataset)
    combined_duration_datasets.append(duration_ids_dataset)
    
global_pitch_dataset = reduce(lambda x, y: x.concatenate(y), combined_pitch_datasets)
global_duration_dataset = reduce(lambda x, y: x.concatenate(y), combined_duration_datasets)



# Splitting intput and target

In [36]:
from typing import Iterable, Tuple
def split_input_target(sequence: Iterable) -> Tuple[Iterable]:  # type: ignore
    """function splits sequence"""
    input_seq = sequence[:-1]  # type: ignore
    target_seq = sequence[1:]  # type: ignore

    return input_seq, target_seq

split_input_target(["A1", "A2", "A3"])


(['A1', 'A2'], ['A2', 'A3'])

In [59]:
pitch_dataset = global_pitch_dataset.map(split_input_target).batch(64, drop_remainder=True)
duration_dataset = global_duration_dataset.map(split_input_target).batch(64, drop_remainder=True)

for input, target in pitch_dataset.take(3):
    print(f"Input: {input}")
    print(f"Output: {target}")

for input, target in duration_dataset.take(3):
    print(f"Input: {input}")
    print(f"Output: {target}")


Input: [[  3 183   3 ... 183   3  51]
 [  3 183   3 ... 183   3  51]
 [  0  64  64 ...  94 183 153]
 ...
 [  3  33  70 ...  33   3 189]
 [ 64  94  64 ...  94  64  64]
 [ 64  51   3 ...  64  64  64]]
Output: [[183   3   3 ...   3  51   3]
 [183   3   3 ...   3  51   3]
 [ 64  64  64 ... 183 153 183]
 ...
 [ 33  70  33 ...   3 189 189]
 [ 94  64  51 ...  64  64  94]
 [ 51   3  64 ...  64  64  64]]
Input: [[  3  33  70 ...  94  70 160]
 [183   3  33 ...  33   3 124]
 [ 94  64  64 ... 123  64  33]
 ...
 [ 94  94   3 ...  94  70   3]
 [ 94   3  33 ...  70   3   3]
 [124 160 184 ...  33  94  94]]
Output: [[ 33  70  33 ...  70 160 124]
 [  3  33  64 ...   3 124 124]
 [ 64  64  33 ...  64  33  33]
 ...
 [ 94   3  33 ...  70   3  94]
 [  3  33  64 ...   3   3 124]
 [160 184 124 ...  94  94   3]]
Input: [[ 94  94   3 ...  94  70   3]
 [ 94   3  33 ...  70   3   3]
 [124 160 184 ...  33  94  94]
 ...
 [142  51 112 ... 142 154 154]
 [ 51  51 154 ...  51 112  64]
 [ 94   3   3 ...   3   3  33]]
Out

# Build a model

In [80]:
class PitchModel(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__(self)
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, name="pitch_embedding")
        self.gru = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,
            return_state=True,
            name="pitch_gru"
        )
        self.dense = tf.keras.layers.Dense(vocab_size, name="pitch_dense")

    def call(self, inputs, states=None, return_state=False, training=False):
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states

        return x

# Length of the vocabulary in chars
vocab_size = len(ids_from_pitches.get_vocabulary())
# The embedding dimension
EMBEDDING_DIM = 256
# Number of RNN units
RNN_UNITS = 256

model = PitchModel(
    vocab_size=vocab_size,
    embedding_dim=EMBEDDING_DIM,
    rnn_units=RNN_UNITS
)

In [81]:
for input_example, target_example in pitch_dataset.take(1):
    example_predictions = model(input_example)
    print(example_predictions.shape)
    print(example_predictions)

(64, 30, 211)
tf.Tensor(
[[[-1.30140502e-02  1.32233733e-02 -1.77481174e-02 ...  1.47317424e-02
   -2.82256352e-03 -1.14547685e-02]
  [ 3.24489246e-03  9.63796861e-03  3.35728750e-04 ...  4.43810504e-03
   -2.48514232e-03 -4.10472602e-03]
  [-9.10016708e-03  1.60252675e-02 -1.65577326e-02 ...  1.69449002e-02
   -6.66462351e-03 -1.16197364e-02]
  ...
  [ 8.53319280e-03  8.75596330e-03 -4.29142732e-03 ...  2.52400711e-03
   -7.74160214e-03 -5.45661896e-05]
  [-6.48778398e-03  1.44962426e-02 -1.85864735e-02 ...  1.61049925e-02
   -8.71404447e-03 -1.01144528e-02]
  [-2.01475453e-02  2.03341385e-03  3.82974185e-03 ...  7.60482159e-03
   -2.44545024e-02 -2.03721970e-02]]

 [[-1.30140502e-02  1.32233733e-02 -1.77481174e-02 ...  1.47317424e-02
   -2.82256352e-03 -1.14547685e-02]
  [ 3.24489246e-03  9.63796861e-03  3.35728750e-04 ...  4.43810504e-03
   -2.48514232e-03 -4.10472602e-03]
  [-9.10016708e-03  1.60252675e-02 -1.65577326e-02 ...  1.69449002e-02
   -6.66462351e-03 -1.16197364e-02]
  ..

In [82]:
model.summary()

Model: "pitch_model_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 pitch_embedding (Embedding)  multiple                 54016     
                                                                 
 pitch_gru (GRU)             multiple                  394752    
                                                                 
 pitch_dense (Dense)         multiple                  54227     
                                                                 
Total params: 502,995
Trainable params: 502,995
Non-trainable params: 0
_________________________________________________________________


In [83]:
sampled_indices = tf.random.categorical(
    example_predictions[0], num_samples=1)

sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
sampled_indices

array([167,  34, 150,  79, 137,   6, 177,  72, 145, 145,  91, 142, 149,
        17, 117,   3,  99, 172,   3,  60,  98, 153,  96, 210,  91, 163,
        90, 172,  68,  49], dtype=int64)

In [84]:
print("Input:\n", pitches_from_ids(input_example[0]).numpy())
print()
print("Next Char Predictions:\n", pitches_from_ids(sampled_indices).numpy())


Input:
 [b'A3' b'G3' b'A3' b'A3' b'A3' b'F3' b'F4' b'F4' b'F4' b'F4' b'G4' b'F4'
 b'D4' b'E4' b'F4' b'E4' b'D4' b'E4' b'C4' b'A3' b'F3' b'G3' b'A3' b'A3'
 b'G3' b'A3' b'F3' b'G3' b'A3' b'B-3']

Next Char Predictions:
 [b'F##5' b'B4' b'E--6' b'C-1' b'E##5' b'A6' b'F--3' b'C#6' b'E--1' b'E--1'
 b'D1' b'E-4' b'E--5' b'A##5' b'D--3' b'A3' b'D#3' b'F-4' b'A3' b'B--6'
 b'D#2' b'F3' b'D6' b'G--6' b'D1' b'F##1' b'C--6' b'F-4' b'C#2' b'B-1']


# Train the model

In [85]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
example_mean_loss = loss(target_example, example_predictions)
print(f"Prediction shape: {example_predictions.shape}")
print(f"Mean loss: {tf.exp(example_mean_loss)}")

Prediction shape: (64, 30, 211)
Mean loss: 210.66091918945312


In [86]:
model.compile(optimizer="adam", loss=loss)

In [87]:
import os
checkpoint_dir = r".\training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "chkpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_dir, save_weights_only=True)

In [88]:
EPOCHS = 40
history = model.fit(pitch_dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


# Music generation

In [76]:
type(pitches_from_ids)

keras.layers.preprocessing.string_lookup.StringLookup

In [167]:
import time
class OneStep(tf.keras.Model):
    def __init__(
        self, 
        model: tf.keras.Model, 
        pitches_from_ids: tf.keras.layers.StringLookup,
        ids_from_pitches: tf.keras.layers.StringLookup,
        temperature: float=1.,
    ):
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.pitches_from_ids = pitches_from_ids
        self.ids_from_pitches = ids_from_pitches
        
        skip_ids = self.ids_from_pitches(['[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            values = [-float("inf")]*len(skip_ids),
            indices=skip_ids,
            dense_shape=[len(ids_from_pitches.get_vocabulary())]
        )
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        # convert tokens into token ids
        raged_input = tf.ragged.stack([tf.convert_to_tensor(inputs)])
        input_ids = self.ids_from_pitches(raged_input).to_tensor()
        print(f"{input_ids=}")

        # run the model
        # predicted logits shape is [batch, char, next_char_logits]
        predicted_logits, states = self.model(inputs=input_ids, states=states, return_state=True)
        print(f"{predicted_logits=}")

        # use only the last prediction
        predicted_logits = predicted_logits[:,-1, :]
        predicted_logits = predicted_logits/self.temperature

        #apply prediction_mask : prevent ["UNK"] from being generated
        predicted_logits = predicted_logits + self.prediction_mask

        #sample the output logits to generate token ids
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # convert from token ids to characters
        predicted_pitches = self.pitches_from_ids(predicted_ids)

        return predicted_pitches, states


one_step_model = OneStep(model, pitches_from_ids, ids_from_pitches)
start = time.time()
states = None
next_pitch = ["A1", "G1", "A1", ]
result = next_pitch

for n in range(27):
    next_pitch, states = one_step_model.generate_one_step(
        next_pitch, states=states)
    result.append(next_pitch.numpy()[0].decode())

end = time.time()

print(f"{result=}")


input_ids=<tf.Tensor 'RaggedToTensor/RaggedTensorToTensor:0' shape=(1, None) dtype=int64>
predicted_logits=<tf.Tensor 'pitch_model_4/pitch_dense/BiasAdd:0' shape=(1, None, 211) dtype=float32>
input_ids=<tf.Tensor 'RaggedToTensor/RaggedTensorToTensor:0' shape=(1, None) dtype=int64>
predicted_logits=<tf.Tensor 'pitch_model_4/pitch_dense/BiasAdd:0' shape=(1, None, 211) dtype=float32>
result=['A1', 'G1', 'A1', 'F#4', 'E4', 'D4', 'E4', 'F4', 'G4', 'C#4', 'D4', 'C#4', 'D4', 'G4', 'F#4', 'E4', 'B3', 'E4', 'D4', 'D4', 'C#4', 'A3', 'A3', 'G3', 'F#3', 'G3', 'B3', 'B3', 'C4', 'D4']


In [156]:
durations = tenors[tenors.corpus == "bwv1.6"].duration[:30]
durations

315    1.0000
316    1.0000
317    1.0000
318    1.0000
319    1.0000
320    1.0000
321    1.0000
322    1.0000
323    1.0000
324    1.0000
325    1.0000
326    0.5000
327    0.5000
328    1.0000
329    0.5000
330    0.5000
331    1.0000
332    1.0000
333    0.5000
334    0.5000
335    0.5000
336    0.5000
337    1.0000
338    0.5000
339    0.5000
340    0.5000
341    0.5000
342    0.5000
343    0.5000
344    1.0000
Name: duration, dtype: object

In [168]:
import music21 as mu


generated_stream = mu.stream.Stream()
for el, dur in zip(result, durations):
    generated_stream.append(mu.note.Note(el, quarterLength=float(dur)))
generated_stream.show("midi")

In [161]:
original_stream = mu.stream.Stream()
original_notes = tenors[tenors.corpus == "bwv1.6"].pitch[:30]
for el, dur in zip(original_notes, durations):
    original_stream.append(mu.note.Note(el, quarterLength=float(dur)))
original_stream.show("midi")