In [1]:
import pandas as pd
import os
import librosa
import pretty_midi
import numpy as np
import tensorflow as tf
import datetime


path = r'F:\Dataset_open_topic\maestro-v3.0.0_all\maestro-v3.0.0'
file = "maestro-v3.0.0.csv"
file_name = os.path.join(path, file)
df = pd.read_csv(file_name)  # or your cleaned version


In [2]:
train_df = df[df['split'] == 'train']
val_df   = df[df['split'] == 'validation']
test_df  = df[df['split'] == 'test']

In [3]:
train_paths = list(zip(train_df['audio_filename'], train_df['midi_filename']))
val_paths   = list(zip(val_df['audio_filename'], val_df['midi_filename']))
test_paths  = list(zip(test_df['audio_filename'], test_df['midi_filename']))

# Example: get full path
full_audio_path = os.path.join(path, train_paths[0][0])
full_midi_path = os.path.join(path, train_paths[0][1])

In [4]:
# --- Configuration ---
SR = 22050
HOP_LENGTH_SEC = 0.01
HOP_LENGTH = int(HOP_LENGTH_SEC * SR)
FMIN = 27.5
BINS_PER_OCTAVE = 36
N_BINS = 267
NUM_CLASSES = 128
WINDOW_SIZE = 9
STRIDE = 1

# Training 
EPOCH = 100
BATCH_SIZE = 256
LEARNING_RATE = 0.0001

In [None]:

def load_audio_and_midi(audio_path, midi_path):
    # Compute CQT
    y, _ = librosa.load(audio_path, sr=SR)
    C = librosa.cqt(
        y, sr=SR, hop_length=HOP_LENGTH, fmin=FMIN,
        n_bins=N_BINS, bins_per_octave=BINS_PER_OCTAVE
    )
    C_dB = librosa.amplitude_to_db(np.abs(C), ref=np.max)

    # Compute Piano Roll
    midi = pretty_midi.PrettyMIDI(midi_path)
    piano_roll = midi.get_piano_roll(fs=SR / HOP_LENGTH)

    # Align length
    n_frames = min(C_dB.shape[1], piano_roll.shape[1])
    return C_dB[:, :n_frames], piano_roll[:, :n_frames]


In [None]:
from tqdm import tqdm
cqt_list, piano_list = [], []
for audio_rel, midi_rel in tqdm(train_paths[:10]):
    cqt, piano = load_audio_and_midi(os.path.join(path, audio_rel),
                                     os.path.join(path, midi_rel))
    cqt_list.append(cqt)
    piano_list.append(piano)

cqt_train = np.concatenate(cqt_list, axis=1)
piano_train = np.concatenate(piano_list, axis=1)
np.savez_compressed('train_data.npz', cqt=cqt_train, piano=piano_train)

In [5]:
data = np.load('train_data.npz')

cqt_train = data['cqt']
piano_train = data['piano']

In [6]:
# from tqdm import tqdm

# path_train = r'F:\Dataset_open_topic\train_individual'
# cqt_train = []
# piano_train = []
# i = 0
# for file in tqdm(os.listdir(path_train)):
#     # print(file)
#     if i < 10:
#         file = os.path.join(path_train, file)
#         data = np.load(file)
#         cqt_train.append(data['cqt'])
#         piano_train.append(data['piano'])
#     i += 1

# cqt_train = np.concatenate(cqt_train, axis=1)
# piano_train = np.concatenate(piano_train, axis=1)

In [7]:
def create_sliding_windows(data, window_size=9, stride=1, pad_mode='edge', constant_value=0):
    pad = window_size // 2
    pad_width = ((0, 0), (pad, pad))

    if pad_mode == 'constant':
        padded_data = np.pad(data, pad_width, mode='constant', constant_values=constant_value)
    else:
        padded_data = np.pad(data, pad_width, mode=pad_mode)

    windows = np.array([
        padded_data[:, i:i + window_size]
        for i in range(0, data.shape[1], stride)
    ])
    
    return windows[..., np.newaxis]


def create_binary_labels(midi_windows, threshold=0):
    return (np.max(midi_windows, axis=2) > threshold).astype(np.float32)

In [8]:
X_input = create_sliding_windows(cqt_train, WINDOW_SIZE, STRIDE)
Y_windows = create_sliding_windows(piano_train, WINDOW_SIZE, STRIDE, pad_mode='constant', constant_value=0)
Y_output = create_binary_labels(Y_windows)

In [9]:
print("Input shape :", X_input.shape)   # e.g., (n, 267, 9, 1)
print("Output shape:", Y_output.shape)  # e.g., (n, 128)

Input shape : (551665, 267, 9, 1)
Output shape: (551665, 128, 1)


In [10]:
# # --- Model Training ---
# log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=0)

model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(10, (16, 2), activation='relu', padding='valid', input_shape=(N_BINS, WINDOW_SIZE, 1)),
    tf.keras.layers.MaxPooling2D((2, 1)),
    tf.keras.layers.Conv2D(20, (11, 3), activation='relu', padding='valid'),
    tf.keras.layers.MaxPooling2D((2, 1)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(NUM_CLASSES, activation='sigmoid')
])


from tensorflow.keras import backend as K
from tensorflow.keras.metrics import AUC
from tensorflow.keras.callbacks import EarlyStopping


def f1_metric(y_true, y_pred):
    """Custom F1 score metric (approximated for use in training logs)."""
    y_pred_bin = K.round(y_pred)
    tp = K.sum(K.cast(y_true * y_pred_bin, 'float32'))
    predicted_positives = K.sum(K.cast(y_pred_bin, 'float32'))
    possible_positives = K.sum(K.cast(y_true, 'float32'))

    precision = tp / (predicted_positives + K.epsilon())
    recall = tp / (possible_positives + K.epsilon())
    f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())
    return f1

# model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
#               loss='binary_crossentropy',
#               metrics=['binary_accuracy'])

# from tensorflow.keras.callbacks import EarlyStopping
# early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
#     loss='binary_crossentropy',
#     metrics=[
#         'binary_accuracy',   # optional but still there
#         f1_metric,           # custom F1
#         AUC(name='auc'),     # area under curve (optional)
#     ]
# )

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='binary_crossentropy',
    metrics=[
        'binary_accuracy',
        f1_metric,
        AUC(name='auc'),
    ]
)
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 252, 8, 10)        330       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 126, 8, 10)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 116, 6, 20)        6620      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 58, 6, 20)        0         
 2D)                                                             
                                                                 
 flatten (Flatten)           (None, 6960)              0         
                                                                 
 dense (Dense)               (None, 256)               1

In [11]:
# 1. Create a timestamped log directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

# 2. Create the TensorBoard callback
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,          # Set to 1 if you want weight histograms (optional)
    write_graph=True,          # Log the model graph (default = True)
    update_freq='epoch'        # or 'batch' if you want more frequent updates
)

early_stopping = EarlyStopping(monitor='val_loss', patience=10, min_delta=1e-4, restore_best_weights=True)

model.fit(
    X_input, Y_output,
    epochs=10,
    batch_size=BATCH_SIZE,
    validation_split=0.2,
    callbacks=[early_stopping, tensorboard_callback]
)

model.save("master_10_0.h5")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [9]:
model.save("master_Earlystop_auc_100ep.h5")