In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import pandas as pd
import os

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --------------------------
# Load and preprocess data
# --------------------------

# Load transformed CSV
df = pd.read_csv("output/maestro_transformed.csv")
print("Data loaded. Shape:", df.shape)

# Drop 'song_id' column (not needed for training)
df = df.drop(columns=["song_id"])

# Convert DataFrame to NumPy array
data = df.to_numpy()
print(f"Converted to NumPy. Shape: {data.shape}")

# Ensure correct sequence length (each song has timestamps * 4 features)
input_dim = 4  # Features per timestamp
sequence_length = data.shape[1] // input_dim  # Dynamically calculate timestamps per sample

# You can set a fixed number of timestamps to reduce memory usage.
# For example, limit the sequence length to 300 timestamps.
max_sequence_length = 500  # Adjust this based on your system's capability
sequence_length = min(sequence_length, max_sequence_length)  # Ensure it doesn't exceed the max length

print(f"Adjusted sequence length: {sequence_length}")

# Reshape data to (num_samples, sequence_length, input_dim)
data = data[:, :sequence_length * input_dim]  # Trim or pad data to match the new sequence length
data = data.reshape(data.shape[0], sequence_length, input_dim)
print("Data reshaped to:", data.shape)

# ------------------------------
# Prepare for Auto-Regressive Training
# ------------------------------

# Use first T-1 tokens as source and the last T-1 as target.
src = data[:, :-1, :]  # (num_samples, sequence_length-1, 4)
tgt = data[:, 1:, :]   # (num_samples, sequence_length-1, 4)
print("Source and target created. src shape:", src.shape, ", tgt shape:", tgt.shape)

# Split data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(src, tgt, test_size=0.2, random_state=12)

# Convert to PyTorch tensors and move to device
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
X_val = torch.tensor(X_val, dtype=torch.float32).to(device)
y_val = torch.tensor(y_val, dtype=torch.float32).to(device)

print("Data prepared and moved to device.")


Data loaded. Shape: (1276, 100305)
Converted to NumPy. Shape: (1276, 100304)
Adjusted sequence length: 800
Data reshaped to: (1276, 800, 4)
Source and target created. src shape: (1276, 799, 4) , tgt shape: (1276, 799, 4)
Data prepared and moved to device.


In [2]:
print("Sample src:", src[0, :5, :])  # First 5 timestamps
print("Sample tgt:", tgt[0, :5, :])  # Next 5 timestamps


Sample src: [[0.52873563 0.00139645 0.00257095 0.40944882]
 [0.5862069  0.00253395 0.00270782 0.52755906]
 [0.52873563 0.00309808 0.00448898 0.30708661]
 [0.45977011 0.00309438 0.00460736 0.30708661]
 [0.65517241 0.00304999 0.00511415 0.51181102]]
Sample tgt: [[0.5862069  0.00253395 0.00270782 0.52755906]
 [0.52873563 0.00309808 0.00448898 0.30708661]
 [0.45977011 0.00309438 0.00460736 0.30708661]
 [0.65517241 0.00304999 0.00511415 0.51181102]
 [0.57471264 0.00305924 0.00614253 0.35433071]]


In [16]:
import torch
import torch.nn as nn
import numpy as np

# --------------------------
# Positional Encoding Class
# --------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

# --------------------------
# Transformer Model
# --------------------------
class MusicTransformer(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, max_seq_len, output_dim, dropout=0.2):
        super(MusicTransformer, self).__init__()
        
        # Embedding layer to project input features to model dimension
        self.embedding = nn.Linear(input_dim, model_dim)
        self.pos_encoder = PositionalEncoding(model_dim, max_seq_len, dropout)
        
        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim, nhead=num_heads, dropout=dropout, batch_first=True,activation=nn.LeakyReLU(negative_slope=0.01))
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=model_dim, nhead=num_heads, dropout=dropout, batch_first=True, activation=nn.LeakyReLU(negative_slope=0.01))
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        
        # Final linear layer
        self.fc_out = nn.Linear(model_dim, output_dim)
        
        # Layer normalization for stability
        self.norm = nn.LayerNorm(model_dim)

    def forward(self, src, tgt, tgt_mask=None):
        """
        src: (batch_size, seq_len, input_dim)
        tgt: (batch_size, seq_len, input_dim)
        tgt_mask: (seq_len, seq_len) optional, causal mask for decoder
        """
        # Embed input sequences
        src = self.pos_encoder(self.embedding(src))  # (batch_size, seq_len, model_dim)
        tgt = self.pos_encoder(self.embedding(tgt))

        # Encode source sequence
        memory = self.transformer_encoder(self.norm(src))

        # If no tgt_mask is provided, generate one
        if tgt_mask is None:
            tgt_mask = self.generate_square_subsequent_mask(tgt.size(1), tgt.device)

        # Decode the target sequence using the encoder memory
        output = self.transformer_decoder(self.norm(tgt), memory, tgt_mask)
        
        # Project output back to the original feature dimension
        return self.fc_out(output)

    @staticmethod
    def generate_square_subsequent_mask(size, device):
        """Generates a causal mask to prevent attending to future tokens."""
        return torch.triu(torch.full((size, size), float('-inf'), device=device), diagonal=1)


