Input Layer (Piano note sequence)
     |
     v
RNN Layer 1 (Processes sequence data and captures temporal dependencies)
     |
     v
ReLU (Applies non-linearity)
     |
     v
RNN Layer 2 (Processes higher-level temporal features)
     |
     v
ReLU (Applies non-linearity)
     |
     v
Flatten Layer (Converts sequence output to 1D)
     |
     v
Fully Connected Layer 1 (Learns complex patterns)
     |
     v
ReLU (Applies non-linearity)
     |
     v
Dropout Layer (Prevents overfitting)
     |
     v
Fully Connected Layer 2 (Learns more complex patterns)
     |
     v
ReLU (Applies non-linearity)
     |
     v
Output Layer (Softmax) (Produces probability distribution over classes)


In [1]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader, random_split
import torch.optim as optim
import random

class PositionalEncoding(nn.Module):
    def __init__(self, model_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, model_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_dim, 2).float() * (-np.log(10000.0) / model_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x

class TraditionalRNNModel(nn.Module):
    def __init__(self, input_dim, model_dim, rnn_hidden_dim, n_classes, dropout=0.1):
        super(TraditionalRNNModel, self).__init__()
        self.model_dim = model_dim
        self.input_projection = nn.Linear(input_dim, model_dim)  
        self.pos_encoder = PositionalEncoding(model_dim)
        
        self.rnn = nn.RNN(model_dim, rnn_hidden_dim, num_layers=2, batch_first=True, dropout=dropout, bidirectional=True)  
        self.attention = nn.MultiheadAttention(rnn_hidden_dim * 2, num_heads=8, dropout=dropout, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(rnn_hidden_dim * 2 * sequence_length, 512),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(512, n_classes)
        )
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        for layer in self.classifier:
            if isinstance(layer, nn.Linear):
                layer.bias.data.zero_()
                layer.weight.data.uniform_(-initrange, initrange)

    def forward(self, src):
        src = self.input_projection(src) * np.sqrt(self.model_dim)  
        src = self.pos_encoder(src)
        rnn_output, _ = self.rnn(src)
        attn_output, _ = self.attention(rnn_output, rnn_output, rnn_output)
        output = self.classifier(attn_output)
        return output

class PianoDataset(Dataset):
    def __init__(self, csv_file, sequence_length=100, augment=False):
        self.df = pd.read_csv(csv_file)
        self.df = pd.concat([self.df.drop('event_type', axis=1), pd.get_dummies(self.df['event_type'], prefix='event_type')], axis=1)
        self.df = self.df.astype(np.float32)  
        self.sequence_length = sequence_length
        self.augment = augment
        self.data = self.generate_sequences()

    def generate_sequences(self):
        input_sequences = []
        target_events = []
        for i in range(len(self.df) - self.sequence_length):
            full_sequence = self.df.iloc[i:i+self.sequence_length+1]
            input_sequence = full_sequence.iloc[:-1]
            target_event = full_sequence.iloc[-1]
            input_sequences.append(input_sequence.values)
            target_events.append(target_event.values.argmax())  
        return list(zip(input_sequences, target_events))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence, target = self.data[idx]
        if self.augment:
            sequence = self.augment_sequence(sequence)
        sequence_tensor = torch.tensor(sequence, dtype=torch.float32)
        target_tensor = torch.tensor(target, dtype=torch.long)
        return sequence_tensor, target_tensor

    def augment_sequence(self, sequence):
        # Transposition
        if random.random() < 0.5:
            transpose_amount = random.randint(-5, 5)
            sequence[:, 0] += transpose_amount  
        
        # Time-stretching
        if random.random() < 0.5:
            stretch_factor = random.uniform(0.8, 1.2)
            sequence[:, 1] *= stretch_factor  
        
        # Adding Noise
        if random.random() < 0.5:
            noise = np.random.normal(0, 0.01, sequence.shape)
            sequence += noise
        
        return sequence

def train_valid_test_split(dataset, train_ratio=0.7, valid_ratio=0.15, test_ratio=0.15):
    assert train_ratio + valid_ratio + test_ratio == 1, "Ratios must sum to 1"
    
    train_size = int(train_ratio * len(dataset))
    valid_size = int(valid_ratio * len(dataset))
    test_size = len(dataset) - train_size - valid_size
    
    train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])
    return train_dataset, valid_dataset, test_dataset

