In [1]:
import os
import sys
# The notebook is expected to run from a sub-directory of the project, so the
# project root is the parent of the current working directory.
project_root = os.path.dirname(os.path.abspath(os.curdir))

# Make the project's packages importable. The membership check keeps sys.path
# free of duplicate entries when this cell is re-run in the same kernel.
if project_root not in sys.path:
    sys.path.append(project_root)

from ml.data.dataset import DrumDataset

# Locations of the processed data and the shared config, relative to the root.
data_dir = os.path.join(project_root, "dataset", "processed")
config_path = os.path.join(project_root, "config.yaml")

# include_genre=True is forwarded to DrumDataset as-is.
# NOTE(review): presumably this appends genre features to each timestep — confirm in DrumDataset.
dataset = DrumDataset(data_dir, config_path, include_genre=True)

AssertionError: No .npz files found in /Users/robsligter/Documents/Rug/Year 3/NN/NN_Drumming_Project/dataset/processed

In [None]:
# Sanity check: fetch the first example, show the shapes of its first two
# elements (used as input and target by the training loop), then the dataset size.
item = dataset[0]
first_input, first_target = item[0], item[1]
print("Item shape:", first_input.shape, first_target.shape)
print(len(dataset))

In [None]:
# Sequence lengths for the model input and the predicted output.
input_seq_len = output_seq_len = 63
# Feature width per timestep for input and output.
input_size = output_size = 20

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import yaml
from torch.utils.data import DataLoader, Subset

# Load the project config; the seed used everywhere below comes from it.
# safe_load only builds plain Python objects, and the explicit encoding avoids
# depending on the platform's default text encoding.
with open(os.path.join(project_root, 'config.yaml'), 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Seed every RNG this notebook draws from so runs are repeatable.
SEED = config['dataset']['seed']
torch.manual_seed(SEED)
random.seed(SEED)  # Python's RNG drives the teacher-forcing coin flips
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)  # cover every visible GPU, not only the current one

print(f"Using seed: {SEED} from config file")

# Training configuration
DATASET_PERCENTAGE = 0.5  # Fraction of the dataset to use (1.0 = 100%, 0.5 = 50%, 0.1 = 10%)
BATCH_SIZE = 64
EPOCHS = 64
LEARNING_RATE = 1e-3

# Pick the compute device in order of preference: CUDA > MPS > CPU.
# torch.backends.mps is missing on PyTorch builds without MPS support, so it is
# looked up defensively instead of raising AttributeError there.
mps_backend = getattr(torch.backends, 'mps', None)
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"Using device: {device}")
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
elif mps_backend is not None and mps_backend.is_available():
    device = torch.device('mps')
    print(f"Using device: {device} (Apple Silicon GPU)")
else:
    device = torch.device('cpu')
    print(f"Using device: {device}")

