# Recurrent Neural Networks

Grzegorz Statkiewicz, Mateusz Matukiewicz

## Overview

The structure of the directory should be as follows:

```
.
├── data
│   ├── train.pkl
│   └── test_no_target.pkl
└── main.ipynb
```



## Setup

Select the device to use

In [1]:
!nvidia-smi

Sun May 25 13:14:57 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.35.02              Driver Version: 560.94         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce GTX 1660 Ti     On  |   00000000:1C:00.0  On |                  N/A |
|  0%   48C    P8             18W /  130W |     875MiB /   6144MiB |      4%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [2]:
import os
# Set before torch is imported in the next cell.
# NOTE(review): CUDA_LAUNCH_BLOCKING=1 is presumably a debugging aid (errors surface at the
# failing call) and may slow training — confirm it is still wanted for full runs.
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"



In [3]:
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import numpy as np
import os
import random
from torch.nn.utils.rnn import pad_sequence

# Prefer CUDA, then Apple MPS, then fall back to the CPU. The result is always a
# torch.device (previously a plain string on the MPS/CPU paths), so later code can
# rely on a single type, e.g. `device.type`, regardless of the backend.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


### Config for reproducibility

In [4]:
SEED = 42

# cuDNN: request deterministic kernels and switch off the benchmark autotuner.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Seed every random number generator the notebook relies on:
# PyTorch, NumPy and Python's own `random` module.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# Seed the CUDA generators as well when a GPU is present.
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)

## Data preparation

Load the data

In [5]:
# Path to the pickled training set, relative to the notebook (see the directory layout in Overview).
train_path = "data/train.pkl"

In [6]:
def load_data(file_path):
    """Read and return the object stored in a pickle file.

    Args:
        file_path: Path of the pickle file to read.

    Returns:
        The unpickled object, or ``None`` when the file does not exist
        (an error message is printed in that case).

    Note:
        ``pickle.load`` can execute arbitrary code while unpickling, so only
        use this on files from a trusted source.
    """
    try:
        with open(file_path, "rb") as handle:
            return pickle.load(handle)
    except FileNotFoundError:
        # Missing file is reported, not raised; the caller receives None.
        print(f"Error: File not found at {file_path}")
    return None


In [7]:
train_data = load_data(train_path)

# load_data() returns None when the file is missing. Stop here with a clear message
# instead of failing on len(None) with an unrelated TypeError.
if train_data is None:
    raise FileNotFoundError(f"Training data not found at {train_path}")

print(f"Loaded {len(train_data)} training samples.")

# Preview one sample: a (chord sequence, label) pair.
print(train_data[0])

Loaded 2939 training samples.
(array([ -1.,  -1.,  -1., ...,  78.,  40., 144.], shape=(4756,)), 0)


In [8]:
# Class id -> composer name; the id is the position of the name in this tuple.
compositors = dict(enumerate(('bach', 'beethoven', 'debussy', 'scarlatti', 'victoria')))
# Number of target classes for the classifier.
num_classes = len(compositors)

In [9]:
import numpy as np

# Split the (sequence, label) pairs into integer chord tensors and their labels.
sequences = [torch.tensor(chords, dtype=torch.long) for (chords, _) in train_data]
labels = [target for (_, target) in train_data]

# Collect every distinct chord id that occurs in the raw data.
all_chords = set()
for seq in sequences:
    all_chords |= set(seq.tolist())

# ChordDataset shifts every id by +1 so index 0 can act as padding, hence the
# embedding needs max_id + 2 rows: indices 0 .. max_id + 1.
vocab_size = int(max(all_chords)) + 2

print(f"Vocab size: {vocab_size}")

Vocab size: 193


In [10]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

class ChordDataset(Dataset):
    """Dataset of (chord sequence, label) pairs with chord ids shifted by one.

    Args:
        sequences: Iterable of integer tensors, one per sample.
        labels: Labels aligned with ``sequences``.
    """

    def __init__(self, sequences, labels):
        # Shift ids by +1: collate_fn pads with 0, and the -1 entries seen in the
        # raw data land on that same index.
        # NOTE(review): presumably -1 marks "no chord" — confirm it should share the padding index.
        self.sequences = list(map(lambda seq: seq + 1, sequences))
        self.labels = labels

    def __len__(self):
        """Number of samples."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return the shifted sequence and its label for position ``idx``."""
        chords = self.sequences[idx]
        target = self.labels[idx]
        return chords, target

def collate_fn(batch):
    """Collate variable-length samples into one padded batch.

    Args:
        batch: Sequence of ``(sequence_tensor, label)`` pairs.

    Returns:
        Tuple ``(padded_seqs, lengths, labels)``: the sequences right-padded
        with 0 to the longest one (batch first), the original length of each
        sequence, and the labels, the last two as ``torch.long`` tensors.
    """
    batch_seqs, batch_labels = zip(*batch)
    label_tensor = torch.tensor(batch_labels, dtype=torch.long)
    # True lengths are kept so the model can ignore the padded positions.
    seq_lengths = torch.tensor(list(map(len, batch_seqs)), dtype=torch.long)
    padded = pad_sequence(batch_seqs, batch_first=True, padding_value=0)
    return padded, seq_lengths, label_tensor


In [11]:
from sklearn.model_selection import train_test_split

BATCH_SIZE = 256

# Hold out 20% of the samples for validation. The split uses the notebook-wide SEED
# (instead of a second hard-coded 42) and is stratified by label so that both parts
# keep the class proportions of the full, imbalanced data set.
train_data_split, val_data_split = train_test_split(
    train_data,
    test_size=0.2,
    random_state=SEED,
    stratify=[label for (_, label) in train_data],
)

