In [1]:
import pathlib
import os

import random

import numpy as np

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision

In [2]:
def add_token(song, token, begin):
    token_shape = list(song.shape)
    token_shape[0] = 1
    tens_tokens = np.ones(token_shape) * token
    if begin == True:
        return np.concatenate((tens_tokens, song), axis = 0)
    else:
        return np.concatenate((song, tens_tokens), axis = 0) 
    
def standardize_song(song, lenght):
    if(song.shape[0]>lenght):
        return song
    song_shape = list(song.shape)
    song_shape[0] = lenght - song_shape[0]
    song = np.concatenate((song, np.zeros(song_shape)), axis=0)
    assert song.shape[0] == lenght
    return song

def song_transform(song, max_lenght):
    song = song[:max_lenght]
    song[song == -1] = 82
    #song += np.ones_like(song)*np.arange(song.shape[-1])*100
    song -= np.ones_like(song) * 34 #Move notes to [36, 82] -> [2, 48]
    song = np.pad(song, 0, 'constant')
    song = add_token(song, 1, begin=True)
    song = add_token(song, 49, begin=False)
    song = standardize_song(song, max_lenght+2)
    # Final mapping: pad = 0, sos = 1, notes = 2, 47, empy = 48, eos = 49      
    return song

def batch_uniform(song_batch, max_lenght):
    decoded_batch = torch.from_numpy(np.stack([song_transform(song, max_lenght) for song in song_batch], axis=0))
    #encoded_batch = F.one_hot(decoded_batch.long(), num_classes=50)
    return decoded_batch.transpose(0,1).long()

def rand_song_generation(L_distribution, bs, max_lenght):    
    songs_batch = []
    for _ in range(bs):
        L = np.random.choice(L_distribution)
        songs_batch.append(song_transform(np.random.randint(36,82,(L, 4)),max_lenght))
    decoded_batch = torch.from_numpy(np.stack(songs_batch, axis=0))
    #encoded_batch = F.one_hot(decoded_batch.long(), num_classes=50)
    return decoded_batch.transpose(0,1).long()

In [3]:
class SongIterator():
    def __init__(self, song_list, batch_size, song_size, shuffle):
        self.step = 0
        self.batch_size = batch_size
        self.song_size = song_size
        if shuffle == True:
            self.internal_song_list = random.sample(list(song_list),len(list(song_list)))
        else:
            self.internal_song_list = list(song_list)
                    
    def __iter__(self):
        return self
    
    def __next__(self):
        self.step += 1
        if self.step > 1000:
            raise StopIteration
        if (self.step + 1)  * self.batch_size < len(self.internal_song_list):
            batch = self.internal_song_list[(self.step * self.batch_size):((self.step + 1) * self.batch_size)]
        elif self.step  * self.batch_size < len(self.internal_song_list):
            batch = self.internal_song_list[(self.step * self.batch_size):]
        else:
            raise StopIteration
        return batch_uniform(batch, self.song_size)


In [4]:
# Extract real data

input_path = os.path.join(os.path.join(pathlib.Path(globals()['_dh'][0]).parent, "data"), "js-fakes-16thSeparated.npz")
jsf = np.load(input_path, allow_pickle=True, encoding='latin1')
len_seq = np.asarray([len(song) for song in jsf['pitches']])

max_song_length = len_seq.max()

#song_iterator = SongIterator(song_list=jsf['pitches'], batch_size=64, song_size=max_song_length)

# test iterator -> Now working
# for epoch in range(3):
#     for i, batch_song in enumerate(SongIterator(song_list=jsf['pitches'], batch_size=64, song_size=max_song_length, shuffle=True)):
#         print("Epoch = {}, step = {}, input type = {}, input shape = {}".format(epoch, i, type(batch_song), batch_song.shape))

In [5]:
# testing real and fake batch song generation
real_batch_song = next(SongIterator(song_list=jsf['pitches'], batch_size=64, song_size=max_song_length, shuffle=True))
fake_batch_song = rand_song_generation(len_seq, 64, max_song_length)


In [21]:
# TODO: Add key_padding_mask

