# The preprocessing

In [2]:
%pip install numpy
%pip install torch

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from torch import Tensor
import torch.nn.functional as F
import sqlite3
import math

In [3]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
# get data and filter out any bads
con = sqlite3.connect("data/wjazzd.db")
cur = con.cursor()

x = cur.execute("SELECT melid, bass_pitch from beats")# WHERE (bass_pitch is not null)")
bass = np.array(x.fetchall())
inds = np.where(bass==None)
nulls = np.unique(bass[inds, 0])
r = cur.execute("SELECT melid, pitch, division, tatum, beat from melody")# WHERE division <= 4 AND tatum <= 4 AND beat <= 4")
notes = np.array(r.fetchall()).astype('int32')
mask = np.isin(notes[:, 0], nulls)
notes = notes[~mask]

maskb = np.isin(bass[:, 0], nulls)
bass = bass[~maskb].astype('int32')


In [8]:
# get useful info from the notes and bass
unique_notes = np.unique(notes[:, 1:], axis=0)
unique_bass = np.unique(bass[:, 1])

num_notes = unique_notes.shape[0]
num_bass = unique_bass.shape[0]
num_songs_notes = np.unique(notes[:, 0]).shape[0]
num_songs_bass = np.unique(bass[:, 0]).shape[0]
max_length_notes = np.max(np.bincount(notes[:, 0]))
max_length_bass = np.max(np.bincount(bass[:, 0]))
unique_bass.shape, unique_notes.shape

((26,), (9398, 4))

In [9]:
# encode and decode into indices

bass_to_i = { b:i+1 for i, b in enumerate(unique_bass) }
bass_to_i[0] = 0
bass_to_i[-1] = -1
i_to_bass = { i+1:b for i, b in enumerate(unique_bass) }
i_to_bass[-1] = -1

notes_to_i = { tuple(n):i+1 for i, n in enumerate(unique_notes) }
notes_to_i[tuple(np.array([-1, -1, -1, -1]))] = -1
notes_to_i[tuple(np.array([0,0,0,0]))] = 0
i_to_notes = { i+1:n for i, n in enumerate(unique_notes)}
notes_to_i[-1] = np.array([-1, -1, -1, -1])
notes_to_i[0] = np.array([0,0,0,0])

In [10]:
# reshape bass

rows = bass[:, 0]

bass_reshape = np.zeros((num_songs_bass, max_length_bass+1))

for i in range(num_songs_bass):
    x = np.squeeze(bass[np.where(rows==i+1), 1])
    x = np.pad(x, (0, max_length_bass+1 - len(x)))
    tmp=x
    bass_reshape[i, :] = tmp
bass_reshape = bass_reshape[1:, :]
bass_ind = np.zeros_like(bass_reshape)

# make bass into indices
for i in range(bass_reshape.shape[0]):
    for j in range(bass_reshape.shape[1]):
        bass_ind[i, j] = bass_to_i[bass_reshape[i, j]]
bass_ind = bass_ind.astype('int32')
bass_ind.shape, num_songs_bass, max_length_bass, bass_ind[0, 0:10]

((436, 1737), 437, 1736, array([ 7,  9, 10, 11, 12, 13, 14, 14, 15, 16]))

In [11]:
# reshape notes

notes_reshape = np.zeros((num_songs_notes, max_length_notes+1, 4))
for i in range(num_songs_notes):
    x = np.squeeze(notes[np.where(rows==i+1), 1:])
    desired_shape = (max_length_notes+1, 4)
    
    padding = np.subtract(desired_shape, x.shape)
    padding = np.where(padding < 0, 0, padding)

    x = np.pad(x, ((0, padding[0]), (0, padding[1])), mode='constant')
    tmp = x
    notes_reshape[i, :] = tmp
notes_reshape = notes_reshape[1:, :, :]
notes_ind = np.zeros((notes_reshape.shape[0], notes_reshape.shape[1]))
for i in range(notes_ind.shape[0]):
    for j in range(notes_ind.shape[1]):
        notes_ind[i, j] = notes_to_i[tuple(notes_reshape[i, j, :])]
notes_ind = notes_ind.astype('int32')
notes_ind.shape, num_songs_notes, max_length_notes, notes_ind[0, 0:10]

((436, 1955),
 437,
 1954,
 array([6319, 7196, 7667, 4034, 5018, 5693, 4035, 5355, 6336, 5349]))

In [11]:
# make into one hot
note_embs = note_embed[notes_ind]

bass_embs = bass_embed[bass_ind]
note_embs.shape, bass_embs.shape

((436, 1953, 100), (436, 1737, 100))

In [12]:
# now the number of songs are the same
num_songs = notes_ind.shape[0] # == bass_ind.shape[0]
num_songs

436

In [13]:
train_ratio = 0.8
test_ratio = 1-train_ratio
num_train = int(num_songs * 0.8)
train_bass = torch.Tensor(bass_ind[:num_train])
train_notes = torch.Tensor(notes_ind[:num_train])
test_bass = torch.Tensor(bass_ind[num_train:])
test_notes = torch.Tensor(notes_ind[num_train:])
test_bass.shape, test_notes.shape, train_bass.shape, train_notes.shape

(torch.Size([88, 1737]),
 torch.Size([88, 1953]),
 torch.Size([348, 1737]),
 torch.Size([348, 1953]))

