In [8]:
# @title 1. Setup and GPU Verification
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import os
import random
from tqdm import tqdm
import matplotlib.pyplot as plt

# Install Hugging Face Datasets
!pip install datasets -q
from datasets import load_dataset

# Force GPU Usage
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"‚úÖ GPU Detected: {torch.cuda.get_device_name(0)}")
else:
    raise RuntimeError(" No GPU detected!")

# Set random seeds for reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

print(f"Global device set to: {device}")

‚úÖ GPU Detected: Tesla T4
Global device set to: cuda


In [9]:
# @title 2. Data Loading (Hugging Face)
def load_hf_data():
    print("‚¨áÔ∏è Downloading sander-wood/irishman dataset...")
    # Load the dataset
    dataset = load_dataset("sander-wood/irishman")

    print("\nüìä Dataset Structure:")
    print(dataset)

    # The dataset typically has 'train' and 'validation' splits.
    # We inspect the column names to find the ABC content (usually 'abc_notation' or 'text')
    sample_item = dataset['train'][0]
    print(f"\nSample Keys: {sample_item.keys()}")

    # Identify the correct key for ABC notation
    # Based on the lab description, it is likely 'abc_notation' or 'text'
    # We check dynamically to be safe
    text_key = 'abc_notation' if 'abc_notation' in sample_item else 'text'
    if text_key not in sample_item:
        # Fallback: assume the first string column is the content
        text_key = [k for k, v in sample_item.items() if isinstance(v, str)][0]

    print(f"üîë Extracting music from column: '{text_key}'")

    # Extract lists of strings for the rest of the pipeline
    train_songs = [item[text_key] for item in dataset['train']]
    val_songs = [item[text_key] for item in dataset['validation']]

    return train_songs, val_songs

# Load Data
train_songs, val_songs = load_hf_data()

print(f"\n‚úÖ Loaded {len(train_songs)} training songs")
print(f"‚úÖ Loaded {len(val_songs)} validation songs")

print("\n--- First Song (Raw ABC) ---")
print(train_songs[0])
print("----------------------------")

‚¨áÔ∏è Downloading sander-wood/irishman dataset...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

train.json:   0%|          | 0.00/80.0M [00:00<?, ?B/s]

validation.json: 0.00B [00:00, ?B/s]

Generating train split:   0%|          | 0/214122 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2162 [00:00<?, ? examples/s]


üìä Dataset Structure:
DatasetDict({
    train: Dataset({
        features: ['abc notation', 'control code'],
        num_rows: 214122
    })
    validation: Dataset({
        features: ['abc notation', 'control code'],
        num_rows: 2162
    })
})

Sample Keys: dict_keys(['abc notation', 'control code'])
üîë Extracting music from column: 'abc notation'

‚úÖ Loaded 214122 training songs
‚úÖ Loaded 2162 validation songs

--- First Song (Raw ABC) ---
X:1
L:1/8
M:4/4
K:Emin
|: E2 EF E2 EF | DEFG AFDF | E2 EF E2 B2 |1 efe^d e2 e2 :|2 efe^d e3 B |: e2 ef g2 fe | 
 defg afdf |1 e2 ef g2 fe | efe^d e3 B :|2 g2 bg f2 af | efe^d e2 e2 ||
----------------------------


In [11]:
# @title 3. Preprocessing
# Step 1: Extract unique characters [cite: 25]
text_corpus = "".join(train_songs)
vocab = sorted(set(text_corpus))
vocab_size = len(vocab)

print(f"Unique characters (Vocabulary Size): {vocab_size}")
print(f"Sample characters: {vocab[:10]}")

