In [None]:
import numpy as np
import tensorflow as tf
import pandas as pd
import os
from pathlib import Path
from tensorflow.keras import layers
from functools import partial

In [None]:
jsb_chorales_dir = Path("./drive/MyDrive/jsb_chorales")

In [None]:
jsb_chorales_dir

PosixPath('drive/MyDrive/jsb_chorales')

In [None]:
train_files = sorted(jsb_chorales_dir.glob("train/chorale_*.csv"))
test_files = sorted(jsb_chorales_dir.glob("test/chorale_*.csv"))
valid_files = sorted(jsb_chorales_dir.glob("valid/chorale_*.csv"))

In [None]:
len(train_files),len(test_files),len(valid_files)

(229, 77, 76)

In [None]:
def read_chorales(file_paths):
  return [pd.read_csv(file_path).values.tolist() for file_path in file_paths]

train_data = read_chorales(train_files)
valid_data = read_chorales(valid_files)
test_data = read_chorales(test_files)

In [None]:
notes = set()
for chorales in (train_data,valid_data,test_data):
  for chorale in chorales:
    for chord in chorale:
      notes |= set(chord)

n_notes = len(notes)
min_note = min(notes-{0})
max_note = max(notes)

In [None]:
n_notes,min_note,max_note

(47, 36, 81)

In [None]:
def preprocess(window):
  
  window = tf.where(window==0,window,window-min_note+1)
  return tf.reshape(window,[-1])

def create_target(batch):
  X = batch[:,:-1]
  Y = batch[:,1:]
  print(tf.shape(X),tf.shape(Y))
  return X,Y

def bach_dataset(chorales,window_size=32,window_shift_size=16,batch_size=32,cache=True,
                 shuffle_buffer_size=None):
  
  def batch_windows(window):
    return window.batch(window_size+1)


  def to_windows(chorale):
    dataset = tf.data.Dataset.from_tensor_slices(chorale)
    dataset = dataset.window(size=window_size+1,shift=window_shift_size,drop_remainder=True)
    return dataset.flat_map(batch_windows)
  
  dataset = tf.ragged.constant(chorales,ragged_rank=1)
  dataset = tf.data.Dataset.from_tensor_slices(dataset)
  dataset = dataset.flat_map(to_windows).map(preprocess)

  if cache:
    dataset = dataset.cache()
  
  if shuffle_buffer_size:
    dataset = dataset.shuffle(buffer_size=shuffle_buffer_size)
  
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(create_target)

  return dataset.prefetch(1)

In [None]:
train_set = bach_dataset(train_data,shuffle_buffer_size=1000)
valid_set = bach_dataset(valid_data)
test_set = bach_dataset(test_data)

Tensor("Shape:0", shape=(2,), dtype=int32) Tensor("Shape_1:0", shape=(2,), dtype=int32)
Tensor("Shape:0", shape=(2,), dtype=int32) Tensor("Shape_1:0", shape=(2,), dtype=int32)
Tensor("Shape:0", shape=(2,), dtype=int32) Tensor("Shape_1:0", shape=(2,), dtype=int32)


In [None]:
for i,j in train_set.take(1):
  print(i)

tf.Tensor(
[[32 26 23 ... 37 34 22]
 [38 30 21 ... 38 32 29]
 [32 29 25 ... 37 32 29]
 ...
 [40 33 30 ... 35 32 27]
 [34 29 27 ... 29 27 24]
 [36 34 31 ... 34 25 19]], shape=(32, 131), dtype=int32)


In [None]:
n_embedding_dims = 5

AltConv1D = partial(layers.Conv1D,kernel_size=2,padding="causal",activation="relu")

