In [1]:
import zipfile
import os

def unzip_file(zip_file_path, extract_to_path):
    """Extract every member of a zip archive into a target directory.

    Nothing is raised and nothing is returned: the outcome (success or the
    kind of failure) is reported with a single line on stdout.

    Args:
        zip_file_path (str): Location of the archive to read.
        extract_to_path (str): Directory that receives the extracted members.
    """
    try:
        with zipfile.ZipFile(zip_file_path, mode='r') as archive:
            archive.extractall(path=extract_to_path)
        outcome = f"Successfully unzipped '{zip_file_path}' to '{extract_to_path}'"
    except FileNotFoundError:
        # The archive path does not exist.
        outcome = f"Error: The zip file '{zip_file_path}' was not found."
    except zipfile.BadZipFile:
        # The path exists but its content is not a zip archive.
        outcome = f"Error: '{zip_file_path}' is not a valid zip file."
    except Exception as e:
        # Anything else (permissions, disk space, ...) is reported, not raised.
        outcome = f"An error occurred during unzipping: {e}"
    print(outcome)


# Extract the preprocessed dataset archive into /kaggle/temp; the training
# cell later reads its data from /kaggle/temp/random_frames.
# unzip_file only prints on failure, so check the cell output before continuing.
zip_file = "/kaggle/input/pianonotescnn-preprocessing/_output_.zip"
extract_dir = "/kaggle/temp"
unzip_file(zip_file, extract_dir)

Successfully unzipped '/kaggle/input/pianonotescnn-preprocessing/_output_.zip' to '/kaggle/temp'


In [2]:
import os
import shutil

def clear_working_directory(directory="/kaggle/working/"):
    """Remove every entry inside ``directory`` while keeping the directory itself.

    Real sub-directories are deleted recursively. Everything else (regular
    files, symbolic links of any kind, other non-directory entries) is
    unlinked. Symbolic links are never followed, so the target of a link that
    points outside ``directory`` is left untouched.

    Errors are reported on stdout rather than raised; the sweep stops at the
    first entry that cannot be removed.

    Args:
        directory (str): Directory whose contents are deleted.
    """
    try:
        for item in os.listdir(directory):
            item_path = os.path.join(directory, item)
            # isdir() follows symlinks, and shutil.rmtree refuses to operate on
            # a symlink, so a link to a directory must be unlinked like a file.
            if os.path.isdir(item_path) and not os.path.islink(item_path):
                shutil.rmtree(item_path)
                print(f"Removed directory: {item_path}")
            else:
                # Covers regular files, symlinks (including broken ones) and
                # any other non-directory entry, so nothing is silently skipped.
                os.remove(item_path)
                print(f"Removed file: {item_path}")
        print(f"Successfully cleared the contents of: {directory}")
    except FileNotFoundError:
        print(f"Error: Directory not found: {directory}")
    except OSError as e:
        print(f"Error: Could not clear directory {directory}. {e}")

# Wipe the default working directory (/kaggle/working/), where the training
# cell later writes its checkpoints.
# NOTE: this deletes everything in that directory without confirmation.

clear_working_directory()

Removed file: /kaggle/working/__notebook__.ipynb
Successfully cleared the contents of: /kaggle/working/


In [3]:
# Install pretty_midi, which the training cell imports.
# NOTE(review): the version is not pinned; the recorded run built pretty_midi 0.2.10 —
# consider pinning it so a re-run resolves the same release.
%pip install pretty_midi