# Step 2: Mappings [cite: 30]
char2idx = {u: i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

# Save mappings for consistency
np.save('char2idx.npy', char2idx)
np.save('idx2char.npy', idx2char)

# Step 3: Vectorization Function [cite: 33]
def vectorize_string(text):
    """Converts a string into a list of indices."""
    return np.array([char2idx[c] for c in text if c in char2idx])

# Test vectorization [cite: 36]
print(f"\nVectorized 'ABC': {vectorize_string('ABC')}")

# Step 4: Padding [cite: 37]
# We determine a max length to cover most songs without excessive padding.
# The PDF suggests finding max length in dataset[cite: 40].
max_seq_len = max([len(s) for s in train_songs])
print(f"Max sequence length in training set: {max_seq_len}")

def pad_sequence(seq_indices, max_len):
    """Pads or truncates a sequence of indices to max_len."""
    if len(seq_indices) >= max_len:
        return seq_indices[:max_len] # Truncate [cite: 42]
    else:
        # Pad with space (assuming space is in vocab, otherwise index 0)
        # Using index 0 is common for padding if it's reserved,
        # but here we use the index of ' ' or the last index.
        pad_idx = char2idx.get(' ', 0)
        padding = [pad_idx] * (max_len - len(seq_indices))
        return np.concatenate([seq_indices, padding])

Unique characters (Vocabulary Size): 95
Sample characters: ['\n', ' ', '!', '"', '#', '$', '&', "'", '(', ')']

Vectorized 'ABC': [33 34 35]
Max sequence length in training set: 2968


In [12]:
# @title 4. Dataset and DataLoader
class MusicDataset(Dataset):
    def __init__(self, songs, char2idx, max_len):
        self.char2idx = char2idx
        self.idx2char = {v: k for k, v in char2idx.items()}
        self.max_len = max_len
        self.data = []

        # Pre-process all data
        for song in tqdm(songs, desc="Processing Dataset"):
            # Vectorize
            indices = [self.char2idx[c] for c in song if c in self.char2idx]
            # Pad/Truncate to max_len + 1 (to have enough for input and target)
            indices = pad_sequence(indices, self.max_len + 1)
            self.data.append(np.array(indices))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Retrieve vector
        seq = self.data[idx]

        # Create Input and Target [cite: 53]
        # Input: All except last
        input_seq = torch.tensor(seq[:-1], dtype=torch.long)
        # Target: All except first (shifted by 1)
        target_seq = torch.tensor(seq[1:], dtype=torch.long)

        return input_seq, target_seq

# Hyperparameters for Data Loading
BATCH_SIZE = 8 # Initial small batch size for verification [cite: 57]
SEQ_LENGTH = 1000 # Capping sequence length for memory efficiency

# Initialize Datasets
train_dataset = MusicDataset(train_songs, char2idx, SEQ_LENGTH)
val_dataset = MusicDataset(val_songs, char2idx, SEQ_LENGTH)

# Initialize DataLoaders [cite: 57]
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=True)

# Verify Batch
example_input, example_target = next(iter(train_loader))
print(f"\nInput Batch Shape: {example_input.shape}")
print(f"Target Batch Shape: {example_target.shape}")
print(f"Input Sample (indices): {example_input[0][:10]}")
print(f"Target Sample (indices): {example_target[0][:10]}")

Processing Dataset: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 214122/214122 [00:21<00:00, 9867.50it/s] 
Processing Dataset: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 2162/2162 [00:00<00:00, 12801.14it/s]


Input Batch Shape: torch.Size([8, 1000])
Target Batch Shape: torch.Size([8, 1000])
Input Sample (indices): tensor([56, 26, 17, 20, 20, 22, 16, 21,  0, 44])
Target Sample (indices): tensor([26, 17, 20, 20, 22, 16, 21,  0, 44, 26])





In [13]:
# @title 5. Model Architecture
class MusicRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size):
        super(MusicRNN, self).__init__()
        self.hidden_size = hidden_size

        # 1. Embedding Layer [cite: 71]
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # 2. LSTM Layer [cite: 72]
        # batch_first=True expects input shape (batch, seq, feature)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True)

        # 3. Dense Layer [cite: 73]
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):
        # x shape: (batch_size, seq_len)

        # Embed: (batch_size, seq_len, embedding_dim)
        x = self.embedding(x)

        # LSTM: output shape (batch_size, seq_len, hidden_size)
        # hidden state is a tuple (h_0, c_0)
        output, hidden = self.lstm(x, hidden)

        # Flatten output for dense layer if needed, but PyTorch handles 3D input in Linear now usually.
        # We want output: (batch_size, seq_len, vocab_size)
        output = self.fc(output)

        return output, hidden

# Instantiate Model logic moved to training section to adhere to hyperparams

In [14]:
# @title 6. Training Loop (Iterations-based)
# TensorBoard setup
%load_ext tensorboard
writer = SummaryWriter('runs/music_rnn_experiment')

