In [None]:
# Colab-only: mount Google Drive at /content/drive so the paths used later in
# this notebook (dataset and output folders under /content/drive) resolve.
from google.colab import drive
drive.mount('/content/drive')

import os
import numpy as np
import music21
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam

import pickle

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
from tensorflow.keras.utils import to_categorical

In [None]:
# 1. Data Preprocessing
def extract_notes_and_durations(file_path):
    """Parse one MIDI file into parallel pitch and duration lists.

    The parsed score is flattened and scanned in order. A single note
    contributes its MIDI pitch number; a chord contributes only the pitch that
    comes last after ``sortAscending()``. Every other element is skipped.

    Args:
        file_path: Path handed to ``music21.converter.parse``.

    Returns:
        ``(notes, durations)``: two equally long lists holding, for each kept
        element, its MIDI pitch number and its ``duration.quarterLength``.
    """
    score = music21.converter.parse(file_path)

    events = []
    for item in score.flatten():
        if isinstance(item, music21.note.Note):
            midi_pitch = item.pitch.midi
        elif isinstance(item, music21.chord.Chord):
            # Reduce the chord to a single pitch: the last one once sorted ascending.
            midi_pitch = item.sortAscending().pitches[-1].midi
        else:
            continue
        events.append((midi_pitch, item.duration.quarterLength))

    notes = [pitch for pitch, _ in events]
    durations = [length for _, length in events]
    return notes, durations


In [None]:
def process_midi_files(directory):
    """Collect notes and durations from every MIDI file below ``directory``.

    The tree is walked recursively. Files whose name ends in ``.mid`` or
    ``.midi`` (matched case-insensitively, so ``SONG.MID`` is included) are
    passed to ``extract_notes_and_durations`` and their results are
    concatenated. Directories and files are visited in sorted order so the
    concatenated stream is the same on every run and every filesystem.

    A file that fails to parse is reported on stdout and skipped; it does not
    abort the whole run.

    Args:
        directory: Root folder to search.

    Returns:
        ``(all_notes, all_durations)``: two parallel lists covering all
        successfully processed files, in traversal order.
    """
    midi_suffixes = ('.mid', '.midi')
    all_notes = []
    all_durations = []

    for root, dirs, files in os.walk(directory):
        # Sorting in place steers os.walk's descent, making traversal deterministic.
        dirs.sort()
        for file in sorted(files):
            if not file.lower().endswith(midi_suffixes):
                continue
            path = os.path.join(root, file)
            try:
                notes, durations = extract_notes_and_durations(path)
            except Exception as e:
                # Best effort: one corrupt file must not stop the whole corpus.
                print(f"Error processing {file}: {str(e)}")
                continue
            all_notes.extend(notes)
            all_durations.extend(durations)

    return all_notes, all_durations

In [None]:
def create_sequences(notes, durations, sequence_length):
    """Slice the note/duration streams into sliding training windows.

    Window ``i`` pairs up ``notes[i:i + sequence_length]`` with the matching
    durations; its targets are the note and duration immediately after the
    window. Windows advance one step at a time.

    Args:
        notes: Sequence of note values.
        durations: Sequence of durations, parallel to ``notes``.
        sequence_length: Number of time steps per input window.

    Returns:
        ``(X, y_notes, y_durations)`` as NumPy arrays. ``X`` has shape
        ``(num_windows, sequence_length, 2)``. When there are not enough notes
        for a single window, ``X`` is still 3-D with shape
        ``(0, sequence_length, 2)`` and both target arrays are empty, so
        callers that read ``X.shape[1:]`` keep working.
    """
    window_count = len(notes) - sequence_length
    if window_count <= 0:
        # np.array([]) would be 1-D and lose the (sequence_length, 2) layout.
        return np.empty((0, sequence_length, 2)), np.array([]), np.array([])

    X = [
        list(zip(notes[i:i + sequence_length], durations[i:i + sequence_length]))
        for i in range(window_count)
    ]
    y_notes = [notes[i + sequence_length] for i in range(window_count)]
    y_durations = [durations[i + sequence_length] for i in range(window_count)]
    return np.array(X), np.array(y_notes), np.array(y_durations)