# Define model parameters
input_dim = 10  
model_dim = 128  
rnn_hidden_dim = 256  
n_classes = 8  
sequence_length = 100

# Set device and initialize model and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TraditionalRNNModel(input_dim, model_dim, rnn_hidden_dim, n_classes).to(device)

# Initialize weights
def initialize_weights(m):
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        m.bias.data.fill_(0.01)

model.apply(initialize_weights)

# Set optimizer with a lower learning rate and L2 regularization
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

# Define learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Define loss function
criterion = nn.CrossEntropyLoss()

# Create dataset and dataloaders
output_csv = 'D:\\Projects\\Resources\\midis_v1.2\\Beethoven\\Beethoven_note_sequence.csv'
full_dataset = PianoDataset(output_csv, sequence_length, augment=True)

# Split the dataset
train_dataset, valid_dataset, test_dataset = train_valid_test_split(full_dataset)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)  # Increased batch size
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Training function with gradient clipping and early stopping
def train_model(model, train_loader, valid_loader, optimizer, scheduler, num_epochs=50, patience=10):
    model.train()
    best_accuracy = 0.0
    patience_counter = 0

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0
        print(f'Starting epoch {epoch+1}/{num_epochs}')
        
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            
            # Ensure outputs and targets are correctly shaped
            if outputs.shape[0] != targets.shape[0]:
                print(f"Shape mismatch: outputs shape {outputs.shape}, targets shape {targets.shape}")
                continue
            
            loss = criterion(outputs, targets)
            loss.backward()
            # Apply gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)  
            
            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == targets).sum().item()
            total_predictions += targets.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = correct_predictions / total_predictions
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')
        
        # Validate the model
        val_loss, val_accuracy = evaluate_model(model, valid_loader)
        
        # Adjust learning rate
        scheduler.step()
        
        # Early stopping
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model_rnn.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Early stopping at epoch {epoch+1}')
                break

def evaluate_model(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            running_loss += loss.item() * inputs.size(0)  
            
            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == targets).sum().item()
            total_predictions += targets.size(0)

    total_loss = running_loss / len(dataloader.dataset)
    accuracy = correct_predictions / total_predictions
    print(f'Evaluation Loss: {total_loss:.4f}, Accuracy: {accuracy:.4f}')
    model.train()  # Restore the model to training mode
    return total_loss, accuracy

# Train the model with high patience early stopping and save the best model
train_model(model, train_loader, valid_loader, optimizer, scheduler, num_epochs=100, patience=10)

# Load the best model and evaluate on the test set
model.load_state_dict(torch.load('best_model_rnn.pth'))
evaluate_model(model, test_loader)


Starting epoch 1/100
Epoch 1/100, Loss: 0.9466, Accuracy: 0.6127
Evaluation Loss: 0.8872, Accuracy: 0.6291
Starting epoch 2/100
Epoch 2/100, Loss: 0.8664, Accuracy: 0.6430
Evaluation Loss: 0.8439, Accuracy: 0.6505
Starting epoch 3/100
Epoch 3/100, Loss: 0.8398, Accuracy: 0.6541
Evaluation Loss: 0.8361, Accuracy: 0.6538
Starting epoch 4/100
Epoch 4/100, Loss: 0.8216, Accuracy: 0.6620
Evaluation Loss: 0.8208, Accuracy: 0.6631
Starting epoch 5/100
Epoch 5/100, Loss: 0.8044, Accuracy: 0.6689
Evaluation Loss: 0.8223, Accuracy: 0.6589
Starting epoch 6/100
Epoch 6/100, Loss: 0.7890, Accuracy: 0.6746
Evaluation Loss: 0.8190, Accuracy: 0.6627
Starting epoch 7/100
Epoch 7/100, Loss: 0.7721, Accuracy: 0.6820
Evaluation Loss: 0.8204, Accuracy: 0.6625
Starting epoch 8/100
Epoch 8/100, Loss: 0.7521, Accuracy: 0.6899
Evaluation Loss: 0.8263, Accuracy: 0.6615
Starting epoch 9/100
Epoch 9/100, Loss: 0.7255, Accuracy: 0.7014
Evaluation Loss: 0.8323, Accuracy: 0.6626
Starting epoch 10/100
Epoch 10/100, L

(0.8175448838658113, 0.6634765430459179)