def train_model(model, train_data, val_data, params):
    # Move model to GPU
    model.to(device)

    # Optimizer & Loss
    optimizer = optim.Adam(model.parameters(), lr=params['lr'])
    criterion = nn.CrossEntropyLoss()

    # Create DataLoaders
    # We use a large number of workers to speed up data loading
    train_dl = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True, drop_last=True)
    val_dl = DataLoader(val_data, batch_size=params['batch_size'], shuffle=False, drop_last=True)

    best_val_loss = float('inf')

    # Global iteration counter
    iteration = 0
    total_steps = params['num_iterations']

    print(f"üöÄ Starting training for {total_steps} ITERATIONS ...")

    # We loop indefinitely until the iteration count is reached
    # We use a while loop effectively by repeating the dataloader
    model.train()

    while iteration < total_steps:
        for batch_idx, (inputs, targets) in enumerate(train_dl):

            # --- Training Step ---
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs, _ = model(inputs)

            # Flatten outputs and targets for CrossEntropy
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))

            loss.backward()
            optimizer.step()

            # --- Logging ---
            if iteration % 50 == 0:
                print(f"Step {iteration}/{total_steps} | Train Loss: {loss.item():.4f}")
                writer.add_scalar('Loss/train', loss.item(), iteration)

            iteration += 1

            # --- Validation & Checkpointing (Every 500 steps) ---
            if iteration % 500 == 0 or iteration == total_steps:
                model.eval()
                val_loss_accum = 0
                with torch.no_grad():
                    # Check just a portion of validation set to save time
                    for i, (v_inputs, v_targets) in enumerate(val_dl):
                        if i > 50: break # Only check 50 batches
                        v_inputs, v_targets = v_inputs.to(device), v_targets.to(device)
                        v_outputs, _ = model(v_inputs)
                        v_loss = criterion(v_outputs.reshape(-1, v_outputs.size(-1)), v_targets.reshape(-1))
                        val_loss_accum += v_loss.item()

                avg_val_loss = val_loss_accum / 50
                print(f"‚úÖ Validation at Step {iteration}: {avg_val_loss:.4f}")
                writer.add_scalar('Loss/val', avg_val_loss, iteration)

                if avg_val_loss < best_val_loss:
                    best_val_loss = avg_val_loss
                    torch.save(model.state_dict(), 'best_music_rnn.pth')
                    print("üíæ Best model saved.")

                model.train() # Switch back to train mode

            # STOPPING CONDITION
            if iteration >= total_steps:
                print("üèÅ Reached maximum iterations. Stopping.")
                return model

    return model

# Define Hyperparameters [cite: 81-86]
params = {
    'num_iterations': 3000,   # Stops exactly at 3000 steps
    'batch_size': 64,         # Adjusted for Colab stability
    'lr': 5e-3,
    'embedding_dim': 256,
    'hidden_size': 1024
}

# Initialize Model
model = MusicRNN(vocab_size=vocab_size,
                 embedding_dim=params['embedding_dim'],
                 hidden_size=params['hidden_size'])

# Train
trained_model = train_model(model, train_dataset, val_dataset, params)

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard
üöÄ Starting training for 3000 ITERATIONS (not epochs)...
Step 0/3000 | Train Loss: 4.6262
Step 50/3000 | Train Loss: 0.6266
Step 100/3000 | Train Loss: 0.4850
Step 150/3000 | Train Loss: 0.4370
Step 200/3000 | Train Loss: 0.3851
Step 250/3000 | Train Loss: 0.4042
Step 300/3000 | Train Loss: 0.3355
Step 350/3000 | Train Loss: 0.3434
Step 400/3000 | Train Loss: 0.3512
Step 450/3000 | Train Loss: 0.3144
‚úÖ Validation at Step 500: 0.2279
üíæ Best model saved.
Step 500/3000 | Train Loss: 0.3162
Step 550/3000 | Train Loss: 0.3590
Step 600/3000 | Train Loss: 0.3244
Step 650/3000 | Train Loss: 0.3636
Step 700/3000 | Train Loss: 0.3462
Step 750/3000 | Train Loss: 0.3405
Step 800/3000 | Train Loss: 0.3409
Step 850/3000 | Train Loss: 0.3354
Step 900/3000 | Train Loss: 0.2979
Step 950/3000 | Train Loss: 0.3203
‚úÖ Validation at Step 1000: 0.2052
üíæ Best model saved.
Step 1000/3000 | Train Loss: 0.3011
S

In [None]:
# @title 6. Training Loop
# TensorBoard setup
%load_ext tensorboard
writer = SummaryWriter('runs/music_rnn_experiment')