In [None]:
# 2. Model Definition
def create_model(input_shape, vocab_size):
    """Assemble the uncompiled two-layer LSTM classifier.

    Layer stack, in order: LSTM(256, returning the full sequence) -> Dropout(0.3)
    -> LSTM(256) -> Dropout(0.3) -> Dense(256, relu) -> Dense(vocab_size, softmax).

    Args:
        input_shape: Shape of one input sample, forwarded to the first LSTM.
        vocab_size: Width of the final softmax layer (number of output classes).

    Returns:
        A ``Sequential`` model; the caller is responsible for compiling it.
    """
    recurrent_front = LSTM(256, input_shape=input_shape, return_sequences=True)
    recurrent_back = LSTM(256)
    hidden = Dense(256, activation='relu')
    classifier = Dense(vocab_size, activation='softmax')

    layer_stack = [
        recurrent_front,
        Dropout(0.3),
        recurrent_back,
        Dropout(0.3),
        hidden,
        classifier,
    ]
    return Sequential(layer_stack)

In [None]:
def save_model_and_data(model, notes, vocab_size, sequence_length, output_dir):
    """Write the model and its companion metadata into ``output_dir``.

    Two files are produced:
      * ``music_model.h5``  - written by ``model.save``.
      * ``music_data.pkl``  - a pickled dict with the keys ``notes``,
        ``vocab_size`` and ``sequence_length``.

    Args:
        model: Object exposing ``save(path)``.
        notes: Note data stored under the ``notes`` key.
        vocab_size: Value stored under the ``vocab_size`` key.
        sequence_length: Value stored under the ``sequence_length`` key.
        output_dir: Destination folder; created (with parents) if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    model_path = os.path.join(output_dir, 'music_model.h5')
    data_path = os.path.join(output_dir, 'music_data.pkl')

    model.save(model_path)

    # Everything besides the weights that the saved values describe.
    payload = dict(notes=notes, vocab_size=vocab_size, sequence_length=sequence_length)
    with open(data_path, 'wb') as handle:
        pickle.dump(payload, handle)

In [None]:
def accuracy(y_true, y_pred):
    """Metric function that delegates to Keras' sparse categorical accuracy.

    Args:
        y_true: Ground-truth labels, forwarded unchanged.
        y_pred: Model predictions, forwarded unchanged.

    Returns:
        Whatever ``tf.keras.metrics.sparse_categorical_accuracy`` returns for
        the two arguments.
    """
    sparse_accuracy = tf.keras.metrics.sparse_categorical_accuracy
    return sparse_accuracy(y_true, y_pred)

In [None]:
def tune_hyperparameters(X, y, vocab_size=None, learning_rates=(0.001, 0.01, 0.1),
                         batch_sizes=(32, 64, 128), epochs=5):
    """Grid-search learning rate and batch size on a short training run.

    For every (learning rate, batch size) pair a fresh model from
    ``create_model`` is compiled exactly like the final training run (Adam,
    ``sparse_categorical_crossentropy``, integer labels) and fitted for
    ``epochs`` epochs with 20% of the data held out for validation. The pair
    with the highest validation accuracy wins.

    Args:
        X: Input windows; ``X.shape[1:]`` is used as the model input shape.
        y: Integer class label for each window.
        vocab_size: Number of output classes. Defaults to ``max(y) + 1``.
        learning_rates: Candidate Adam learning rates.
        batch_sizes: Candidate batch sizes.
        epochs: Epochs to train each candidate.

    Returns:
        ``(best_lr, best_batch_size, best_epochs)`` where ``best_epochs`` is
        the 1-based epoch at which the winning run reached its best validation
        accuracy. If no run scores above 0.0 the defaults
        ``(0.001, 64, epochs)`` are returned.
    """
    if vocab_size is None:
        # Smallest softmax width that can represent every label in y.
        vocab_size = int(np.max(y)) + 1

    best_lr = 0.001
    best_batch_size = 64
    best_epochs = epochs
    best_val_acc = 0.0

    for lr in learning_rates:
        for batch_size in batch_sizes:
            # Drop graph/layer state left over from the previous candidate so
            # memory does not grow with every model built in this loop.
            tf.keras.backend.clear_session()

            model = create_model(X.shape[1:], vocab_size)
            # Same loss and label format as the final training run, and no
            # extra layers: the tuned architecture must be the trained one.
            model.compile(
                loss='sparse_categorical_crossentropy',
                optimizer=Adam(learning_rate=lr),
                metrics=['accuracy'],
            )
            history = model.fit(
                X, y,
                validation_split=0.2,
                epochs=epochs,
                batch_size=batch_size,
                verbose=0,
            )

            val_accuracies = history.history['val_accuracy']
            trial_best = max(val_accuracies)
            if trial_best > best_val_acc:
                best_val_acc = trial_best
                best_lr = lr
                best_batch_size = batch_size
                # Epoch (1-based) at which this run peaked, not the run length.
                best_epochs = val_accuracies.index(trial_best) + 1

    return best_lr, best_batch_size, best_epochs

In [None]:
# Root folder that is searched recursively for MIDI training files.
data_directory = '/content/drive/MyDrive/clean_midi'
# Folder that receives the checkpoint, the final model and the training history.
output_directory = '/content/drive/MyDrive'
# Number of consecutive (note, duration) steps in each model input window.
sequence_length = 100

In [None]:
# Process MIDI files
# Walk data_directory and flatten every readable file into two parallel lists
# (one entry per note/chord): pitches in all_notes, lengths in all_durations.
print("Processing MIDI files...")
all_notes, all_durations = process_midi_files(data_directory)

In [None]:
# Create sequences
# X: sliding windows of sequence_length (note, duration) pairs.
# y_notes / y_durations: the note and duration that follow each window.
# NOTE(review): y_durations is not consumed by any later cell shown in this notebook.
print("Creating sequences...")
X, y_notes, y_durations = create_sequences(all_notes, all_durations, sequence_length)

In [None]:
# Prepare data for model
if len(all_notes) == 0:
    raise ValueError("No notes were extracted; check data_directory before preparing model input.")
if X.ndim != 3 or X.shape[0] == 0:
    raise ValueError("Not enough notes to build a single sequence of length sequence_length.")

# One output class per integer from 0 up to the highest note value seen.
vocab_size = max(all_notes) + 1

# Scale the note channel into [0, 1) and leave durations untouched. This is
# done with array operations instead of nested Python loops, which for a large
# corpus are extremely slow and memory-hungry. The maths stays in float64 and
# is cast to float32 only at the end, matching the previous per-element result.
# NOTE: this cell rebinds X; re-running it without re-running the sequence cell
# would scale the note channel a second time.
X_scaled = np.array(X, dtype=np.float64)
X_scaled[..., 0] /= vocab_size
X = X_scaled.astype(np.float32)

In [None]:
# Split data
# Hold out 20% of the windows for testing; random_state fixes the split for reproducibility.
# NOTE(review): the windows overlap (they advance one step at a time), so a
# shuffled split likely places near-identical windows in both train and test;
# treat test metrics as optimistic unless the split is made on whole files.
X_train, X_test, y_train, y_test = train_test_split(X, y_notes, test_size=0.2, random_state=42)

# Tune hyperparameters
# Only the training portion is used for the search so the test split stays unseen.
print("Tuning hyperparameters...")
best_lr, best_batch_size, best_epochs = tune_hyperparameters(X_train, y_train)


In [None]:
# Create and train model
# Input shape is taken from the prepared data: (time steps, features per step).
print("Creating and training model...")
model = create_model((X.shape[1], X.shape[2]), vocab_size)
optimizer = Adam(learning_rate=best_lr)
# Sparse loss: targets are passed as integer class ids, not one-hot vectors.
# The metric is the notebook's own `accuracy` wrapper function.
model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=[accuracy])


In [None]:
# Callbacks
# Stop once val_loss has failed to improve for 10 epochs and restore the best weights.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# Write best_model.keras into output_directory, overwriting it only when the run improves.
model_checkpoint = ModelCheckpoint(os.path.join(output_directory, 'best_model.keras'), save_best_only=True)

# Train model
# 20% of the training split is held out again as the validation set.
# NOTE: with epochs=5 a patience of 10 can never be exhausted, so early
# stopping is effectively inactive for this run.
history = model.fit(
    X_train, y_train,
    validation_split=0.2,
    epochs=5,
    batch_size=best_batch_size,
    callbacks=[early_stopping, model_checkpoint]
) # TODO: intended to be epochs=best_epochs; hard-coded to 5 because the run reportedly crashed otherwise.

In [None]:
# Evaluate model on the held-out test split
# The test data must only be scored, never passed to fit(): training on it
# would contaminate the model that is saved afterwards and leave no unseen
# data to measure generalisation with.
# `history_test` keeps its name for later cells; it is now a dict mapping each
# metric name (e.g. 'loss') to its value on the test split.
history_test = model.evaluate(
    X_test, y_test,
    batch_size=best_batch_size,
    return_dict=True
)
print(f"Test metrics: {history_test}")

In [None]:
# Save model and data
# Persist the in-memory model as m_model.keras and pickle the per-epoch
# metrics recorded in `history` (the fit on the training split).
model.save(os.path.join(output_directory, 'm_model.keras'))
with open(os.path.join(output_directory, 'training_hist.pkl'), 'wb') as f:
    pickle.dump(history.history, f)

print(f"Model and data saved in {output_directory}")