# Libraries

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, Dataset, random_split
from torch.optim import Adam
import torch.nn.functional as F
import numpy as np

# for teacher
import random

from sklearn.model_selection import train_test_split

# Side variables
# Batching and sliding-window geometry shared by every model below.
batch_size = 64
# Each example holds 1024 tokens (should be 1/4 of the average song length in
# tokens; remember to remove outliers).
window_length = 1024
# The window start advances this many tokens per example: larger steps give
# fewer, less-overlapping examples (less data, but less overfitting to
# near-duplicate windows).
window_step_size = 32

# Numeric precision, expressed once for NumPy and once for torch.
base_type = np.float32
torch_type = torch.float32

# Run on the GPU when one is visible to torch, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


# Load Data

# Data Preprocessing

In [None]:
# Placeholder for the token sequence: it is windowed by MusicDataset below and
# later used to size the transformer vocabulary. Left empty until real data is
# loaded.
data = [] # <-- put data here

# Assume already tokenized (this is for transformer, must be adapted for other models)
class MusicDataset(Dataset):
    """Sliding-window view over one long token sequence.

    All windows are cut once at construction time and kept in
    ``final_data``; indexing converts a single window into a tensor.

    Args:
        data: Sliceable sequence of tokens.
        window_length: Number of tokens in every window.
        step_size: Distance between the starts of consecutive windows.
    """

    def __init__(self, data, window_length, step_size):
        self.temp_data = data
        self.window_length = window_length
        self.step_size = step_size
        self.final_data = self.apply_window()

    def apply_window(self):
        """Return every full-length window, in order of its start index."""
        # Last admissible start; a trailing partial window is never produced.
        last_start = len(self.temp_data) - self.window_length
        return [
            self.temp_data[begin:begin + self.window_length]
            for begin in range(0, last_start + 1, self.step_size)
        ]

    def __len__(self):
        """Number of windows that were cut from the sequence."""
        return len(self.final_data)

    def __getitem__(self, idx):
        """Return window ``idx`` as a tensor placed on the module-level ``device``."""
        return torch.tensor(self.final_data[idx]).to(device)
    

# Cut the token sequence into fixed-length windows.
dataset = MusicDataset(data, window_length, window_step_size)

# 80% of the windows go to training; whatever is left is the validation set.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, lengths=[train_size, val_size])

# Only the training batches are reshuffled each epoch.
training_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)


# RNN

In [None]:
# RNN Variables
# Optimisation settings for the RNN experiment.
epochs = 20       # passes over the training dataloader
lr = 1e-4         # step size handed to Adam

# Model shape and regularisation.
seq_len = 256     # given to Music_RNN as its input_size
embed_size = 512  # RNN hidden width; a larger value may call for more dropout
dropout = 0.2