Collecting pretty_midi
  Downloading pretty_midi-0.2.10.tar.gz (5.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m45.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mido>=1.1.16 (from pretty_midi)
  Downloading mido-1.3.3-py3-none-any.whl.metadata (6.4 kB)
Downloading mido-1.3.3-py3-none-any.whl (54 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pretty_midi
  Building wheel for pretty_midi (setup.py) ... [?25l[?25hdone
  Created wheel for pretty_midi: filename=pretty_midi-0.2.10-py3-none-any.whl size=5592287 sha256=704311086728804d84fd5d56361c4fc33d098988c48bd78125d2507af99f89f8
  Stored in directory: /root/.cache/pip/wheels/cd/a5/30/7b8b7f58709f5150f67f98fde4b891ebf0be9ef07a8af49f25
Successfully built pretty_midi
Installing collected packages: mido, pretty_

In [4]:
# # MAESTRO Model Training Script
#
# This script trains a model for piano note recognition using the
# preprocessed data from the MAESTRO dataset.
import os
import numpy as np
import tensorflow as tf
# Import mixed precision module
from keras import layers, models, mixed_precision
import json
import glob
from sklearn.model_selection import train_test_split
import pretty_midi # Ensure this is installed if running locally

# ## 1. Configuration

# Path to the processed data directory (must match your preprocessing script)
PROCESSED_DATA_PATH = "/kaggle/temp/random_frames"
# dataset_info.json is not produced by the random-sampling preprocessing, so the
# preprocessing parameters are repeated by hand below instead of being loaded.
# DATASET_INFO_PATH = os.path.join(PROCESSED_DATA_PATH, 'dataset_info.json')

# Model & Training Parameters
# The model input shape is (n_mels, time_steps_in_mel_spectrogram, 1); it is set
# explicitly further down as INPUT_SHAPE rather than inferred from a sample.
# Adjust BATCH_SIZE and EPOCHS based on your system resources and desired training time
BATCH_SIZE = 128
EPOCHS = 100  # Upper bound on epochs; the training section also configures early stopping
N_KEYS = 88  # Number of piano keys (output size)

# Audio / feature-extraction parameters.
# NOTE(review): presumably these mirror the preprocessing script — verify against it.
# In the code visible in this cell only N_MELS is read (for the model input shape).
SAMPLE_RATE = 16000  # presumably Hz
DURATION = 0.1  # presumably seconds of audio per sample — TODO confirm
HOP_LENGTH = 512
N_FFT = 512
N_MELS = 64  # Number of mel bands; first dimension of the model input
FMIN = 27.5  # presumably Hz (lower bound of the mel filter bank) — TODO confirm
FMAX = 4186.0  # presumably Hz (upper bound of the mel filter bank) — TODO confirm
TOTAL_FRAMES_TO_SAVE = 150000  # Equals train + val + test sample counts printed by the recorded run

# ## Data Loading Block (Added by User)
import tensorflow as tf
import numpy as np
import glob
import os

# Location of the processed data. PROCESSED_DATA_PATH repeats the value already
# assigned in the configuration section above; keep the two in sync.
PROCESSED_DATA_PATH = "/kaggle/temp/random_frames"
# Features are read as *.npz files and labels as *.npy files from these folders.
FEATURES_DIR = os.path.join(PROCESSED_DATA_PATH, 'features')
LABELS_DIR = os.path.join(PROCESSED_DATA_PATH, 'labels')



def calculate_class_weights(note_counts, max_weight=10.0, num_classes=88, pos_weight=3.0):
    """Return a constant positive-class weight for every output class.

    No statistics are computed: every class receives the same weight.

    Args:
        note_counts: Unused. Kept so existing calls keep working.
        max_weight (float): Unused. Kept so existing calls keep working.
        num_classes (int): Length of the returned vector (default 88).
        pos_weight (float): Weight assigned to the positive label of every
            class (default 3.0).

    Returns:
        np.ndarray: float32 array of shape ``(num_classes,)`` filled with
        ``pos_weight``.
    """
    return np.full(num_classes, pos_weight, dtype=np.float32)


# Positive-class weights for the custom loss (a constant vector, one entry per key)
class_weights = calculate_class_weights(0, 3.0)
print("\nCalculated Class Weights (Positive Class):", class_weights)

# 1. Collect all feature and label file paths.
# Features and labels are paired purely by sorted position, so both folders
# must hold the same number of files in matching name order.
feature_files = sorted(glob.glob(os.path.join(FEATURES_DIR, '*.npz')))
label_files = sorted(glob.glob(os.path.join(LABELS_DIR, '*.npy')))

# Fail fast with a clear message: zip() below would otherwise silently drop the
# unmatched tail, and an empty file list only fails later inside squeeze().
if not feature_files:
    raise FileNotFoundError(f"No feature files (*.npz) found in {FEATURES_DIR}")
if len(feature_files) != len(label_files):
    raise ValueError(
        f"Feature/label count mismatch: {len(feature_files)} feature files in {FEATURES_DIR} "
        f"vs {len(label_files)} label files in {LABELS_DIR}"
    )

# 2. Load all data into lists
all_features = []
all_labels = []

print("Loading data into memory...")
for feature_file, label_file in zip(feature_files, label_files):
    # np.load on an .npz returns a lazy archive; close it once the array is read.
    with np.load(feature_file) as data:
        all_features.append(data['mel_spectrogram'])
    all_labels.append(np.load(label_file))

# Stack into single arrays. squeeze(axis=1) requires the stacked labels to have
# a size-1 second axis, i.e. each label file holds an array with a leading
# dimension of 1.
all_features_np = np.array(all_features, dtype=np.float32)
all_labels_np = np.array(all_labels, dtype=np.float32).squeeze(axis=1)

# Add a trailing channel dimension, as expected by the Conv2D layers
all_features_np = np.expand_dims(all_features_np, axis=-1)

print("Converting to TensorFlow tensors...")
# 3. Convert NumPy arrays to TensorFlow tensors
all_features_tensor = tf.convert_to_tensor(all_features_np)
all_labels_tensor = tf.convert_to_tensor(all_labels_np)
# 4. Expose the tensors under the gpu_* names when a GPU is present.
# NOTE: the assignments below only alias the tensors created above; no op is
# executed inside the device scope, so nothing is copied here. Device placement
# was already decided when convert_to_tensor ran.
if tf.config.list_physical_devices('GPU'):
    with tf.device('/GPU:0'):  # Assuming you have one GPU
        gpu_features = all_features_tensor
        gpu_labels = all_labels_tensor
        print("Data loaded onto GPU.")
else:
    print("No GPU found, data remains on CPU.")

# ## 4. Define the Model Architecture (Simple CNN Example)
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers # Make sure regularizers is imported if used directly

def build_model(input_shape, num_classes):
    """Assemble the CNN + GRU network used for piano note recognition.

    The network is a stack of four Conv2D blocks that pool only along the
    first (mel) axis, followed by a Conv1D/GRU stage that runs along the
    second (time) axis, and a dense multi-label head.

    Args:
        input_shape (tuple): Shape of one sample, (n_mels, time_steps, channels),
            e.g. (64, 4, 1). A warning is printed when time_steps is not 4.
        num_classes (int): Number of sigmoid outputs (e.g. 88 piano keys).

    Returns:
        tf.keras.Model: The (uncompiled) Sequential model.
    """
    n_mels, time_steps = input_shape[0], input_shape[1]
    if time_steps != 4:
        print(f"Warning: build_model expects input_shape with 4 time steps based on previous error,"
              f" but received input_shape={input_shape}. Shapes might mismatch if data isn't (N_MELS, 4, 1).")

    # (filters, L2 factor) for each 2D convolutional block, in order.
    conv_blocks = ((32, 0.0005), (64, 0.0005), (128, 0.001), (256, 0.001))

    model = models.Sequential(name="PianoNoteCNN_GRU")
    model.add(layers.Input(shape=input_shape))  # e.g. (None, 64, 4, 1)

    # --- Convolutional blocks ---
    # Conv -> BatchNorm -> ReLU -> MaxPool(2, 1) -> Dropout.
    # The (2, 1) pooling halves the mel axis and leaves the time axis alone,
    # so a (64, 4, 1) input ends up as (None, 4, 4, 256) after four blocks.
    for n_filters, l2_factor in conv_blocks:
        model.add(layers.Conv2D(n_filters, (3, 3), padding='same',
                                kernel_regularizer=regularizers.L2(l2_factor)))
        model.add(layers.BatchNormalization())
        model.add(layers.Activation('relu'))
        model.add(layers.MaxPooling2D(pool_size=(2, 1)))
        model.add(layers.Dropout(0.25))

    # --- Switch to a (time, features) sequence ---
    # Swap the pooled-mel and time axes: (pooled_mels, time, ch) -> (time, pooled_mels, ch)
    model.add(layers.Permute((2, 1, 3)))

    # Flatten pooled-mel and channel axes into one feature axis per time step.
    pooled_height_dim = n_mels // (2 ** len(conv_blocks))  # 64 // 16 = 4
    last_conv_filters = conv_blocks[-1][0]
    num_features_for_1d = pooled_height_dim * last_conv_filters  # 4 * 256 = 1024
    model.add(layers.Reshape(target_shape=(time_steps, num_features_for_1d)))  # (None, 4, 1024)

    # --- 1D convolution and recurrent stage along the time axis ---
    model.add(layers.Conv1D(128, 3, padding='same',
                            kernel_regularizer=regularizers.L2(0.001)))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation('relu'))
    model.add(layers.MaxPooling1D(pool_size=2))  # halves the time axis: (None, 2, 128)

    # GRU without return_sequences emits only its final output: (None, 128)
    model.add(layers.GRU(128))

    # --- Dense classification head ---
    model.add(layers.Dense(256, activation='relu',
                           kernel_regularizer=regularizers.L2(0.001)))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(0.5))
    # Independent sigmoid per class (multi-label); output dtype forced to float32.
    model.add(layers.Dense(num_classes, activation='sigmoid', dtype='float32'))

    return model

# ## 5. Define Custom Weighted Loss Function

def weighted_binary_crossentropy(pos_weights):
    """Build a binary cross-entropy loss that up-weights positive labels.

    Args:
        pos_weights: Array-like of shape (num_classes,). Entry ``k`` is the
            weight applied to class ``k`` wherever the true label is 1;
            elements whose true label is 0 keep a weight of 1.

    Returns:
        A ``loss(y_true, y_pred)`` callable returning the scalar mean of the
        weighted element-wise binary cross-entropy. ``y_pred`` is expected to
        hold probabilities (e.g. sigmoid outputs).
    """
    pos_weights = tf.constant(pos_weights, dtype=tf.float32)

    def loss(y_true, y_pred):
        # Work in float32 throughout: integer/bool labels or float16 predictions
        # would otherwise clash with the float32 weights in the products below.
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Element-wise binary cross-entropy, shape (batch, num_classes)
        bce = tf.keras.backend.binary_crossentropy(y_true, y_pred)
        # pos_weights where the label is 1, plain 1 where it is 0
        weight_matrix = y_true * pos_weights + (1.0 - y_true)
        return tf.reduce_mean(bce * weight_matrix)

    return loss

# ## 6. Split the Data & Build tf.data Pipelines

print("\nLoading data splits...")

# Check for data BEFORE splitting: train_test_split itself raises on empty
# input, so a check placed after it can never fire. Raise instead of calling
# exit(), which is an interactive helper and not a reliable way to abort a
# notebook cell.
if len(all_features_np) == 0:
    raise RuntimeError("No training data loaded. Cannot proceed with training.")

# 80% train, then the remaining 20% is halved into validation and test.
# The NumPy arrays are used directly: they hold the same values as the
# tensors, so there is no reason to copy tensor data back to the host.
train_features, temp_features, train_labels, temp_labels = train_test_split(
    all_features_np, all_labels_np, test_size=0.2, random_state=42
)
val_features, test_features, val_labels, test_labels = train_test_split(
    temp_features, temp_labels, test_size=0.5, random_state=42
)

print(f"Found {len(train_features)} training samples.")
print(f"Found {len(val_features)} validation samples.")
print(f"Found {len(test_features)} test samples.")

if len(val_features) == 0:
    print("\nWarning: No validation data loaded. Proceeding without validation split during training.")

# Create tf.data datasets from the in-memory arrays.
# BATCH_SIZE comes from the configuration section at the top of this cell.
# Only the training set is shuffled (full-buffer shuffle, reshuffled each epoch).
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_features, train_labels))
    .shuffle(len(train_features))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    tf.data.Dataset.from_tensor_slices((val_features, val_labels))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