In [4]:
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import os

# --------------------------
# Model, Loss, and Optimizer
# --------------------------
# Model parameters
model_dim = 128
num_heads = 8
num_layers = 4
output_dim = input_dim  # For auto-regression

max_seq_len = sequence_length  # Ensure this matches the expected input length

# Initialize the model with batch_first=True
model = MusicTransformer(input_dim, model_dim, num_heads, num_layers, max_seq_len, output_dim).to(device)
print(model)

# Create DataLoaders (batch size of 32)
BATCH_SIZE = 16
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Define loss and optimizer
criterion = nn.MSELoss()  
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=20, eta_min=1e-5)  # Adjusted T_max
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.8)

# --------------------------
# Checkpoint Setup
# --------------------------
os.makedirs('output', exist_ok=True)
model_name = f"best_model_dim{model_dim}_heads{num_heads}_layers{num_layers}.pth"
checkpoint_path = f'output/{model_name}'

# Initialize training state variables
best_val_loss = float('inf')
epochs_without_improvement = 0
early_stopping_patience = 100
loss_improvement_threshold = 1e-4  

# Load checkpoint if exists
if os.path.exists(checkpoint_path):
    print(f"Loading model checkpoint from {checkpoint_path}...")
    checkpoint = torch.load(checkpoint_path, map_location=device)  # Ensures compatibility across devices
    
    saved_model_dim = checkpoint.get('model_dim')
    saved_num_heads = checkpoint.get('num_heads')
    saved_num_layers = checkpoint.get('num_layers')

    if (saved_model_dim == model_dim and saved_num_heads == num_heads and saved_num_layers == num_layers):
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        best_val_loss = checkpoint.get('best_val_loss', float('inf'))
        epochs_without_improvement = checkpoint.get('epochs_without_improvement', 0)
        print("Checkpoint loaded successfully. Continuing training.")
    else:
        print("Architecture mismatch detected. Initializing a new model with the new configuration.")
else:
    print("No previous weights found. Starting training from scratch.")