In [None]:
class Music_RNN(nn.Module):
    """Single-layer RNN with a linear head that predicts one value per sample.

    Args:
        input_size: Number of features per time step.
        embed_size: Width of the recurrent hidden state.
        dropout: Dropout probability applied to the hidden state that feeds
            the output layer.
    """

    def __init__(self, input_size, embed_size, dropout):
        super().__init__()
        # nn.RNN applies its own `dropout` only between stacked layers; with
        # the single layer used here it would do nothing except warn, so the
        # regularisation is an explicit layer instead.
        self.RNN = nn.RNN(input_size, embed_size, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.ff = nn.Linear(embed_size, 1) # predicting next input autoregressively

    def forward(self, x):
        """Predict one value for every sample in the batch.

        Args:
            x: ``(batch_size, input_size)`` or
                ``(batch_size, steps, input_size)``. Integer inputs are cast
                to the model's floating-point dtype.

        Returns:
            Tensor of shape ``(batch_size, 1)``.
        """
        # A 2-D input would be read by nn.RNN as ONE unbatched sequence whose
        # time axis is the batch; make each row its own length-1 sequence.
        if x.dim() == 2:
            x = x.unsqueeze(1)
        x = x.to(self.ff.weight.dtype)

        # nn.RNN returns (all_hidden_states, final_hidden_state); only the
        # hidden state at the last time step drives the prediction.
        hidden_states, _ = self.RNN(x)
        last_hidden = hidden_states[:, -1, :]

        return self.ff(self.dropout(last_hidden)) # (batch_size, 1)

In [None]:
# MusicDataset places every window on `device`, so the model has to live there
# too; otherwise the first forward pass on a GPU machine mixes devices.
model_rnn = Music_RNN(seq_len, embed_size, dropout).to(device)

In [None]:
# adapt loss and optimizer as needed
criterion = nn.MSELoss()
optimizer = Adam(model_rnn.parameters(), lr=lr)

# train
for epoch in range(epochs):
    # Set to train
    model_rnn.train()
    # keep cumulative losses
    total_losses = 0.0

    # The dataloader yields bare windows of shape (batch, window_length), not
    # (inputs, targets) pairs, and wrapping it in enumerate() would make the
    # first unpacked item the batch index. Each pair is therefore cut out of
    # the window: the first seq_len tokens are the features and the token
    # right after them is the value to predict (needs window_length > seq_len).
    for batch in training_dataloader:
        inputs = batch[:, :seq_len].to(torch_type)
        targets = batch[:, seq_len:seq_len + 1].to(torch_type) # (batch, 1)
        optimizer.zero_grad()

        # Call the module rather than .forward() so registered hooks still run.
        preds = model_rnn(inputs) # (batch, 1), same shape as targets

        loss = criterion(preds, targets)
        loss.backward()

        optimizer.step()

        total_losses += loss.item()

    print(f"Epoch: {epoch}, Loss: {total_losses / len(training_dataloader)}")


In [None]:
# Validation

model_rnn.eval()

total_losses = 0.0

# validation
with torch.no_grad():
    # Batches are bare windows (batch, window_length), so they cannot be
    # unpacked into (inputs, targets). Split each window the same way as in
    # training: first seq_len tokens as features, the following token as target.
    for batch in validation_dataloader:
        inputs = batch[:, :seq_len].to(torch_type)
        targets = batch[:, seq_len:seq_len + 1].to(torch_type) # (batch, 1)

        preds = model_rnn(inputs) # (batch, 1), same shape as targets

        loss = criterion(preds, targets)
        total_losses += loss.item()

    print(f"Loss: {total_losses / len(validation_dataloader)}")

# LSTM

# Transformer

YAYYYYYYYYY Transformer Time WOOOOOOOOO

In [None]:
#  Model variables
# Width of token embeddings and of every transformer sub-layer.
d_model = 128
# Number of attention heads.
n_heads = 8
# Feed-forward width: four times d_model, the ratio the AIAYN paper recommends.
d_ff = 4 * d_model

# Stack depth; lower it for simpler data or a faster run. The decoder uses the
# same depth as the encoder.
n_encoder_layers = 6
n_decoder_layers = n_encoder_layers

# Optimisation settings.
learning_rate = 1e-5 # maybe increase?
num_epochs = 100 # change this later so it doesn't take 10 years to run

In [None]:
# Standard positonal encoding used here, could also try time encoding since notes have different timestamps
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    Args:
        d_model: Embedding width (odd values are supported).
        win_len: Longest sequence length the table covers.
    """

    def __init__(self, d_model, win_len):
        super().__init__()
        pe = torch.zeros(win_len, d_model) # (win_len, d_model)
        position = torch.arange(win_len, dtype=pe.dtype).unsqueeze(1) # (win_len, 1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)
        ) # (ceil(d_model / 2),)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd d_model there is one fewer cosine column than sine
        # columns, so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Registered as a buffer so the table follows the module through
        # .to()/.cuda(); non-persistent so state_dict keys do not change.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False) # (1, win_len, d_model)

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)`` with
                ``seq_len <= win_len``.
        """
        return x + self.pe[:, :x.size(1), :]


# mask
def generate_causal_mask(size, mask_device=None):
    """Build an additive causal attention mask.

    Args:
        size: Target sequence length; the mask is ``size`` x ``size``.
        mask_device: Device to create the mask on. When omitted, the
            module-level ``device`` is used.

    Returns:
        Float tensor with 0 on and below the diagonal and ``-inf`` above it,
        so position ``i`` can only attend to positions ``<= i``:

            [0, -inf, -inf, -inf]
            [0,   0,  -inf, -inf]
            [0,   0,    0,  -inf]
            [0,   0,    0,    0 ]
    """
    # The global is only looked up when the caller did not choose a device.
    target_device = device if mask_device is None else mask_device
    # Ones strictly above the diagonal mark the "future" positions.
    future = torch.triu(torch.ones(size, size, device=target_device), diagonal=1)
    return future.masked_fill(future == 1, float('-inf'))


In [None]:
class EncoderLayer(nn.Module):
    """Token embedding and positional encoding feeding a transformer encoder stack.

    Args:
        num_tokens: Size of the embedding table.
        d_model: Embedding / model width.
        nhead: Attention heads per layer.
        dim_ff: Width of each layer's feed-forward sub-layer.
        win_len: Longest sequence the positional table covers.
        layers: Number of stacked encoder layers.
    """

    def __init__(self, num_tokens, d_model=d_model, nhead=n_heads, dim_ff=d_ff, win_len=window_length, layers=n_encoder_layers):
        super().__init__()
        # Keep the hyperparameters around for inspection.
        self.d_model, self.nhead, self.dim_ff = d_model, nhead, dim_ff
        self.win_len, self.layers = win_len, layers

        # Input pipeline: token ids -> vectors -> vectors with position info.
        self.tok_embed = nn.Embedding(num_tokens, d_model)
        self.pos_enc = PositionalEncoding(d_model, win_len)

        # Stack of identical batch-first encoder layers.
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead, dim_ff, batch_first=True),
            num_layers=self.layers,
        )

    def forward(self, x):
        """Encode a batch of token ids.

        Args:
            x: Token ids of shape ``(batch_size, win_len)``.

        Returns:
            The encoder stack's output for the embedded, position-encoded
            input, ``(batch_size, win_len, d_model)``.
        """
        embedded = self.pos_enc(self.tok_embed(x)) # (batch_size, win_len, d_model)
        return self.encoder(embedded)
    