test_dataset = (
    tf.data.Dataset.from_tensor_slices((test_features, test_labels))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# ## 7. Build & Compile the Model with Custom Loss
# The input shape is hard-coded to 4 time steps and 1 channel.
# NOTE(review): it is not derived from the loaded features — confirm it matches
# the shape of one sample of the training data.
INPUT_SHAPE = (N_MELS, 4, 1)
model = build_model(INPUT_SHAPE, N_KEYS)

# Custom loss that up-weights positive labels using the class_weights vector
# (a constant 3.0 for every key in this notebook)
custom_loss = weighted_binary_crossentropy(class_weights)

# Adam wrapped in a LossScaleOptimizer.
# NOTE(review): no mixed-precision dtype policy is set anywhere in the visible
# code — confirm whether mixed precision is actually intended to be enabled.
optimizer = tf.keras.optimizers.Adam(learning_rate = 0.0001)
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

# Metrics are Keras' default Precision/Recall (0.5 decision threshold) and AUC,
# computed over all outputs.
model.compile(optimizer=optimizer,
            loss=custom_loss,  # Using custom weighted loss
            # loss=tf.keras.losses.BinaryCrossentropy(),
            metrics=[tf.keras.metrics.Precision(name='precision'),
                     tf.keras.metrics.Recall(name='recall'),
                     tf.keras.metrics.AUC(name='auc')])

model.summary()

# ## 8. Train the Model

print("\nStarting training...")

# A tf.data.Dataset object is always truthy, so its truth value says nothing
# about whether it contains samples; decide from the actual sample count.
has_validation = len(val_features) > 0

# Overwritten at the end of every epoch, so it always holds the latest weights.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint('/kaggle/working/final.keras', save_freq='epoch', save_best_only=False),
]
if has_validation:
    # These two monitor val_loss and are only meaningful with validation data.
    callbacks = [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
        tf.keras.callbacks.ModelCheckpoint('/kaggle/working/best.keras', save_best_only=True, monitor='val_loss'),
    ] + callbacks