# Separate each split into integer chord tensors and their labels.
train_sequences = [torch.tensor(seq, dtype=torch.long) for (seq, label) in train_data_split]
train_labels = [label for (seq, label) in train_data_split]
val_sequences = [torch.tensor(seq, dtype=torch.long) for (seq, label) in val_data_split]
val_labels = [label for (seq, label) in val_data_split]

train_dataset = ChordDataset(train_sequences, train_labels)
val_dataset = ChordDataset(val_sequences, val_labels)


# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)


In [12]:
from torch.utils.data import WeightedRandomSampler

# Number of training samples per class, indexed by class id.
class_sample_counts = np.array([train_labels.count(class_id) for class_id in range(num_classes)])
# Inverse-frequency weight per class: the rarer the class, the larger its weight.
weights = 1. / class_sample_counts

# Every sample takes the weight of its own class.
sample_weights = torch.DoubleTensor(np.array([weights[target] for target in train_labels]))

# Draw as many samples per epoch as the training set holds, with replacement.
sampler = WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True)

# NOTE(review): the training cell below passes `train_loader`, not this loader.
sampled_train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    sampler=sampler,
    collate_fn=collate_fn,
)

## Model

In [13]:
import torch.nn as nn

class LSTMClassifier(nn.Module):
    """Sequence classifier: embedding -> LSTM -> dropout -> linear layer.

    Args:
        vocab_size: Number of rows in the embedding table (index 0 is padding).
        embed_dim: Size of each embedding vector.
        hidden_dim: Hidden size of the LSTM (per direction).
        output_dim: Number of output logits.
        num_layers: Number of stacked LSTM layers.
        dropout_p: Dropout probability before the linear layer and, when
            ``num_layers > 1``, inside the LSTM.
        bidirectional: Whether the LSTM reads the sequence in both directions.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, num_layers, dropout_p=0.5, bidirectional=False):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional

        # Index 0 is assumed to be the padding token.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

        # The LSTM's own dropout is only switched on for stacked layers.
        lstm_dropout = dropout_p if num_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=lstm_dropout,
            bidirectional=bidirectional,
        )

        self.dropout = nn.Dropout(dropout_p)

        # A bidirectional LSTM yields one final state per direction.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)

    def forward(self, x, lengths):
        """Compute class logits for a padded batch.

        Args:
            x: Padded batch of token ids, batch first.
            lengths: Length of each sequence in ``x`` before padding.

        Returns:
            Logits with one row per sequence and ``output_dim`` columns.
        """
        # Pack so the LSTM stops at each sequence's true length.
        packed = nn.utils.rnn.pack_padded_sequence(
            self.embedding(x), lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        _, (h_n, _) = self.lstm(packed)

        if not self.bidirectional:
            # Final hidden state of the top layer.
            summary = h_n[-1]
        else:
            # Last two entries of h_n: top layer, forward then backward direction.
            summary = torch.cat((h_n[-2], h_n[-1]), dim=1)

        return self.fc(self.dropout(summary))

In [14]:
# Model and optimiser hyper-parameters.
VOCAB_SIZE = vocab_size      # embedding rows, computed from the data above
EMBED_DIM = 32
HIDDEN_DIM = 64
OUTPUT_DIM = num_classes     # one logit per composer class (was a hard-coded 5)
NUM_LAYERS = 2
DROPOUT_P = 0.4
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-5

In [15]:
# Build the classifier from the hyper-parameter constants and move it to the
# selected device. NUM_LAYERS replaces a literal 2 so the constant takes effect.
model = LSTMClassifier(
        vocab_size=VOCAB_SIZE,
        embed_dim=EMBED_DIM,
        hidden_dim=HIDDEN_DIM,
        output_dim=OUTPUT_DIM,
        num_layers=NUM_LAYERS,
        dropout_p=DROPOUT_P,
        bidirectional=False
    ).to(device)

In [16]:
# Class-weighted loss. The per-class counts are taken from the training split itself
# (instead of hard-coded numbers that covered the full data set), so the weights
# follow the data actually trained on and never include validation samples.
counts = torch.bincount(
    torch.tensor(train_labels, dtype=torch.long), minlength=num_classes
).float()
# Guard against a class that is absent from the training split (avoids division by zero).
class_weights = 1.0 / counts.clamp(min=1.0)
class_weights = class_weights / class_weights.sum() * len(counts)  # Normalize to num_classes

criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
# Halve the learning rate once the monitored value has not decreased for 5 scheduler steps.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)



In [17]:
# Show the architecture, then the number of trainable parameters.
print(model)
n_trainable = sum(param.numel() for param in model.parameters() if param.requires_grad)
print(f"Number of parameters: {n_trainable}")


LSTMClassifier(
  (embedding): Embedding(193, 32, padding_idx=0)
  (lstm): LSTM(32, 64, num_layers=2, batch_first=True, dropout=0.4)
  (dropout): Dropout(p=0.4, inplace=False)
  (fc): Linear(in_features=64, out_features=5, bias=True)
)
Number of parameters: 64869


In [None]:
from training import Trainer

# Trainer comes from the project-local `training` module, which is not part of this notebook.
# NOTE(review): presumably it runs the train/validation loop, steps the scheduler on the
# validation loss and stops early when that stalls — confirm against training.py.
trainer = Trainer(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    device=device,
    early_stopping=True,
)

# Trains on the plain shuffled loader for up to 100 epochs; `sampled_train_loader` is not used here.
trainer.train(train_loader, val_loader, epochs=100)


Epoch 1/100:  50%|█████     | 5/10 [00:16<00:14,  2.97s/it]