# Actual decoder
class DecoderLayer(nn.Module):
    """Transformer decoder stack with its own embedding and a vocabulary projection.

    Args:
        num_tokens: Vocabulary size; sets both the embedding table and the
            width of the output logits.
        d_model: Embedding / model width.
        nhead: Attention heads per layer.
        dim_ff: Width of each layer's feed-forward sub-layer.
        win_len: Longest sequence the positional table covers.
        layers: Number of stacked decoder layers. Defaults to
            ``n_decoder_layers`` (currently equal to ``n_encoder_layers``).
    """

    def __init__(self, num_tokens, d_model=d_model, nhead=n_heads, dim_ff=d_ff, win_len=window_length, layers=n_decoder_layers):
        super().__init__()
        self.d_model = d_model
        self.nheads = nhead
        self.nhead = nhead # same attribute name EncoderLayer exposes
        self.dim_ff = dim_ff
        self.win_len = win_len
        self.layers = layers

        # pre-transformer
        self.tok_embed = nn.Embedding(num_tokens, d_model)
        self.pos_enc = PositionalEncoding(d_model, win_len)
        self.out_proj = nn.Linear(d_model, num_tokens) # final projection for token prediction

        # transformer part
        decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_ff, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=self.layers)

    def forward(self, tgt, memory, tgt_mask=None):
        """Decode target tokens against the encoder output.

        Args:
            tgt: Target token ids, ``(batch_size, tgt_len)``.
            memory: Encoder output the decoder cross-attends to.
            tgt_mask: Optional ``(tgt_len, tgt_len)`` additive mask for the
                decoder self-attention (e.g. a causal mask).

        Returns:
            Unnormalised token logits, ``(batch_size, tgt_len, num_tokens)``.
        """
        x_seq = self.tok_embed(tgt) # (batch_size, tgt_len, d_model)
        x_seq = self.pos_enc(x_seq) # (batch_size, tgt_len, d_model)

        decoder_output = self.decoder(x_seq, memory, tgt_mask=tgt_mask) # (batch_size, tgt_len, d_model)
        pred = self.out_proj(decoder_output) # (batch_size, tgt_len, num_tokens)

        return pred



In [None]:
class TransformerAutoencoder(nn.Module):
    """Encoder-decoder transformer that reconstructs its own input sequence.

    Args:
        num_tokens: Vocabulary size.
        d_model: Embedding / model width.
        nhead: Attention heads per layer.
        dim_ff: Feed-forward width per layer.
        win_len: Longest supported sequence length.
        layers: Depth used for both the encoder and the decoder stack.
    """

    def __init__(self, num_tokens, d_model, nhead, dim_ff, win_len, layers):
        super().__init__()
        self.encoder = EncoderLayer(num_tokens, d_model, nhead, dim_ff, win_len, layers)
        # DecoderLayer takes exactly the same six arguments as EncoderLayer;
        # the former extra `out_days` argument was an undefined name.
        self.decoder = DecoderLayer(num_tokens, d_model, nhead, dim_ff, win_len, layers)

    def forward(self, tgt):
        """Encode ``tgt`` and decode it back into per-position token logits.

        Args:
            tgt: Token ids of shape ``(batch_size, win_len)``.

        Returns:
            The decoder's output for ``tgt``: one row of token logits per
            input position.
        """
        memory = self.encoder(tgt)
        # Causal mask over the decoder's self-attention, tgt_len x tgt_len.
        causal_mask = generate_causal_mask(tgt.size(1))
        return self.decoder(tgt, memory, tgt_mask=causal_mask)

In [None]:
# Create model
# Vocabulary size is taken as the number of distinct tokens.
# NOTE(review): this assumes token ids are the contiguous integers
# 0..num_tokens-1; otherwise the embedding lookup can index out of range - confirm.
num_tokens = len(set(data))
# TransformerAutoencoder takes six arguments; the former seventh one
# (`days_to_predict`) was an undefined name.
model = TransformerAutoencoder(num_tokens, d_model, n_heads, d_ff, window_length, n_encoder_layers).to(device)

# Use Adam cause he's so cool
optimizer = Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Training loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0 # cumulative loss

    # Cycle through each batch
    for batch in training_dataloader:

        optimizer.zero_grad()
        # Pass through model -> logits of shape (batch, win_len, num_tokens)
        output = model(batch)
        # CrossEntropyLoss expects (logits, targets) in that order, with the
        # class dimension second; flatten batch and time into one axis so the
        # logits are (N, num_tokens) and the target ids are (N,).
        loss = criterion(output.reshape(-1, output.size(-1)), batch.reshape(-1))
        # Update weights
        loss.backward()
        optimizer.step()
        # Add to epoch_loss
        epoch_loss += loss.item()

    # Now show average loss for epoch
    avg_epoch_loss = epoch_loss / len(training_dataloader)
    print(f"Epoch: [{epoch+1}/{num_epochs}]   Epoch Average Loss: {avg_epoch_loss}")


Note that the code above does not yet generate tokens autoregressively; I will add that later. For now it only reconstructs its input sequences.