MusicTransformer(
  (embedding): Linear(in_features=4, out_features=128, bias=True)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.2, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-3): 4 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=2048, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
        (linear2): Linear(in_features=2048, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
        (activation): LeakyReLU(negative_slope=0.01)
      )
    )
  )
  (transformer_decoder): TransformerDecoder(
    (layers):

In [None]:
from torch.utils.tensorboard import SummaryWriter

# Initialize TensorBoard
#writer = SummaryWriter("runs/music_transformer_experiment")

# --------------------------
# Training Loop
# --------------------------
num_epochs = 500


print("Training started")

for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0.0

    for src_batch, tgt_batch in train_loader:
        src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device)

        optimizer.zero_grad()
        
        decoder_input = tgt_batch[:, :-1, :]
        ground_truth = tgt_batch[:, 1:, :]

        tgt_mask = model.generate_square_subsequent_mask(decoder_input.size(1), device)

        output = model(src_batch, decoder_input, tgt_mask=tgt_mask)
        loss = criterion(output, ground_truth)

        # Backpropagation
        loss.backward()

        # Log Gradients & Weights to TensorBoard after computing gradients
        """for name, param in model.named_parameters():
            if param.grad is not None:
                writer.add_histogram(f"Gradients/{name}", param.grad.cpu().detach(), epoch)
            writer.add_histogram(f"Weights/{name}", param.cpu().detach().numpy(), epoch)"""
        #for name, param in model.named_parameters():
            #if param.requires_grad:
                #print(f"{name} - Mean: {param.data.mean()}, Std: {param.data.std()}")
        #for name, param in model.named_parameters():
            #if param.grad is not None:
                #print(f"{name} - Gradient mean: {param.grad.mean()}, Gradient std: {param.grad.std()}")



        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # --------------------------
    # Validation Phase
    # --------------------------
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for src_batch, tgt_batch in val_loader:
            src_batch, tgt_batch = src_batch.to(device), tgt_batch.to(device)

            decoder_input = tgt_batch[:, :-1, :]
            ground_truth = tgt_batch[:, 1:, :]
            tgt_mask = model.generate_square_subsequent_mask(decoder_input.size(1), device)

            output = model(src_batch, decoder_input, tgt_mask=tgt_mask)
            loss = criterion(output, ground_truth)
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_loader)

    # Log metrics to TensorBoard
    #writer.add_scalar("Loss/Train", avg_train_loss, epoch)
    #writer.add_scalar("Loss/Validation", avg_val_loss, epoch)
    #writer.add_scalar("Learning Rate", optimizer.param_groups[0]["lr"], epoch)

    print(f"Epoch {epoch + 1}/{num_epochs}")
    print(f"Training Loss: {avg_train_loss:.4f}")
    print(f"Validation Loss: {avg_val_loss:.4f}")

    # Model Checkpointing
    # --------------------------
    if avg_val_loss < best_val_loss - loss_improvement_threshold:
        improvement = best_val_loss - avg_val_loss
        best_val_loss = avg_val_loss
        epochs_without_improvement = 0  # Reset counter

        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'best_val_loss': best_val_loss,
            'epochs_without_improvement': epochs_without_improvement,
            'model_dim': model_dim,
            'num_heads': num_heads,
            'num_layers': num_layers
        }, checkpoint_path)
        print(f"Saved improved model. Improvement: {improvement:.6f}")
    
    else:
        epochs_without_improvement += 1
        print(f"No improvement. Epochs without improvement: {epochs_without_improvement}")

        if epochs_without_improvement >= early_stopping_patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

# Close TensorBoard writer
writer.close()


Training started


In [7]:

import torch
model_checkpoint = torch.load('output/best_model_dim256_heads8_layers4.pth')

# Check if 'loss' is stored in the checkpoint dictionary
if 'best_val_loss' in model_checkpoint:
    print(f"Last loss from checkpoint: {model_checkpoint['best_val_loss']}")
else:
    print("Loss is not stored in the checkpoint.")


Last loss from checkpoint: 0.006994853261858225


  model_checkpoint = torch.load('output/best_model_dim256_heads8_layers4.pth')


In [45]:
import torch
import numpy as np
import pandas as pd
import json

# Define device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the trained model
model_path = "output/best_model_dim128_heads8_layers4.pth"
model = MusicTransformer(
    input_dim=4,
    model_dim=128,
    num_heads=8,
    num_layers=4,
    max_seq_len=500,
    output_dim=4
).to(device)

# Load model checkpoint
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# Load MIDI normalization params
with open("output/normalization_params.json", "r") as file:
    normalization_params = json.load(file)

midi_end_times = list(normalization_params["midi_end_times"].values())
estimated_midi_end_time = np.median(midi_end_times) if midi_end_times else 60  # Default 60 sec


# ==========================
# Generate a Structured Seed Sequence
# ==========================
def generate_seed_sequence(estimated_midi_end_time):
    """Creates a simple, coherent starting melody pattern with discrete velocities.
    
    Args:
        estimated_midi_end_time (float): The maximum end time used for normalization.
                                         This should match the normalization applied to the training data.
    """
    pitches = [60, 64, 67, 72]  # C4, E4, G4, C5
    start_times = [0.2, 0.5, 0.8, 0.11]  # Start times in seconds
    end_times = [0.4, 0.7, 0.10, 0.13]    # End times in seconds
    velocities = [64, 80, 100, 120]     # Discrete MIDI velocities

    # Normalize values
    velocity_normalized = [v / 127 for v in velocities]  # Normalize velocity to [0, 1]
    start_times_normalized = [s / estimated_midi_end_time for s in start_times]  # Normalize start times
    end_times_normalized = [e / estimated_midi_end_time for e in end_times]      # Normalize end times

    seed = np.array([
        [(p - 21) / (108 - 21), s, e, v]  # Normalize pitch, start time, end time, velocity
        for p, s, e, v in zip(pitches, start_times_normalized, end_times_normalized, velocity_normalized)
    ], dtype=np.float32)

    return torch.tensor(seed).unsqueeze(0).to(device)  # Shape: (1, seq_len, 4)