In [14]:
print(notes_ind.shape, bass_ind.shape)

(436, 1953) (436, 1737)


# The model

In [15]:
# maybe one hot isn't the best approach
# borrowed from https://pytorch.org/tutorials/beginner/translation_transformer.html
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        # self.embedding = nn.Embedding(vocab_size, emb_size)
        # self.emb_size = emb_size

        self.embedding = torch.rand(vocab_size+1, emb_size)
        

    def forward(self, inds):
        return self.embedding[inds.to(torch.int32)]
# this code don't work

In [16]:
# positional encoding also from same source
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 500):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

In [17]:
class Seq2SeqTransformer(nn.Module):
    def __init__(self, 
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = nn.Transformer(d_model=emb_size,
                                          nhead=nhead,
                                          num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          dim_feedforward=dim_feedforward,
                                          dropout=dropout)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size=emb_size, dropout=dropout)

    def forward(self,
                src,
                tgt,
                src_mask,
                tgt_mask,
                src_padding_mask,
                tgt_padding_mask,
                memory_key_padding_mask):
        #src = src.transpose(0, 1)
        #tgt = tgt.transpose(0, 1)
        src_emb = self.positional_encoding(self.src_emb(src)).transpose(0,1)
        tgt_emb = self.positional_encoding(self.tgt_emb(tgt)).transpose(0,1)
        print(src_padding_mask.shape, tgt_padding_mask.shape, src_mask.shape, tgt_mask.shape, memory_key_padding_mask.shape)
        print(src_emb.shape, tgt_emb.shape)
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                  src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        logits = self.generator(outs)
        return logits
    
    # for inference
    def encode(self, src, src_mask):
        return self.transformer.encoder(self.positional_encoding(self.src_emb(src)), src_mask)
    
    def decode(self, tgt, tgt_mask):
        return self.transformer.decoder(self.positional_encoding(self.tgt_emb(tgt)), tgt_mask)



In [18]:
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask


def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

    src_padding_mask = (src == 0).transpose(0, 1)
    tgt_padding_mask = (tgt == 0).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [19]:
torch.manual_seed(420)

SRC_VOCAB_SIZE = num_bass
TGT_VOCAB_SIZE = num_notes
EMB_SIZE = 100
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 12
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(num_encoder_layers=NUM_DECODER_LAYERS, num_decoder_layers=NUM_ENCODER_LAYERS,
                                  emb_size=EMB_SIZE, nhead=NHEAD, src_vocab_size=SRC_VOCAB_SIZE, tgt_vocab_size=TGT_VOCAB_SIZE, dim_feedforward=FFN_HID_DIM)

for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=0)

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

In [20]:
def collate_fn(data):
    notes, bass = data
    note_length = notes.shape[1]
    bass_length = bass.shape[1]
    num_extra_examples_notes = notes.shape[0] % BATCH_SIZE
    num_extra_examples_bass = bass.shape[0] % BATCH_SIZE
    notes = torch.cat((notes, torch.zeros(num_extra_examples_notes, note_length)), dim=0)
    bass = torch.cat((bass, torch.zeros(num_extra_examples_bass, bass_length)), dim=0)
    batch_notes = notes.view((BATCH_SIZE, int(num_train / BATCH_SIZE), note_length))
    batch_bass = bass.view((BATCH_SIZE, int(num_train / BATCH_SIZE), bass_length))
    

    return batch_bass, batch_notes

In [21]:
from torch.utils.data import DataLoader

# data: (num_examples, example_length)
def train_epoch(model, optimizer):
    model.train()
    losses=0
    
    train_dataloader = DataLoader((train_notes, train_bass), batch_size=BATCH_SIZE, collate_fn=collate_fn)
    src = None
    tgt = None
    for x, y in train_dataloader:
        src = x
        tgt = y

    for i in range(BATCH_SIZE):
        batch_src = src[i].to(DEVICE)
        batch_tgt = tgt[i].to(DEVICE)
        print(batch_src.shape, batch_tgt.shape, src.shape, tgt.shape)
        tgt_input = batch_tgt[:,:-1]
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(batch_src.T, tgt_input.T)
        
        logits = model(batch_src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)

        optimizer.zero_grad()
        tgt_out = batch_tgt[:,1:]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()
        optimizer.step()
        losses += loss.item()
    return losses / num_train


In [22]:
from timeit import default_timer as timer
NUM_EPOCHS = 18
#(src_padding_mask.shape, tgt_padding_mask.shape, src_mask.shape, tgt_mask.shape, memory_key_padding_mask.shape
for epoch in range(1, NUM_EPOCHS+1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer)
    end_time = timer()
    #val_loss = evaluate(transformer)
    print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))

torch.Size([29, 1737]) torch.Size([29, 1953]) torch.Size([12, 29, 1737]) torch.Size([12, 29, 1953])
torch.Size([29, 1737]) torch.Size([29, 1952]) torch.Size([1737, 1737]) torch.Size([1952, 1952]) torch.Size([29, 1737])
torch.Size([1737, 29, 512]) torch.Size([1952, 29, 512])


KeyboardInterrupt: 

In [None]:
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len-1):
        memory = memory.to(DEVICE)
        tgt_mask = (generate_square_subsequent_mask(ys.size(0))
                    .type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys,
                        torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        if next_word == 0:
            break
    return ys