def train_model(model, train_data, val_data, params):
    # Move model to GPU
    model.to(device)

    # Optimizer & Loss
    optimizer = optim.Adam(model.parameters(), lr=params['lr'])
    criterion = nn.CrossEntropyLoss()

    # Create DataLoaders with the specific batch size requested
    train_dl = DataLoader(train_data, batch_size=params['batch_size'], shuffle=True, drop_last=True)
    val_dl = DataLoader(val_data, batch_size=params['batch_size'], shuffle=False, drop_last=True)

    best_val_loss = float('inf')
    patience = 5
    trigger_times = 0

    iteration = 0
    epochs = params['num_iterations'] // len(train_dl) + 1 # Approx epochs needed

    print(f"Starting training for {params['num_iterations']} iterations...")

    for epoch in range(epochs):
        model.train()
        train_loss_accum = 0

        for batch_idx, (inputs, targets) in enumerate(train_dl):
            if iteration >= params['num_iterations']:
                break

            # Move data to GPU
            inputs, targets = inputs.to(device), targets.to(device)

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass
            # Hidden state is initialized to zero by default if not provided
            outputs, _ = model(inputs)

            # Reshape for Loss: (batch * seq, vocab_size) vs (batch * seq)
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))

            # Backward pass
            loss.backward()
            optimizer.step()

            # Logging
            writer.add_scalar('Loss/train', loss.item(), iteration)
            train_loss_accum += loss.item()

            if iteration % 50 == 0:
                print(f"Iter {iteration} | Train Loss: {loss.item():.4f}")

            iteration += 1

        # Validation Phase (End of Epoch)
        model.eval()
        val_loss_accum = 0
        with torch.no_grad():
            for v_inputs, v_targets in val_dl:
                v_inputs, v_targets = v_inputs.to(device), v_targets.to(device)
                v_outputs, _ = model(v_inputs)
                v_loss = criterion(v_outputs.reshape(-1, v_outputs.size(-1)), v_targets.reshape(-1))
                val_loss_accum += v_loss.item()

        avg_val_loss = val_loss_accum / len(val_dl)
        writer.add_scalar('Loss/val', avg_val_loss, iteration)
        print(f"Epoch {epoch} complete. Val Loss: {avg_val_loss:.4f}")

        # Early Stopping & Checkpointing [cite: 80]
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_music_rnn.pth')
            print("‚úÖ New best model saved.")
            trigger_times = 0
        else:
            trigger_times += 1
            if trigger_times >= patience:
                print(f"Early stopping at epoch {epoch}!")
                return model

        if iteration >= params['num_iterations']:
            print("Reached maximum iterations.")
            return model

    return model

# Define Hyperparameters [cite: 81]
params = {
    'num_iterations': 3000,
    'batch_size': 64, # Adjusted to 64 as 256 might OOM on standard Colab GPU with seq_len=1000
    'lr': 5e-3,
    'embedding_dim': 256,
    'hidden_size': 1024
}
# Note: Instructions asked for batch_size=256. If you have a high-VRAM GPU (A100), change back to 256.
# For T4 (standard Colab), 64 is safer for stability.

# Initialize Model
model = MusicRNN(vocab_size=vocab_size,
                 embedding_dim=params['embedding_dim'],
                 hidden_size=params['hidden_size'])

# Train
trained_model = train_model(model, train_dataset, val_dataset, params)

In [18]:
# @title 7. Generation
def generate_music(model, start_string, generation_length=200, temperature=1.0):
    # Prepare model
    model.eval()
    model.to(device)

    # Vectorize start string
    input_eval = [char2idx[c] for c in start_string]
    input_eval = torch.tensor(input_eval, dtype=torch.long).unsqueeze(0).to(device) # (1, seq_len)

    text_generated = []

    # Initialize hidden state (None defaults to zeros)
    hidden = None

    print(f"Generating {generation_length} characters...")

    with torch.no_grad():
        for i in range(generation_length):
            # Forward pass
            output, hidden = model(input_eval, hidden)

            # We only care about the prediction for the LAST time step
            # output shape: (1, seq_len, vocab_size) -> take last: (1, vocab_size)
            prediction = output[:, -1, :]

            # Apply temperature
            # High temp -> more random; Low temp -> more greedy
            prediction = prediction / temperature

            # Sample from distribution
            probs = torch.softmax(prediction, dim=1)
            dist = torch.distributions.Categorical(probs)
            predicted_idx = dist.sample()

            # Append to result
            idx = predicted_idx.item()
            text_generated.append(idx2char[idx])

            # Next input is the current prediction (Autoregressive)
            input_eval = predicted_idx.unsqueeze(0) # (1, 1)

    return start_string + ''.join(text_generated)

# Test Generation [cite: 99]
start_seed = "X:1\nT:Generated Song\n"
generated_song = generate_music(trained_model, start_seed, generation_length=400)

print("\nüé∂ GENERATED SONG üé∂")
print("=======================")
print(generated_song)
print("=======================")

Generating 400 characters...

üé∂ GENERATED SONG üé∂
X:1
T:Generated Song
| F2 D>F D2 F/A/F/D/ | [DB]2 A>B d>G A2 | 
 B/e/d Dd A2 A>F |"C" E3F G/F/E/D/ C/C/A,/G,/C/ | G,A,B,>CD A,DD/D/D ||                                                                                                                                                                                                                                                                                              


In [19]:
#to hear the output, head out to : https://www.abcjs.net/abcjs-editor.html