# Initialize seed sequence
seed_sequence = generate_seed_sequence(estimated_midi_end_time)
print("estimated_midi_end_time", estimated_midi_end_time)
print("Seed Sequence:", seed_sequence)

# ==========================
# Generate New Music with Temperature
# ==========================
generated_length = 493  # Total number of timesteps to generate (seed + new predictions)
temperature = 0.8

# Initialize generated_music with the seed sequence
seed_np = seed_sequence.squeeze(0).cpu().numpy()  # Convert seed sequence to numpy array
generated_music = [seed_np[i] for i in range(seed_np.shape[0])]  # Add seed sequence to generated_music

# Start with the seed sequence
current_sequence = seed_sequence

# Generate new music
with torch.no_grad():
    for t in range(generated_length - seed_sequence.size(1)):  # Adjust loop length
        src = current_sequence
        
        # Predict next timestep
        output = model(src, src)[:, -1, :]  # Shape: (1, 4)
        output = output.squeeze(0).cpu().numpy()

        # Apply temperature scaling
        scaled_output = output / temperature

        # Store generated step
        predicted_timestep = np.array([
            np.clip(scaled_output[0], 0, 1),  # Pitch (normalized)
            np.clip(scaled_output[1], 0, 1),  # Start time
            np.clip(scaled_output[2], 0, 1),  # End time
            np.clip(scaled_output[3], 0, 1)   # Velocity (normalized)
        ], dtype=np.float32)

        generated_music.append(predicted_timestep)

        # Append generated step to sequence
        predicted_timestep_tensor = torch.tensor(predicted_timestep).unsqueeze(0).unsqueeze(0).to(device)
        current_sequence = torch.cat((current_sequence, predicted_timestep_tensor), dim=1)

# Convert generated music into a **single row** DataFrame
flattened_values = np.array(generated_music).flatten()  # Convert to 1D array
column_names = [f"{feature}_{i+1}" for i in range(generated_length) for feature in ["pitch", "start", "end", "velocity"]]

df_flat = pd.DataFrame([flattened_values], columns=column_names)

# Save normalized output
normalized_csv = "output/generated_music_flat.csv"
df_flat.to_csv(normalized_csv, index=False)
print(f"Generated music saved to '{normalized_csv}'.")

# =============================================================================
# DENORMALIZATION
# =============================================================================
def denormalize_data(df, generated_length, estimated_midi_end_time):
    """Convert normalized values back to MIDI-compatible values."""
    df = df.copy()

    for i in range(generated_length):
        df[f"pitch_{i+1}"] = (df[f"pitch_{i+1}"] * (108 - 21) + 21).clip(21, 108).round().astype(int)
        df[f"velocity_{i+1}"] = (df[f"velocity_{i+1}"] * 127).clip(0, 127).round().astype(int)  # Make sure it's discrete
        df[f"start_{i+1}"] *= estimated_midi_end_time
        df[f"end_{i+1}"] *= estimated_midi_end_time

    print("Denormalization complete.")
    return df


# Load & denormalize generated music
df_denormalized = denormalize_data(df_flat, generated_length, estimated_midi_end_time)

# Save denormalized data
denormalized_csv = "output/denormalized_music_flat.csv"
df_denormalized.to_csv(denormalized_csv, index=False)
print(f"Denormalized music saved to '{denormalized_csv}'.")

  checkpoint = torch.load(model_path, map_location=device)


estimated_midi_end_time 430.8352864583333
Seed Sequence: tensor([[[4.4828e-01, 4.6421e-04, 9.2843e-04, 5.0394e-01],
         [4.9425e-01, 1.1605e-03, 1.6248e-03, 6.2992e-01],
         [5.2874e-01, 1.8569e-03, 2.3211e-04, 7.8740e-01],
         [5.8621e-01, 2.5532e-04, 3.0174e-04, 9.4488e-01]]])
Generated music saved to 'output/generated_music_flat.csv'.
Denormalization complete.
Denormalized music saved to 'output/denormalized_music_flat.csv'.