history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset if has_validation else None,
    callbacks=callbacks
)
print("\nTraining finished.")

# ## 9. Evaluate the Model (Optional)

# A tf.data.Dataset object is always truthy, so test the sample count instead;
# otherwise the "no test data" branch below could never run.
if len(test_features) > 0:
    print("\nEvaluating on test set...")
    # evaluate() returns the loss followed by the compiled metrics, in order:
    # precision, recall, auc.
    test_loss, test_precision, test_recall, test_auc = model.evaluate(test_dataset)
    print(f"\nTest Loss: {test_loss}")
    print(f"Test Precision: {test_precision}")
    print(f"Test Recall: {test_recall}")
    print(f"Test AUC: {test_auc}")
else:
    print("\nNo test data found. Skipping final evaluation.")

# ## 10. Save the Model (Optional)

# The models are already saved by the callbacks
print("\nBest model saved to /kaggle/working/best.keras")
print("Final model saved to /kaggle/working/final.keras")


Calculated Class Weights (Positive Class): [3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.
 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.
 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.
 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3. 3.]
Loading data into memory...
Converting to TensorFlow tensors...
Data loaded onto GPU.

Loading data splits...
Found 120000 training samples.
Found 15000 validation samples.
Found 15000 test samples.



Starting training...
Epoch 1/100
[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 23ms/step - auc: 0.5172 - loss: 1.2829 - precision: 0.0367 - recall: 0.3564 - val_auc: 0.7472 - val_loss: 0.5673 - val_precision: 0.0000e+00 - val_recall: 0.0000e+00
Epoch 2/100
[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 22ms/step - auc: 0.7290 - loss: 0.4931 - precision: 0.1560 - recall: 0.0011 - val_auc: 0.7907 - val_loss: 0.3487 - val_precision: 0.8367 - val_recall: 9.1518e-04
Epoch 3/100
[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 22ms/step - auc: 0.7901 - loss: 0.3347 - precision: 0.3259 - recall: 0.0289 - val_auc: 0.8460 - val_loss: 0.2781 - val_precision: 0.6821 - val_recall: 0.1031
Epoch 4/100
[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 22ms/step - auc: 0.8352 - loss: 0.2816 - precision: 0.4544 - recall: 0.1467 - val_auc: 0.8867 - val_loss: 0.2424 - val_precision: 0.6023 - val_recall: 0.2485
Epoch 5/100
[1m93