class Seq2SeqRNN(nn.Module):
    """LSTM encoder-decoder that predicts a sequence one step at a time.

    The encoder reads the whole input sequence; its final hidden and cell
    states initialise the decoder. The decoder then produces ``target_len``
    steps, each projected to ``output_size`` values by a linear layer. No
    activation is applied to the projection, so the outputs are raw scores.

    Args:
        input_size: Number of features per input timestep.
        hidden_size: Hidden state width shared by encoder and decoder.
        output_size: Number of features per predicted timestep.
        num_layers: Number of stacked LSTM layers in both encoder and decoder.
        teacher_forcing_ratio: Probability, per decoding step, of feeding the
            ground-truth step instead of the model's own prediction.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=4, teacher_forcing_ratio=0.8):
        super().__init__()
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # The decoder consumes its own previous output, hence output_size as its input width.
        self.decoder = nn.LSTM(output_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.teacher_forcing_ratio = teacher_forcing_ratio

    def forward(self, x, y=None, target_len=1, training=False):
        """Encode ``x`` and decode ``target_len`` future steps.

        Args:
            x: Input of shape (batch, input_seq_len, input_size).
            y: Optional ground truth of shape (batch, target_seq_len, output_size);
                only consulted when ``training`` is true.
            target_len: Number of steps to generate.
            training: Enables teacher forcing (independent of ``self.training``).

        Returns:
            Tensor of shape (batch, target_len, output_size).
        """
        output_size = self.fc.out_features

        # Nothing to generate: return an empty sequence rather than failing in torch.cat([]).
        if target_len <= 0:
            return x.new_zeros(x.size(0), 0, output_size)

        # Encode input sequence; only the final states are needed.
        _, (hidden, cell) = self.encoder(x)

        if x.size(-1) == output_size:
            # Input and output share a feature space: start from the last known step.
            decoder_input = x[:, -1:, :]
        else:
            # The last input step cannot be fed to the decoder (width mismatch),
            # so start from an all-zero step of the right width instead.
            decoder_input = x.new_zeros(x.size(0), 1, output_size)

        outputs = []
        for t in range(target_len):
            out, (hidden, cell) = self.decoder(decoder_input, (hidden, cell))
            pred = self.fc(out)  # (batch, 1, output_size)
            outputs.append(pred)

            # Teacher forcing: with the configured probability feed the true step t
            # as the next decoder input. The `t < y.size(1)` guard falls back to the
            # prediction once the ground truth is exhausted.
            use_teacher = (
                training
                and y is not None
                and t < y.size(1)
                and random.random() < self.teacher_forcing_ratio
            )
            decoder_input = y[:, t:t + 1, :] if use_teacher else pred

        return torch.cat(outputs, dim=1)  # (batch, target_len, output_size)

# ---------------------------------------------------------------------------
# Data: reproducible subset of the dataset, split into train / validation
# ---------------------------------------------------------------------------
VAL_FRACTION = 0.2  # share of the sampled subset held out for validation

dataset_size = len(dataset)
subset_size = int(dataset_size * DATASET_PERCENTAGE)

# Seeded generator so the same examples are drawn on every run.
generator = torch.Generator().manual_seed(SEED)
# .tolist() yields plain ints, so the dataset is never indexed with 0-d tensors.
indices = torch.randperm(dataset_size, generator=generator)[:subset_size].tolist()
subset_dataset = Subset(dataset, indices)

# The permutation is already random, so a contiguous cut is a random split.
val_size = int(subset_size * VAL_FRACTION)
train_size = subset_size - val_size
train_dataset = Subset(dataset, indices[:train_size])
val_dataset = Subset(dataset, indices[train_size:])

print(f"Original dataset size: {dataset_size}")
print(f"Using {DATASET_PERCENTAGE*100}% of dataset: {subset_size} samples")
print(f"Subset created with seed {SEED} for reproducible sampling")
print(f"Train samples: {train_size}, validation samples: {val_size}")

dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# ---------------------------------------------------------------------------
# Model, loss, optimiser
# ---------------------------------------------------------------------------
# Feature widths come from the size constants defined in the earlier cell.
model = Seq2SeqRNN(
    input_size=input_size,
    hidden_size=32,
    output_size=output_size,
    teacher_forcing_ratio=0.5
).to(device)  # Move model to GPU/CPU

# Several features can be active in the same timestep, so each output value is
# scored independently with a sigmoid + binary cross-entropy.
# nn.CrossEntropyLoss would treat dim 1 of the (batch, time, features) tensors
# as the class axis, i.e. apply a softmax across timesteps.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Per-epoch histories consumed by the plotting / statistics cell.
train_losses, val_losses, learning_rates = [], [], []

print("Starting training...")
for epoch in range(EPOCHS):
    # --- training pass ---
    model.train()  # a later cell switches to eval(); restore train mode on re-runs
    total_loss = 0.0
    for batch_idx, (X, Y) in enumerate(dataloader):
        # Move data to device
        X, Y = X.to(device), Y.to(device)

        optimizer.zero_grad()
        output = model(X, Y, target_len=Y.shape[1], training=True)
        loss = criterion(output, Y.float())  # BCE needs floating-point targets
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        # Print progress every 10 batches
        if batch_idx % 10 == 0:
            print(f"Epoch {epoch+1}/{EPOCHS}, Batch {batch_idx}/{len(dataloader)}, Current Loss: {loss.item():.6f}", end='\r')

    avg_loss = total_loss / max(len(dataloader), 1)

    # --- validation pass: no teacher forcing, no gradients ---
    model.eval()
    val_total = 0.0
    with torch.no_grad():
        for X_val, Y_val in val_dataloader:
            X_val, Y_val = X_val.to(device), Y_val.to(device)
            val_output = model(X_val, target_len=Y_val.shape[1], training=False)
            val_total += criterion(val_output, Y_val.float()).item()
    # NaN (rather than a misleading 0) when the validation split is empty.
    avg_val_loss = val_total / len(val_dataloader) if len(val_dataloader) else float('nan')

    train_losses.append(avg_loss)
    val_losses.append(avg_val_loss)
    learning_rates.append(optimizer.param_groups[0]['lr'])

    print(f"\nEpoch {epoch+1}/{EPOCHS} completed - Average Loss: {avg_loss:.6f}")
    print(f"Validation Loss: {avg_val_loss:.6f}")

In [None]:
# Plot training/validation loss, their gap, and the learning rate per epoch.
# Requires `train_losses`, `val_losses` and `learning_rates` (one value per
# epoch each) to be defined already in the kernel.
import matplotlib.pyplot as plt  # imported here: the notebook's other matplotlib import is in a later cell

fig, (loss_ax, diff_ax, lr_ax) = plt.subplots(1, 3, figsize=(18, 5))
epochs_range = range(1, len(train_losses) + 1)

# Loss plot
loss_ax.plot(epochs_range, train_losses, 'b-', label='Training Loss', linewidth=2)
loss_ax.plot(epochs_range, val_losses, 'r-', label='Validation Loss', linewidth=2)
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.legend()
loss_ax.grid(True, alpha=0.3)

# Loss difference plot (positive values: validation loss above training loss)
loss_diff = [val - train for train, val in zip(train_losses, val_losses)]
diff_ax.plot(epochs_range, loss_diff, 'g-', label='Val - Train Loss', linewidth=2)
diff_ax.axhline(y=0, color='k', linestyle='--', alpha=0.5)
diff_ax.set_xlabel('Epoch')
diff_ax.set_ylabel('Loss Difference')
diff_ax.set_title('Validation - Training Loss Difference')
diff_ax.legend()
diff_ax.grid(True, alpha=0.3)

# Learning rate plot
lr_ax.plot(epochs_range, learning_rates, 'm-', label='Learning Rate', linewidth=2)
lr_ax.set_xlabel('Epoch')
lr_ax.set_ylabel('Learning Rate')
lr_ax.set_title('Learning Rate Schedule')
lr_ax.set_yscale('log')  # Log scale for better visualization
lr_ax.legend()
lr_ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

# Print final statistics
print("\nFinal Training Statistics:")
print(f"Final Training Loss: {train_losses[-1]:.6f}")
print(f"Final Validation Loss: {val_losses[-1]:.6f}")
print(f"Final Learning Rate: {learning_rates[-1]:.2e}")
print(f"Best Training Loss: {min(train_losses):.6f} (Epoch {train_losses.index(min(train_losses)) + 1})")
print(f"Best Validation Loss: {min(val_losses):.6f} (Epoch {val_losses.index(min(val_losses)) + 1})")

# Check for overfitting: compare the last epoch's losses against a fixed 0.1 margin.
final_diff = val_losses[-1] - train_losses[-1]
if final_diff > 0.1:
    print(f"⚠️  Potential overfitting detected: Val loss is {final_diff:.6f} higher than train loss")
elif final_diff < -0.1:
    print(f"⚠️  Unusual pattern: Val loss is {abs(final_diff):.6f} lower than train loss")
else:
    print(f"✅ Good generalization: Val and train losses are close (diff: {final_diff:.6f})")

# Check learning rate reductions (only reported if the rate actually dropped)
initial_lr = learning_rates[0]
final_lr = learning_rates[-1]
if final_lr < initial_lr:
    reduction_factor = initial_lr / final_lr
    print(f"📉 Learning rate was reduced by factor of {reduction_factor:.1f} during training")

In [None]:
# Run the trained model on a single validation example.
# Requires `val_dataset`, `model` and `device` to be defined already in the kernel.
val_sample_idx = 0  # First validation sample
val_sample = val_dataset[val_sample_idx]

# Element 0 is the model input, element 1 the reference output. unsqueeze(0)
# adds the batch dimension before the tensors are moved to the device.
X_sample, Y_sample = (val_sample[i].unsqueeze(0).to(device) for i in (0, 1))

print(f"Testing model on validation sample {val_sample_idx}")
model.eval()  # Set model to evaluation mode
with torch.no_grad():
    # Generate as many steps as the reference holds; teacher forcing is off.
    future_steps = model(X_sample, target_len=Y_sample.shape[1], training=False)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Compare the model's prediction with the reference for one sample.
# Requires `future_steps`, `X_sample`, `Y_sample` and `dataset` from earlier cells.

# Drop the batch dimension and move everything to NumPy for plotting.
# NOTE(review): `predicted` is the raw output of the model's final linear layer
# (no activation is applied in forward). If the model is trained with a
# logit-based loss, a sigmoid may be needed before plotting on a 0..1 scale — confirm.
predicted = future_steps.squeeze(0).cpu().numpy()
actual = Y_sample.squeeze(0).cpu().numpy()
input_seq = X_sample.squeeze(0).cpu().numpy()

# Define instrument names (based on your dataset configuration)
base_instruments = [
    "Kick",
    "Snare",
    "HH Closed",
    "HH Open",
    "Tom L",
    "Tom M",
    "Tom H",
    "Crash",
    "Ride"
]

# Check if we have genre information (20 features total means 9 instruments + 11 genres)
num_features = predicted.shape[1]
if num_features > len(base_instruments) and hasattr(dataset, 'genres'):
    # list(...) so this also works when genres is a tuple or another sequence type.
    instruments = base_instruments + list(dataset.genres)
else:
    instruments = list(base_instruments)
# Keep exactly one label per feature row: trim extras, pad any unnamed rows.
instruments = instruments[:num_features]
instruments += [f"Feature {i}" for i in range(len(instruments), num_features)]

# Constrained layout is used because tight_layout() cannot place a colorbar
# that is shared across several axes.
fig, axes = plt.subplots(3, 1, figsize=(15, 10), sharex=True, constrained_layout=True)
fig.suptitle('LSTM Drum Pattern Prediction vs Actual', fontsize=16)

# One heat-map per sequence: rows are features, columns are timesteps.
panels = [
    ("Input Sequence", input_seq),
    ("Predicted Output", predicted),
    ("Actual Output", actual),
]
images = []
for ax, (title, sequence) in zip(axes, panels):
    images.append(
        ax.imshow(sequence.T, aspect="auto", origin="lower", cmap="magma", vmin=0, vmax=1)
    )
    ax.set_yticks(range(len(instruments)))
    ax.set_yticklabels(instruments)
    ax.set_ylabel("Instruments")
    ax.set_title(title)
axes[-1].set_xlabel("Timesteps")  # x axis is shared, label only the bottom panel
im1, im2, im3 = images

# All panels share the same colour scale, so a single colorbar serves them all.
fig.colorbar(im3, ax=axes, orientation='horizontal', pad=0.1, shrink=0.8, label='Activation')

plt.show()

# Print some statistics
mse = np.mean((predicted - actual) ** 2)
mae = np.mean(np.abs(predicted - actual))
print("\nPrediction Statistics:")
print(f"Mean Squared Error: {mse:.6f}")
print(f"Mean Absolute Error: {mae:.6f}")
print(f"Input sequence shape: {input_seq.shape}")
print(f"Predicted sequence shape: {predicted.shape}")
print(f"Actual sequence shape: {actual.shape}")