class TransformerBlock(nn.Module):
    def __init__(self, num_heads, bs, emb_notes, emb_f):
        super(TransformerBlock, self).__init__() # Seq, batch, features
        self.ln1 = nn.LayerNorm([bs * num_heads * emb_notes, emb_f]) #layer norm: [L, bs, Emb_f, Emb_notes] -> [L, bs, Emb_f, Emb_notes]
        self.fc1 = nn.Linear(emb_f, emb_f)
        self.ln2 = nn.LayerNorm([bs *  emb_f, num_heads * emb_notes]) #layer norm: [BS, L, Emb_f, Emb_notes] -> [BS, L, Emb_f, Emb_notes]
        self.mha_l = nn.MultiheadAttention(num_heads * emb_notes, num_heads, dropout=0.25) # multi-head attention per lunghezza: [BS * Emb_f, L, Emb_notes] -> [BS * Emb_f, L, Emb_notes]
        self.ln3 = nn.LayerNorm([bs,  emb_f, num_heads * emb_notes]) #layer norm
        
        self.mlp = nn.Sequential(
            nn.Linear(num_heads * emb_notes, num_heads * emb_notes),  # Linear transformation
            nn.LayerNorm([bs, emb_f, num_heads * emb_notes]),  # Layer normalization
            nn.ELU(),  # Activation function (ELU)
            nn.Linear(num_heads * emb_notes, num_heads * emb_notes)  # Linear transformation
        )

    
    def forward(self, x): # add various reshape
        #[L, bs, Emb_f, Emb_notes]
        #print("Step TB1, x shape = {}".format(x.shape))
        x_1 = x.transpose(2, 3).reshape((x.shape[0], x.shape[1] * x.shape[3], x.shape[2])) 
        #[L, BS * Emb_notes, Emb_f]
        #print("Step TB2, x_1 shape = {}".format(x_1.shape))
        norm_x_1 = self.ln1(x_1) 
        #print("Step TB3, x_1 shape = {}".format(x_1.shape))
        attn_output_1 = self.fc1(norm_x_1)
        #print("Step TB4, attn_output_1 shape = {}".format(attn_output_1.shape))
        x_2 = x_1 + attn_output_1 # residual connection
        #print("Step TB5, x_2 shape = {}".format(x_2.shape))
        x_2 = x_2.reshape(x.shape[0], x.shape[1], x.shape[3], x.shape[2]).transpose(2,3).reshape(x.shape[0], x.shape[1] * x.shape[2], x.shape[3])
        #[L, BS * Emb_f, Emb_notes]
        #print("Step TB6, x_2 shape = {}".format(x_2.shape))
        norm_x_2 = self.ln2(x_2)
        #print("Step TB7, x_norm_x_2 shape = {}".format(norm_x_2.shape))
        attn_output_2 = self.mha_l(norm_x_2, norm_x_2, norm_x_2)[0] # [0] selects the attention output, to be decided if padding is needed
        #print("Step TB8, attn_output_2 shape = {}".format(attn_output_2.shape))
        x_3 = x + attn_output_2.reshape(x.shape)
        #print("Step TB9, x shape = {}".format(x.shape))
        x_3 = self.ln3(x_3)
        #print("Step TB10, x shape = {}".format(x.shape))
        x_3 = self.mlp(x_3)
        #print("Step TB11, x shape = {}".format(x.shape))
        return x_3 + x

In [60]:
class SinusoidalPosEmb(nn.Module):
    def __init__(self, dim):
        super(SinusoidalPosEmb, self).__init__()
        self.dim = dim
    
    def forward(self, x):
        position_enc = np.array([[pos / np.power(10000, 2*i/self.dim) for i in range(self.dim)] 
                                 if pos != 0 else np.zeros(self.dim) for pos in range(x.shape[0])])
        # keep dim 0 for padding token position encoding zero vector # To be decided what to do with this
        position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i
        position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1
        position_enc = np.expand_dims(position_enc, axis=(1,2))
        position_enc = np.repeat(position_enc, repeats=x.shape[1], axis=1)
        position_enc = np.repeat(position_enc, repeats=x.shape[2], axis=2)
        return torch.from_numpy(position_enc).type(torch.FloatTensor)

class Generator(nn.Module):
    def __init__(self, num_layers, num_emb, emb_f, num_heads, emb_notes, bs):
        super(Generator, self).__init__()
        #[L, bs, Emb_f, notes]
        self.embedding = nn.Embedding(num_emb, num_heads * emb_notes) # To be refined
        self.embedding.weight.data = 0.001 * self.embedding.weight.data # Unclear
        #[L, bs, Emb_f, num_heads_1 * emb_f]
        self.pos_emb = SinusoidalPosEmb(num_heads * emb_notes)
        self.blocks = nn.ModuleList([TransformerBlock(num_heads, bs, emb_notes, emb_f) for _ in range(num_layers)])
        self.fc_out = nn.Linear(num_heads * emb_notes, num_heads * emb_notes)

    def forward(self, x):
        #print("Gen Step 1, x shape = {}".format(x.shape))
        input_emb = self.embedding(x)
        #print("Gen Step 2, input_emb shape = {}".format(input_emb.shape))
        pos_emb = self.pos_emb(x)
        #print("Gen Step 3, pos_emb shape = {}".format(pos_emb.shape))
        emb = input_emb + pos_emb
        #print("Gen Step 4, emb shape = {}".format(emb.shape))
        for block in self.blocks:
            emb = block(emb) # add key_padding_mask
            #print("Gen Step 5 loop, emb shape = {}".format(emb.shape))
        emb = self.fc_out(emb)
        return torch.argmax(emb, dim = -1)
    