model = tf.keras.models.Sequential([
    layers.Embedding(input_dim=n_notes,output_dim=n_embedding_dims),
    AltConv1D(filters=32),layers.BatchNormalization(),
    AltConv1D(filters=48,dilation_rate=2),layers.BatchNormalization(),
    AltConv1D(filters=64,dilation_rate=4),layers.BatchNormalization(),
    AltConv1D(filters=96,dilation_rate=8),layers.BatchNormalization(),
    layers.LSTM(units=256,return_sequences=True),
    layers.Dense(units=n_notes,activation="softmax")
],name="bach_model")

model.summary()

Model: "bach_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, None, 5)           235       
                                                                 
 conv1d (Conv1D)             (None, None, 32)          352       
                                                                 
 batch_normalization (BatchN  (None, None, 32)         128       
 ormalization)                                                   
                                                                 
 conv1d_1 (Conv1D)           (None, None, 48)          3120      
                                                                 
 batch_normalization_1 (Batc  (None, None, 48)         192       
 hNormalization)                                                 
                                                                 
 conv1d_2 (Conv1D)           (None, None, 64)          6

In [None]:
def get_run_log_dir():
  import time
  sub_dir = time.strftime("%H_%m_%S--%Y_%M_%D")
  return os.path.join("./my_log",sub_dir)

In [None]:
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=tf.keras.optimizers.Nadam(learning_rate=1e-3),
              metrics="accuracy")
tb_cb = tf.keras.callbacks.TensorBoard(get_run_log_dir())

history = model.fit(train_set,epochs=20,
                    validation_data=valid_set,
                    callbacks=[tb_cb])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
from IPython.display import Audio

def notes_to_frequencies(notes):
    return 2 ** ((np.array(notes) - 69) / 12) * 440

def frequencies_to_samples(frequencies, tempo, sample_rate):
    note_duration = 60 / tempo 
    frequencies = np.round(note_duration * frequencies) / note_duration
    n_samples = int(note_duration * sample_rate)
    time = np.linspace(0, note_duration, n_samples)
    sine_waves = np.sin(2 * np.pi * frequencies.reshape(-1, 1) * time)
    sine_waves *= (frequencies > 9.).reshape(-1, 1)
    return sine_waves.reshape(-1)

def chords_to_samples(chords, tempo, sample_rate):
    freqs = notes_to_frequencies(chords)
    freqs = np.r_[freqs, freqs[-1:]] 
    merged = np.mean([frequencies_to_samples(melody, tempo, sample_rate)
                     for melody in freqs.T], axis=0)
    n_fade_out_samples = sample_rate * 60 // tempo 
    fade_out = np.linspace(1., 0., n_fade_out_samples)**2
    merged[-n_fade_out_samples:] *= fade_out
    return merged

def play_chords(chords, tempo=160, amplitude=0.1, sample_rate=44100, filepath=None):
    samples = amplitude * chords_to_samples(chords, tempo, sample_rate)
    if filepath:
        from scipy.io import wavfile
        samples = (2**15 * samples).astype(np.int16)
        wavfile.write(filepath, sample_rate, samples)
        return display(Audio(filepath))
    else:
        return display(Audio(samples, rate=sample_rate))

In [None]:
def generate_chorale(model,seed_chords,length=64,temperature=1):
  arpegio = preprocess(tf.constant(seed_chords,dtype=tf.int64))
  arpegio = tf.reshape(arpegio,[1,-1])

  for chord in range(length):
    for note in range(4):
      # print(tf.shape(model.predict(arpegio)))
      next_note_probas = model.predict(arpegio)[0,-1:]
      rescaled_probas = tf.math.log(next_note_probas)/temperature
      next_note = tf.random.categorical(rescaled_probas,num_samples=1)
      arpegio = tf.concat([arpegio,next_note],axis=1)
  
  arpegio = tf.where(arpegio==0,arpegio,arpegio+min_note-1)
  return tf.reshape(arpegio,[-1,4])

In [None]:
seed_chords = test_data[2][:8]

In [None]:
new_chorale = generate_chorale(model,seed_chords,56)

In [None]:
play_chords(new_chorale)

In [22]:
model.save("bach_model.h5")