In [46]:


df=pd.read_csv(denormalized_csv)
df

Unnamed: 0,pitch_1,start_1,end_1,velocity_1,pitch_2,start_2,end_2,velocity_2,pitch_3,start_3,...,end_491,velocity_491,pitch_492,start_492,end_492,velocity_492,pitch_493,start_493,end_493,velocity_493
0,60,0.2,0.4,64,64,0.5,0.7,80,67,0.8,...,43.168747,127,108,41.62538,41.97353,127,108,40.98625,41.1912,127


In [47]:
import pretty_midi
import pandas as pd
import os

def denormalized_to_midi(input_csv, output_dir, log_file="output/log.txt", num_timestamps=493):
    """
    Converts denormalized music data from a CSV file into MIDI files.
    Each row represents a song, with per-note features: pitch, start, end, velocity.

    Args:
        input_csv (str): Path to the input CSV file with denormalized data.
        output_dir (str): Directory to save output MIDI files.
        log_file (str): Path to the log file for logs.
        num_timestamps (int): Number of timestamps (notes) per song.
    """
    try:
        data = pd.read_csv(input_csv)
        os.makedirs(output_dir, exist_ok=True)  # Ensure output directory exists

        # Prepare logging
        log_messages = [f"Processing started.\nTotal Songs in CSV: {len(data)}\n"]

        for idx, row in data.iterrows():
            try:
                midi = pretty_midi.PrettyMIDI()
                instrument = pretty_midi.Instrument(program=0)  # Default: Acoustic Grand Piano
                note_count = 0

                for i in range(1, num_timestamps + 1):
                    # Retrieve features
                    pitch = row.get(f"pitch_{i}")
                    start = row.get(f"start_{i}")
                    end = row.get(f"end_{i}")
                    velocity = row.get(f"velocity_{i}")

                    # Ensure all values are valid numbers
                    if pd.notna(pitch) and pd.notna(start) and pd.notna(end) and pd.notna(velocity):
                        pitch = int(round(pitch))  # Ensure integer
                        velocity = int(round(min(max(velocity, 0), 127)))  # Clip velocity
                        start, end = float(start), float(end)  # Ensure float

                        if 21 <= pitch <= 108 and start >= 0 and end > start:
                            note = pretty_midi.Note(
                                velocity=velocity,
                                pitch=pitch,
                                start=start,
                                end=end
                            )
                            instrument.notes.append(note)
                            note_count += 1

                if note_count > 0:
                    midi.instruments.append(instrument)
                    output_midi_path = os.path.join(output_dir, f"song_{idx + 1}.mid")
                    midi.write(output_midi_path)
                    log_messages.append(f"Song {idx + 1}: {note_count} notes added. MIDI saved at {output_midi_path}")
                else:
                    log_messages.append(f"Skipping song {idx + 1}: No valid notes found.")

            except Exception as row_error:
                log_messages.append(f"Error processing song {idx + 1}: {row_error}")

        # Save logs after processing all songs
        with open(log_file, "w") as log:
            log.write("\n".join(log_messages) + "\n")

    except Exception as e:
        with open(log_file, "a") as log:  # Append error logs instead of overwriting
            log.write(f"Error occurred: {e}\n")

# Example Usage
input_csv_path = "output/denormalized_music_flat.csv"
output_midi_dir = "output/midi_files"
denormalized_to_midi(input_csv_path, output_midi_dir)


In [None]:
import pygame

def play_midi_with_stop(midi_file):
    """
    Play a MIDI file and stop on user input.
    
    Args:
        midi_file (str): Path to the MIDI file.
    """
    try:
        pygame.mixer.init()
        pygame.mixer.music.load(midi_file)
        pygame.mixer.music.play()

        print(f"Playing {midi_file}...")
        print("Press Enter to stop playback.")
        input()  # Wait for user input to stop
        pygame.mixer.music.stop()
        print("Playback stopped.")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        pygame.mixer.quit()

# Example Usage
midi_file_path = "output/midi_files/song_1.mid"
play_midi_with_stop(midi_file_path)


pygame 2.6.1 (SDL 2.30.7, Python 3.12.0)
Hello from the pygame community. https://www.pygame.org/contribute.html
Playing output/midi_files/song_1.mid...
Press Enter to stop playback.