class Discriminator(nn.Module):
    def __init__(self, num_layers, num_emb, emb_f, num_heads, emb_notes, bs, len_seq):
        super(Discriminator, self).__init__()
        #[L, bs, Emb_f, notes]
        self.embedding = nn.Embedding(num_emb, num_heads * emb_notes) # To be refined
        self.embedding.weight.data = 0.001 * self.embedding.weight.data # Unclear
        #[L, bs, Emb_f, num_heads_1 * emb_f]
        self.pos_emb = SinusoidalPosEmb(num_heads * emb_notes)
        self.blocks = nn.ModuleList([TransformerBlock(num_heads, bs, emb_notes, emb_f) for _ in range(num_layers)])
        self.fc_out = nn.Linear(len_seq * emb_f * num_heads * emb_notes, 2) 


    def forward(self, x):
        print("1. shape = {}".format(x.shape))
        input_emb = self.embedding(x)
        print("2. shape = {}".format(input_emb.shape))
        pos_emb = self.pos_emb(x)
        print("3. shape = {}".format(pos_emb.shape))
        emb = input_emb + pos_emb
        print("4. shape = {}".format(emb.shape))
        for block in self.blocks:
            emb = block(emb) # add key_padding_mask
        print("5. Shape = {}".format(emb.shape))
        emb = emb.transpose(0, 1).reshape(emb.shape[1], emb.shape[0] * emb.shape[2] * emb.shape[3])
        print("6. Shape = {}".format(emb.shape))
        emb = self.fc_out(emb)
        return F.softmax(emb, dim=-1)

In [None]:
#Losses definition
def gan_discriminator_loss(output, real_label=True):
    if real_label:
        labes = torch.ones_like(output) * np.arange(0,2)
    else:
        labes = torch.ones_like(output) * np.arange(1,-1,-1)
    return F.binary_cross_entropy_with_logits(output, labes)
    
def gan_generator_loss(output, real_label=True):
    if real_label:
        return -output.mean()
    else:     
        return output.mean(

In [23]:
# Test TransformerBlock -> Work
bs = 4
emb_notes = 20 
emb_f = 4
num_heads = 2
trans_block = TransformerBlock(num_heads = num_heads, bs = bs, emb_notes = emb_notes, emb_f = emb_f)
#[L, bs, Emb_f, Emb_notes]

# S soprano (100), A alto (200), T tenor (300), B bass (400). 

n_tokens = len([0, 1, -1, 82]) + len(range(36,82))
batch_song = next(SongIterator(song_list=jsf['pitches'], batch_size=64, song_size=max_song_length, shuffle=True))

In [63]:
#[L, bs, Emb_f, Emb_notes]
#[bs, L, Emb_f, Emb_notes]]

# Test that Generator work as expected 
num_layers = 3
num_emb = 10 # to be adjusted  
Gen = Generator(num_layers, n_tokens, emb_f, num_heads, emb_notes, bs)
Dis = Discriminator(num_layers, n_tokens, emb_f, num_heads, emb_notes, bs, max_song_length+2)
noise_input = rand_song_generation(len_seq, bs, max_song_length)
a = Gen(noise_input.type(torch.int32))
print(a.shape)
b = Dis(a)
print(b.shape)

torch.Size([522, 4, 4])
1. shape = torch.Size([522, 4, 4])
2. shape = torch.Size([522, 4, 4, 40])
3. shape = torch.Size([522, 4, 4, 40])
4. shape = torch.Size([522, 4, 4, 40])
5. Shape = torch.Size([522, 4, 4, 40])
6. Shape = torch.Size([4, 83520])
torch.Size([4, 2])


In [70]:
torch.ones_like(b) * np.arange(1,-1,-1)

tensor([[1., 0.],
        [1., 0.],
        [1., 0.],
        [1., 0.]], dtype=torch.float64)