In [1]:
!pip install tensorflow-tpu -f https://storage.googleapis.com/libtpu-tf-releases/index.html --force

Looking in links: https://storage.googleapis.com/libtpu-tf-releases/index.html
Collecting tensorflow-tpu
  Downloading tensorflow_tpu-2.18.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting absl-py>=1.0.0 (from tensorflow-tpu)
  Downloading absl_py-2.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting astunparse>=1.6.0 (from tensorflow-tpu)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow-tpu)
  Downloading flatbuffers-25.2.10-py2.py3-none-any.whl.metadata (875 bytes)
Collecting gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1 (from tensorflow-tpu)
  Downloading gast-0.6.0-py3-none-any.whl.metadata (1.3 kB)
Collecting google-pasta>=0.1.1 (from tensorflow-tpu)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow-tpu)
  Downloading libclang-18.1.1-py2.py3-none-manylinux2010_x86_64.whl.metadata (5.2 kB)
Collecting opt-einsum>=2.3.2 (f

In [1]:
!pip install pretty_midi



In [2]:
# imports
import os
import pretty_midi
import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, ConfusionMatrixDisplay, make_scorer

from random import sample
from warnings import catch_warnings
from concurrent.futures import ThreadPoolExecutor
from random import sample

In [3]:
import tensorflow as tf
print(tf.__version__)

 This a JAX bug; please report an issue at https://github.com/jax-ml/jax/issues
  _warn(f"cloud_tpu_init failed: {exc!r}\n This a JAX bug; please report "


2.18.0


In [4]:
# tensorflow imports
from tensorflow import keras
from keras import Sequential, layers, utils, callbacks
from keras.layers import Conv1D, MaxPooling1D, GlobalAveragePooling1D, Dense, Input, Normalization, Flatten, Dropout
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping

In [5]:
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    print('Device:', tpu.master())
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
except:
    strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

Device: 
Number of replicas: 8


In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
# DATA_DIR = "../selectedcomposers"

# google drive folder
DATA_DIR = "/content/drive/MyDrive/selectedcomposers"

composer_list = ["Bach", "Beethoven", "Chopin", "Mozart"]

# get all of the files so we can run a concurrent batch to speed up processing
file_label_pairs = []
for composer in os.listdir(DATA_DIR):
    composer_dir = os.path.join(DATA_DIR, composer)
    if not os.path.isdir(composer_dir):
        continue
    for root, _, files in os.walk(composer_dir):
        for fn in files:
            if fn.lower().endswith((".mid", ".midi")):
                file_label_pairs.append((os.path.join(root, fn), composer))

In [9]:
frequency = 20
seq_size = frequency * 10

# pad the given sequence with zeros if it is too short
def add_padding(seq, x, missing):
  right = np.zeros((x, missing))

  return np.hstack((seq,right))

# get all of the valid sequences from the provided piano roll data.
def get_sequences(piano_data):
  x, y = piano_data.shape
  sequences = []
  if(y < seq_size):
    sequence = add_padding(piano_data, x, seq_size-y)
    sequences.append(sequence)
  else:
    start = 0
    end = 0
    while(end < y):
      end = start + seq_size
      if end > y:
        sequence = piano_data[:,start:y]
        padded = add_padding(sequence, x, (start + seq_size - y))
        sequences.append(padded)
      else:
        sequence = piano_data[:,start:end]
        sequences.append(sequence)
      start += 10

  return sequences

# generate the midi object, remove any invalid notes,
# then get the sequences from the piano roll data
def get_seq_data(file_path):
  midi = pretty_midi.PrettyMIDI(file_path)
  midi.remove_invalid_notes()
  return get_sequences(midi.get_piano_roll(fs=frequency))

In [10]:
midi_data = []

for (path, composer) in file_label_pairs:
  midi_data.append({
    'composer': composer,
    'sequences': get_seq_data(path)
  })



In [11]:
midi_df = pd.DataFrame(midi_data)

print(f"\nTotal shape: {midi_df.shape}")
print("\nCount by composer:")
print(midi_df['composer'].value_counts())


Total shape: (1627, 2)

Count by composer:
composer
Bach         1024
Mozart        255
Beethoven     212
Chopin        136
Name: count, dtype: int64


In [20]:
# get just the sequence data for each composer and the target row
seq_data = []
for midi in midi_data:
    for seq in midi['sequences']:
        seq_data.append({
            'sequence': np.transpose(seq),
            'composer': midi['composer']
        })

seq_df = pd.DataFrame(seq_data)
seq_df['composer'].value_counts()

Unnamed: 0_level_0,count
composer,Unnamed: 1_level_1
Bach,298248
Beethoven,201956
Mozart,199234
Chopin,57416


In [21]:
rand_sample = 15000
sampled_df = seq_df.groupby("composer", group_keys=False).sample(n=rand_sample, random_state=511)
sampled_df['composer'].value_counts()

Unnamed: 0_level_0,count
composer,Unnamed: 1_level_1
Bach,15000
Beethoven,15000
Chopin,15000
Mozart,15000


In [22]:
X = np.stack(sampled_df['sequence'].values)
y = sampled_df['composer'].values

# shuffle the dataset with replicable seed
indices = np.arange(X.shape[0])
np.random.seed(23)
np.random.shuffle(indices, )

X_shuffled = X[indices]
y_shuffled = y[indices]

In [23]:
# label encode the labels
ohe = OneHotEncoder()
y = ohe.fit_transform(y_shuffled.reshape(-1, 1))
y = y.toarray()
y

array([[0., 1., 0., 0.],
       [0., 1., 0., 0.],
       [0., 1., 0., 0.],
       ...,
       [0., 1., 0., 0.],
       [1., 0., 0., 0.],
       [0., 0., 1., 0.]])

In [24]:
# split into train/test/val
TEST_TRAIN_SPLIT = 0.8
TEST_VAL_SPLIT = 0.1
total = len(X_shuffled)

# train, test, and val data
X_train = X_shuffled[:int(total * TEST_TRAIN_SPLIT)].copy()
y_train = y[:int(total * TEST_TRAIN_SPLIT)].copy()
X_test = X_shuffled[int(total * (TEST_TRAIN_SPLIT + TEST_VAL_SPLIT)):].copy()
y_test = y[int(total * (TEST_TRAIN_SPLIT + TEST_VAL_SPLIT)):].copy()
X_val = X_shuffled[int(total * TEST_TRAIN_SPLIT):int(total * (TEST_TRAIN_SPLIT + TEST_VAL_SPLIT))].copy()
y_val = y[int(total * TEST_TRAIN_SPLIT):int(total * (TEST_TRAIN_SPLIT + TEST_VAL_SPLIT))].copy()

print("Train / Val / Test shapes:",
      X_train.shape, X_val.shape, X_test.shape)

Train / Val / Test shapes: (48000, 200, 128) (6000, 200, 128) (6000, 200, 128)


In [None]:
# Save the train, test, and val data to an external file.
np.savez_compressed(
    '../featured datasets/data_splits.npz',
    X_train=X_train, y_train=y_train,
    X_val=X_val,     y_val=y_val,
    X_test=X_test,   y_test=y_test
)

In [None]:
# Fetch the train, test, and val data and load it in.
data = np.load('data_splits.npz')
X_train = data['X_train']
y_train = data['y_train']
X_val   = data['X_val']
y_val   = data['y_val']
X_test  = data['X_test']
y_test  = data['y_test']

In [26]:
# global training parameters
NUM_EPOCHS = 50
BATCH_SIZE = 32
LEARNING_RATE = 0.001

with strategy.scope():
  cnn_model = tf.keras.Sequential([

    # tf.keras.layers.Normalization(axis=None),

    tf.keras.layers.Conv1D(256, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),

    tf.keras.layers.Conv1D(256, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),

    tf.keras.layers.Conv1D(128, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),

    tf.keras.layers.Conv1D(128, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),

    tf.keras.layers.Conv1D(64, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),

    tf.keras.layers.Conv1D(64, kernel_size=3, activation='relu', padding='causal'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.MaxPooling1D(2),


    tf.keras.layers.Flatten(),

    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Dense(len(composer_list), activation='softmax')
  ])

  # Compile the model
  cnn_model.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
      loss='categorical_crossentropy',
      metrics=['categorical_accuracy', 'precision', 'recall', 'f1_score']
  )

In [27]:
# Create the callback
early_stop = EarlyStopping(
    monitor='val_categorical_accuracy',
    patience=5,
    mode='max',
    min_delta=0.0001,
    restore_best_weights=True
)

# setup checkpoint
checkpoint = keras.callbacks.ModelCheckpoint(
  filepath='/content/drive/MyDrive/AAI-511/best_cnn.keras',
  monitor='val_categorical_accuracy',
  mode='max',
  save_best_only=True
)

history = cnn_model.fit(
  X_train, y_train,
  validation_data=(X_val,y_val),
  epochs=NUM_EPOCHS,
  batch_size=BATCH_SIZE,
  callbacks=[early_stop, checkpoint]
)

Epoch 1/50
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 14ms/step - categorical_accuracy: 0.3634 - f1_score: 0.3500 - loss: 1.3821 - precision: 0.5597 - recall: 0.0934 - val_categorical_accuracy: 0.3662 - val_f1_score: 0.3014 - val_loss: 1.2433 - val_precision: 0.7704 - val_recall: 0.0850
Epoch 2/50
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 12ms/step - categorical_accuracy: 0.5941 - f1_score: 0.5800 - loss: 0.9572 - precision: 0.7148 - recall: 0.3969 - val_categorical_accuracy: 0.5052 - val_f1_score: 0.4644 - val_loss: 1.2066 - val_precision: 0.7183 - val_recall: 0.2668
Epoch 3/50
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 12ms/step - categorical_accuracy: 0.6670 - f1_score: 0.6578 - loss: 0.8215 - precision: 0.7449 - recall: 0.5319 - val_categorical_accuracy: 0.5630 - val_f1_score: 0.5439 - val_loss: 1.0924 - val_precision: 0.6881 - val_recall: 0.3772
Epoch 4/50
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

In [28]:
loss, cat_acc, precision, recall, f1 = cnn_model.evaluate(X_test, y_test)

[1m188/188[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 11ms/step - categorical_accuracy: 0.7862 - f1_score: 0.7877 - loss: 0.6195 - precision: 0.8376 - recall: 0.6780


In [29]:
print(f'Loss: {loss}\nAccuracy: {cat_acc}\nPrecision: {precision}\nRecall: {recall},\nF1: {f1}')

Loss: 0.6194760203361511
Accuracy: 0.7861666679382324
Precision: 0.8375540375709534
Recall: 0.6779999732971191,
F1: [0.85863084 0.6921562  0.934495   0.6656626 ]
