In [140]:
from IPython import display
import collections
import datetime
import fluidsynth
import glob
import numpy as np
import pathlib
import pandas as pd
import pretty_midi
import seaborn as sns
import tensorflow as tf

from matplotlib import pyplot as plt
from typing import Dict, List, Optional, Sequence, Tuple

# Tomb added
import random

In [141]:
#Download Chorales
data_dir = pathlib.Path('/Volumes/MAGIC1/CS50/myMusicGen/data/chorales')
if not data_dir.exists():
  tf.keras.utils.get_file(
      'midi',
      origin='https://github.com/jamesrobertlloyd/infinite-bach/tree/master/data/chorales/midi',
      extract=True,
      cache_dir='.', cache_subdir='data',
  )
filenames = glob.glob(str(data_dir/'**/*.mid*'))
print(filenames)
print('Number of files:', len(filenames))

['/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000101b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000106b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000106trio.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000206b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000306b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000408b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000504b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000507b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000603b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000606b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000707b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000806b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/000907b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/001007b_.mid', '/Volumes/MAGIC1/CS50/myMusicGen/data/chorales/midi/001106b

In [142]:
class UnsupportedMidiFileException(Exception):
  "Unsupported MIDI File"

In [143]:
seqlen = 64
vocab_size = 128

In [144]:
def get_pianoroll(midi, nn_from, nn_thru, seqlen, tempo):
    pianoroll = midi.get_piano_roll(fs=2*tempo/60) # This is the core line which makes this matrix based on 8th note

    # print(f"piano_roll.shape[1] a.k.a song length!{pianoroll.shape[1]}")

    if pianoroll.shape[1] < seqlen:
        raise UnsupportedMidiFileException

    pianoroll = pianoroll[nn_from:nn_thru, 0:seqlen] # Pinoroll's value still NOT binary since it has velocity
    binary_pianoroll = np.heaviside(pianoroll, 0) # converting as a binary matrix
    transposed_pianoroll = np.transpose(binary_pianoroll)


    def check_pianoroll_dim(pianoroll):
        rows = len(pianoroll)  # This gives the number of rows
        columns = len(pianoroll[0])  # This assumes all rows have the same length

        print("Number of rows:", rows) # represents sequence length
        print("Number of columns:", columns) # represents the 4 octave range in midi 36-83
        print("Total dimesions of pianoroll is", rows*columns)
    
    # check_pianoroll_dim(transposed_pianoroll)
    
    # return binary_pianoroll
    return transposed_pianoroll

#### Read_Midi Explanation

Get Major key(keymode=0) or Minor key(keymode=1)<br>
key_number has values ​​from 0 to 11 for major keys and for minor keys,
12~23 is included, <br> so by **dividing it by 12** and converting it to an integer,<br> 

it will be 0 if it is a major key,<br>
it will be 1 if it is a minor key,<br>

and assign it accordingly to keymode.

In [145]:
def read_midi(filename, sop_alto, seqlen):
  
  def add_rest_nodes(pianoroll):  # If all the elemets are zero, the rest node says 1, else 0
    rests = 1 - np.sum(pianoroll, axis=1)
    rests = np.expand_dims(rests, 1)
    return np.concatenate([pianoroll, rests], axis=1)
  
  
  # read midi file
  midi = pretty_midi.PrettyMIDI(filename)

  # An Exception error is thrown if there is a modulation(key change)
  if len(midi.key_signature_changes) !=1:
    raise UnsupportedMidiFileException

  # Modulate the given key to C major or C minor
  key_number = midi.key_signature_changes[0].key_number
  # transpose_to_c(midi, key_number)

  # Get Major key(keynode=0) or Minor key(keynode=1)
  keymode = np.array([int(key_number / 12)])

  # The Exception error thrown when tempo changes
  tempo_time, tempo = midi.get_tempo_changes()
  if len(tempo) != 1:
    raise UnsupportedMidiFileException
  if sop_alto:
    # The exception thrown if there are less than 2 parts
    if len(midi.instruments) < 2:
      raise UnsupportedMidiFileException
    # Get pianoRoll binary Matrix for each of Soprano and alto parts
    pr_s = get_pianoroll(midi.instruments[0], 36, 84, seqlen, tempo[0])
    pr_a = get_pianoroll(midi.instruments[1], 36, 84, seqlen, tempo[0])
    pr_b = get_pianoroll(midi.instruments[2], 36, 84, seqlen, tempo[0])
    
    sop_w_rest = add_rest_nodes(pr_s)
    alt_w_rest = add_rest_nodes(pr_a)
    bass_w_rest = add_rest_nodes(pr_b)
    
    # return pr_s, pr_a, pr_b, keymode
    return sop_w_rest, alt_w_rest, bass_w_rest, keymode

  else:
    # Get a pianoroll which gathered all the parts
    pr = get_pianoroll(midi, 36, 84, seqlen, tempo[0])
    return pr, keymode

#### Make the training data

In [146]:
"""Get the ingredients"""
np.set_printoptions(threshold=np.inf) # Show the entire print, esp Matrix

x_all = [] # shape will be (495, 64, 49) the list which stores inputs of soprano melodies
y_all = [] # shape will be (495, 64, 49) the list which stores outputs of alto melodies
keymodes = [] # the list which stores the key whether it is major or minor
files = [] # stores the filenames of the MIDI files

# repeat the process with all the midi files
for f in glob.glob(str(data_dir/"**/*.mid*")):
  # print(f)
  try:
    sop, alt, bass, keymode = read_midi(f, True, 64)

    # add pianorolls which have been added rest elements to the lists
    x_all.append(sop)
    y_all.append(alt)
    keymodes.append(keymode)
    files.append(f)
  # throw exception for midi data which can not be used
  except UnsupportedMidiFileException:
    print("nah")

n_notes = len(x_all) # Not sure if it is correct
print(n_notes)

# convert x_all and y_all into NumPy array to make them more useful later

# Tomb added. These three lines of code is for getting the number of pitch range
# The pitch range is 49 at this point as rest_node dim is added, refer to prof's docu
x_all_np = np.array(x_all)
print(x_all_np.shape[2])
pitch_range = x_all_np.shape[2]

# x_all = np.array(x_all)
# y_all = np.array(y_all)

# You get an error with this  code but can see the shape
# x = pd.DataFrame(y_all)
# print(x)

nah
nah
nah
495
49


### Try Creating a tf.data.Dataset from the Parsed notes

Source Datasets:
The simplest way to create a dataset is to create it from a **python list**:

In [199]:
# x_ds = tf.data.Dataset.from_tensor_slices(x_all)
# y_ds = tf.data.Dataset.from_tensor_slices(y_all)
# print(x_ds.element_spec)
# for i, f in enumerate(x_ds):
#     print(i, f.numpy())

note_ds = tf.data.Dataset.from_tensor_slices(x_all)
note_ds.element_spec

TensorSpec(shape=(64, 49), dtype=tf.float64, name=None)

In [196]:
def create_sequences(
        dataset: tf.data.Dataset,
        seq_length: int,
        vocab_size = 128,
) -> tf.data.Dataset:
    """Returns TF Dataset of sequence and label examples"""
    seq_length = seq_length + 1
    
    print(dataset)
    # Take 1 extra for the labels
    windows = dataset.window(seq_length, shift=1, stride=1, drop_remainder=True) # stride=1 is default so nothing changed
    # for window in windows:
    #     print(f"window!! {list(window.as_numpy_iterator())}") # To see the inside of each window
   
    # Flat_map falltens the "dataset of datasets" into a dataset of tensors
    flatten = lambda x:x.batch(batch_size=seq_length, drop_remainder=True) # lambda x:: Defines an anonymous function that takes an input x. and this function per se is assigned to the flatten variable
    sequences = windows.flat_map(flatten) # An extra dimension will be added by this flat_map, which is the first dimension and size of batch_size which is seq_length+1
    # for sequence in sequences:
    #     print(f"seq!!{sequence.numpy()}")
  

    # Normalize note pitch
    # For instance, if x is a list or array [a, b, c], the scaling operation i.e. division would result in [a/vocab_size, b/1.0, c/1.0].
    # def scale_pitch(x):
    #     x = x / [vocab_size, 1.0, 1.0]
    #     return x
    
    # # Split the labels
    # def split_labels(sequences): # To separates the inputs from the labels within each sequence. It returns the normalized inputs and the labels.
    #     inputs = sequences[:-1]
    #     labels_dense = sequences[-1]
    #     labels = {key:labels_dense[i] for i, key in enumerate(key_order)} # the key_order is a global variable. 

    #     return scale_pitch(inputs), labels
    
    # # In Python, if you're using map from tf.data.Dataset, the map function allows you to apply a transformation to each element of the dataset.
    # # Meaning you do not have to pass in an argument to the split_labels function even tho it expects an argumment
    # return sequences.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)
    return sequences

In [197]:
seq_ds = create_sequences(note_ds, seqlen, vocab_size) # Vocab size is not used yet as I do not know how to handle normlization with this data

print(seq_ds.element_spec)


<_TensorSliceDataset element_spec=TensorSpec(shape=(64, 49), dtype=tf.float64, name=None)>
TensorSpec(shape=(65, 64, 49), dtype=tf.float64, name=None)


In [None]:
# elm = 2
# for seq in x_seq_ds.take(1):

#   print('sequence shape:', seq.shape)
#   print(f'sequence elements (first {elm}):', seq[0: elm]) 

In [186]:
batch_size = 64
buffer_size = n_notes - seqlen

train_ds = (seq_ds
            .shuffle(buffer_size)
            # .batch(batch_size, drop_remainder=True)
            .cache()
            .prefetch(tf.data.experimental.AUTOTUNE)
            )

print(train_ds.element_spec) # (batch_size, seqlen, pitch_range)

TensorSpec(shape=(65, 64, 49), dtype=tf.float64, name=None)


In [187]:
input_shape = (seqlen, pitch_range) # (64, 49)
learning_rate = 0.005

inputs = tf.keras.Input(input_shape)
x = tf.keras.layers.LSTM(128, input_shape=input_shape, return_sequences=True)(inputs)
outputs = tf.keras.layers.Dense(1)(x)

model = tf.keras.Model(inputs, outputs)

loss = tf.keras.losses.CategoricalCrossentropy(
        from_logits=True) # To set True automatically adds softmax

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
metrics = tf.keras.metrics.CategoricalCrossentropy()

model.compile(loss=loss,
              optimizer=optimizer,
              metrics=[metrics])
model.summary()




Model: "model_14"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_15 (InputLayer)       [(None, 64, 49)]          0         
                                                                 
 lstm_25 (LSTM)              (None, 64, 128)           91136     
                                                                 
 dense_15 (Dense)            (None, 64, 1)             129       
                                                                 
Total params: 91265 (356.50 KB)
Trainable params: 91265 (356.50 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [188]:
losses = model.evaluate(train_ds, return_dict=True)
losses



{'loss': 0.0, 'categorical_crossentropy': 0.0}

### --Experimental or Unused code Below-- 

In [None]:
"""Check the dimension of the matrix"""

# rows = len(training_data)
# columns = len(training_data[0]) if training_data else 0  # Assuming all rows have the same length

# print("Shape of the list:", rows, "x", columns, f"shape{rows, columns}")
# print("The total dimension of this matrx is:", rows*columns)

'Check the dimension of the matrix'

In [None]:
# def read_midi(filename, sop_alto, seqlen):
#   midi = pretty_midi.PrettyMIDI(filename)

#   if len(midi.key_signature_changes) !=1: # An Exception error is thrown if there is a modulation(key change)
#     raise UnsupportedMidiFileException

#   key_number = midi.key_signature_changes[0].key_number # explained in the text
#   keymode = np.array([int(key_number / 12)])

  
#   _, tempo = midi.get_tempo_changes() # _ can be written as tempo_time but won't be used w/in this function
#   if len(tempo) != 1: # counting the number of elements in the tempo array. # The Exception error thrown when tempo changes
#     raise UnsupportedMidiFileException

#   if sop_alto: # The argument is coming in as boolean, True or False
#     if len(midi.instruments) < 2: # The exception thrown if there are less than 2 parts
#       raise UnsupportedMidiFileException

#     # Get pianoRoll binary Matrix for each of Soprano, alto, bass parts
#     pr_s = get_pianoroll(midi.instruments[0], 36, 84, seqlen, tempo[0]) # Get pianoroll's arguments (midi, nn_from, nn_thru, seqlen, tempo):
#     pr_a = get_pianoroll(midi.instruments[1], 36, 84, seqlen, tempo[0])
#     pr_b = get_pianoroll(midi.instruments[2], 36, 84, seqlen, tempo[0])

#     # return pr_s, keymode
#     return pr_s, pr_a, pr_b, keymode 
    

#   else:
#     # Get a pianoroll which gathered all the parts
#     pr = get_pianoroll(midi, 36, 84, seqlen, tempo[0])
#     return pr, keymode

In [None]:
# def add_rest_nodes(pianoroll):  # If all the elemets are zero, the rest node says 1, else 0
#   print("SUM!",np.sum(pianoroll, axis=1))
#   import sys
#   sys.exit()
#   rests = 1 - np.sum(pianoroll, axis=1)
#   rests = np.expand_dims(rests, 1)

#   return np.concatenate([pianoroll, rests], axis=1)

In [None]:
# training_data = [] # the len is 496

# for each_file in glob.glob(str(data_dir/"**/*.mid*")):
#   try:
#      sop, keymode = read_midi(each_file, False, seqlen)
#      training_data.append(add_rest_nodes(sop))
     
#   except UnsupportedMidiFileException:
#      print("Nah")

In [None]:
"""About tfp, tensorflow probability"""
# import tensorflow_probability as tfp
# import tensorflow as tf

# # Create a normal distribution with mean 0 and standard deviation 1
# normal_dist = tfp.distributions.Normal(loc=0., scale=1.)

# # Sample from the distribution
# samples = normal_dist.sample(1)  # Get 10 samples from the distribution

# # Compute log probability of a value
# log_prob = normal_dist.log_prob(0.9)  # Compute log probability of 0.5 in the distribution

# # Print the samples and log probability
# print("Samples:", samples)
# print("Log Probability of 0.5:", log_prob)


'About tfp, tensorflow probability'

In [None]:
"""About flat_map by tf.data.Dataset"""
# BATCH_SIZE1=1
# dataset = tf.data.Dataset.from_tensor_slices(
#           [[1, 2, 3], [4, 5, 6], [7, 8, 9]])
# print(dataset.element_spec)
# batched = dataset.batch(batch_size=BATCH_SIZE1, drop_remainder=True)
# print(batched)


'About flat_map by tf.data.Dataset'

In [None]:
"""About model size. This gets the error so gotta figure later"""

# model = tf.keras.Sequential()
# model.add(tf.keras.layers.LSTM(128, input_shape=(10, 20, 30)))
# model.add(tf.keras.layers.Dense(1))

# model.compile(optimizer='adam', loss='mse')


In [168]:
import tensorflow as tf
inputs = tf.random.normal([7, 5, 4])
lstm = tf.keras.layers.LSTM(4, return_sequences=True)
output = lstm(inputs)
print(output.shape)

(7, 5, 4)
