In [1]:
import torch
import numpy as np
from torch import nn
from torch.nn import functional as F

In [2]:
from torch.nn.utils.rnn import pad_sequence

USEABLE_KEYS = [i+":" for i in "BCDFGHIKLMmNOPQRrSsTUVWwXZ"]


def read_abc(path):
    keys = []
    notes = []
    with open(path) as rf:
        for line in rf:
            line = line.strip()
            if line.startswith("%"):
                continue

            if any([line.startswith(key) for key in USEABLE_KEYS]):
                keys.append(line)
            else:
                notes.append(line)

    keys = " ".join(keys)
    notes = "".join(notes).strip()
    notes = notes.replace(" ", "")

    if notes.endswith("|"):
        notes = notes[:-1]

    notes = notes.replace("[", " [")
    notes = notes.replace("]", "] ")
    notes = notes.replace("(", " (")
    notes = notes.replace(")", ") ")
    notes = notes.replace("|", " | ")
    notes = notes.strip()
    notes = " ".join(notes.split(" "))
    
    if not keys or not notes:
        return None, None

    return keys, notes

In [3]:
from pathlib import Path
from tqdm import tqdm
import youtokentome as yttm

train_dir = "./yandex-music-generation-contest/cleaned_data"

train_paths = list(Path(train_dir).glob("*.abc"))

In [4]:
BPE_MODEL_FILENAME = './models/abc_bpe.yttm'
TRAIN_TEXTS_FILENAME = './datasets/abc_bpe_train.txt'
# with open(TRAIN_TEXTS_FILENAME, "w") as f:
#     for file in tqdm(train_paths):
#         (keys, notes) = read_abc(file)
#         f.write(f"{notes}\n")

# yttm.BPE.train(data=TRAIN_TEXTS_FILENAME, vocab_size=500, model=BPE_MODEL_FILENAME)

In [5]:
tokenizer = yttm.BPE(BPE_MODEL_FILENAME)

In [6]:
train_data = []
test_data = []
for i, p in enumerate(tqdm(train_paths)):
    (keys, notes) = read_abc(p)
    if keys is None:
        continue

    keys_tokens = tokenizer.encode(keys)
    bars = notes.split(" | ")
    notes_tokens = [tokenizer.encode(i + " | ") for i in bars]

    ## To avoid OOM
    sequence_len = sum(len(i) for i in notes_tokens)
    if (16 > sequence_len > 256):
        print("Skip", p)
        continue
            
    if i % 5 == 0:
        test_data.append((keys_tokens, notes_tokens))
    else:
        train_data.append((keys_tokens, notes_tokens))

100%|████████████████████████████████████████████████████████████████████████| 511895/511895 [03:58<00:00, 2142.89it/s]


In [7]:
import random
from torch.utils.data import Dataset


class ABCDataset(Dataset):
    def __init__(self, data,
                 context_bars_num=8, 
                 target_bars_num=8,
                 bos_id=2,
                 eos_id=3,
                 pad_id=0,
                 is_test=False,
                 max_len = 2048):
        
        self.notes = []
        self.keys = []

        for (keys, notes) in data:
            if notes is None:
                continue

            self.keys.append(keys)
            self.notes.append(notes)
            
        self.context_bars_num = context_bars_num
        self.target_bars_num = target_bars_num
        self.bos_id = bos_id
        self.eos_id = eos_id
        self.max_len = max_len
        self.is_test = is_test
        
    def __len__(self):
        return len(self.keys)
    
    
    def __getitem__(self, idx):
        notes = self.notes[idx]
        keys = self.keys[idx]
        
        if not self.is_test:
            split_indx = 8

            # split notes to context (input for network) and target (that model must to generate)
            context_notes = notes[split_indx - self.context_bars_num : split_indx]
            target_notes = notes[split_indx: split_indx + self.target_bars_num]
        else:
            context_notes = notes
            target_notes = []

        sequence = []

        for bar in context_notes:
            sequence += bar

        for bar in target_notes:
            sequence += bar
        
        if len(sequence) < self.max_len:
            sequence += [0] * (self.max_len - len(sequence))
        context_tokens = sequence[:-1]
        target_tokens = sequence[1:]

        context_tokens = torch.tensor(context_tokens, dtype=torch.long)
        target_tokens = torch.tensor(target_tokens, dtype=torch.long)

        return {"features": context_tokens, "target": target_tokens}

In [8]:
train_dataset = ABCDataset(train_data)
test_dataset = ABCDataset(test_data)

In [9]:
train_data[0][1]

[[176, 225, 133, 65, 225, 133, 139, 90, 183, 65, 90, 183, 139, 48],
 [225, 133, 65, 225, 133, 51, 32, 78, 48],
 [176,
  225,
  133,
  65,
  225,
  133,
  139,
  118,
  70,
  66,
  58,
  140,
  90,
  56,
  95,
  136,
  77,
  80,
  291,
  101,
  183,
  67,
  182,
  94,
  46,
  48],
 [67,
  356,
  231,
  98,
  225,
  133,
  65,
  225,
  133,
  139,
  225,
  133,
  65,
  225,
  133,
  139,
  48],
 [176, 90, 263, 65, 90, 263, 139, 90, 263, 65, 90, 263, 139, 48],
 [176, 90, 119, 65, 90, 119, 139, 90, 263, 65, 90, 263, 139, 48],
 [176,
  90,
  66,
  119,
  4,
  66,
  90,
  66,
  119,
  484,
  52,
  90,
  66,
  119,
  4,
  66,
  90,
  66,
  119,
  362,
  17,
  8,
  48],
 [150,
  23,
  153,
  334,
  47,
  61,
  90,
  71,
  23,
  153,
  334,
  73,
  90,
  119,
  139,
  90,
  119,
  65,
  90,
  119,
  139,
  48],
 [176, 225, 133, 65, 225, 133, 139, 90, 133, 65, 90, 133, 139, 48],
 [176, 225, 133, 65, 225, 133, 139, 90, 183, 65, 90, 183, 139, 48],
 [176, 90, 183, 65, 90, 183, 139, 225, 133, 65, 22

In [10]:
train_dataset[0]['target']

tensor([225, 133,  65,  ...,   0,   0,   0])

In [11]:
def make_target_dependency_mask(length):
    full_mask = torch.ones(length, length)
    ignore_mask = torch.tril(full_mask) < 1
    full_mask.masked_fill_(ignore_mask, float('-inf'))
    full_mask.masked_fill_(~ignore_mask, 0)
    return full_mask

def make_positional_encoding(max_length, embedding_size):
    time = np.pi * torch.arange(0, max_length).float()
    freq_dividers = torch.arange(1, embedding_size // 2 + 1).float()
    inputs = time[:, None] / freq_dividers[None, :]
    
    result = torch.zeros(max_length, embedding_size)
    result[:, 0::2] = torch.sin(inputs)
    result[:, 1::2] = torch.cos(inputs)
    return result

In [12]:
make_target_dependency_mask(10)

tensor([[0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
        [0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf],
        [0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf],
        [0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf],
        [0., 0., 0., 0., 0., -inf, -inf, -inf, -inf, -inf],
        [0., 0., 0., 0., 0., 0., -inf, -inf, -inf, -inf],
        [0., 0., 0., 0., 0., 0., 0., -inf, -inf, -inf],
        [0., 0., 0., 0., 0., 0., 0., 0., -inf, -inf],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., -inf],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [13]:
def v_multihead_attention(queries, keys, values, padding_mask, dependency_mask,
                           is_training=True,
                           p_dropout=0):
    """
    queries - BatchSize x HeadN x MaxLen x EmbeddingSize_Q
    keys - BatchSize x HeadN x MaxLen x EmbeddingSize_K
    values - BatchSize x HeadN x MaxLen x EmbeddingSize_V
    
    padding_mask - BatchSize x MaxLen
    dependency_mask - MaxLen x MaxLen
    
    is_training - bool
    weights_dropout - float
    
    result:
        BatchSize x HeadN x MaxLen x EmbeddingSize_V
    """

    # BatchSize x HeadN x MaxLen x MaxLen
    relevances = torch.einsum('bhie,bhje->bhij', (queries, keys))
    
    # замаскировать элементы, выходящие за длины последовательностей ключей
    padding_mask_expanded = padding_mask[:, None, :, None].expand_as(relevances)
    relevances.masked_fill_(padding_mask_expanded, float('-inf'))
    
    # замаскировать пары <выходная позиция, входная позиция>
    relevances = relevances + dependency_mask[None, None, :, :].expand_as(relevances)
    
    relevances = relevances * (1/(np.sqrt(keys.shape[3])))
    normed_rels = F.softmax(relevances, dim=3)    
    normed_rels = F.dropout(normed_rels, p_dropout, is_training)
    
    result = torch.einsum('bhti,bhie->bhte', (normed_rels, values))
    
    return result, normed_rels

In [14]:
class v_MultiheadSelfAttention(nn.Module):
    def __init__(self, embeddingSize, n_heads, dropout=0):
        super().__init__()
        assert embeddingSize % n_heads == 0, 'Размерность модели должна делиться нацело на количество голов'
        self.n_heads = n_heads

        self.queries_proj = nn.Linear(embeddingSize, embeddingSize)
        self.keys_proj = nn.Linear(embeddingSize, embeddingSize)
        self.values_proj = nn.Linear(embeddingSize, embeddingSize)
        
        self.dropout = dropout

        self.last_attention_map = None
    
    def forward(self, sequence, padding_mask, dependency_mask):
        """
        sequence - BatchSize x Len x EmbeddingSize
        padding_mask - BatchSize x Len
        dependency_mask - Len x Len
        
        result - BatchSize x Len x EmbeddingSize
        
        """
        batch_size, max_len, embeddingSize = sequence.shape
        
        queries_flat = self.queries_proj(sequence)  # BatchSize x Len x EmbeddingSize
        queries = queries_flat.view(batch_size, self.n_heads, max_len, -1)
        
        keys_flat = self.keys_proj(sequence)  # BatchSize x Len x ModelSize
        keys = keys_flat.view(batch_size, self.n_heads, max_len, -1)
        
        values_flat = self.values_proj(sequence)  # BatchSize x Len x ModelSize
        values = values_flat.view(batch_size, self.n_heads, max_len, -1)
        
        # BatchSize x Len x HeadsN x ValueSize
        result, att_map = v_multihead_attention(queries, keys, values, padding_mask, dependency_mask, self.training, self.dropout)
        result_flat = result.view(batch_size, max_len, embeddingSize)
        
        self.last_attention_map = att_map.detach()

        return result_flat

In [15]:
class v_TransformerEncoderLayer(nn.Module):
    def __init__(self, embeddingSize, n_heads, dim_feedforward, dropout):
        super().__init__()
        self.self_attention = v_MultiheadSelfAttention(embeddingSize,
                                                       n_heads,
                                                       dropout=dropout)
        self.first_dropout = nn.Dropout(dropout)
        self.first_norm = nn.LayerNorm(embeddingSize)
        
        self.feedforward = nn.Sequential(
            nn.Linear(embeddingSize, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, embeddingSize),
            nn.Dropout(dropout)
        )
        self.second_norm = nn.LayerNorm(embeddingSize)
    
    def forward(self, sequence, padding_mask, dependency_mask):
        att_features = self.self_attention(sequence, padding_mask, dependency_mask)

        sequence = sequence + self.first_dropout(att_features)
        sequence = self.first_norm(sequence)
        
        sequence = sequence + self.feedforward(sequence)
        sequence = self.second_norm(sequence)
        return sequence

In [16]:
class v_TransformerEncoder(nn.Module):
    def __init__(self, n_layers, **kwargs):
        super().__init__()
        self.layers = nn.ModuleList([
            v_TransformerEncoderLayer(**kwargs)
            for _ in range(n_layers)
        ])
        self.initialize_weights()

    def forward(self, sequence, padding_mask, dependency_mask):
        for layer in self.layers:
            sequence = layer(sequence, padding_mask, dependency_mask)
        return sequence

    def initialize_weights(self):
        for param in self.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

In [17]:
class musicModel(nn.Module):
    def __init__(self, vocab_size, embedding_size, backbone, emb_dropout=0.0):
        super().__init__()
        self.embedding_size = embedding_size
        self.embeddings = nn.Embedding(vocab_size, embedding_size, padding_idx=0)
        self.emb_dropout = nn.Dropout(emb_dropout)
        self.backbone = backbone
        self.out = nn.Linear(embedding_size, vocab_size)
    
    def forward(self, seed_token_ids):
        """
            seed_token_ids - BatchSize x MaxInLen
        """
        batch_size, max_in_length = seed_token_ids.shape

        seed_padding_mask = seed_token_ids == 0
        dependency_mask = make_target_dependency_mask(max_in_length) \
            .to(seed_token_ids.device)
        
        seed_embs = self.embeddings(seed_token_ids)  # BatchSize x MaxInLen x EmbSize
        pos_codes = make_positional_encoding(max_in_length,
                                             self.embedding_size).unsqueeze(0).to(seed_embs.device)
        seed_embs = seed_embs + pos_codes
        seed_embs = self.emb_dropout(seed_embs)

        # BatchSize x TargetLen x EmbSize
        target_features = seed_embs
        target_features = self.backbone(seed_embs,
                                        dependency_mask=dependency_mask,
                                        padding_mask=seed_padding_mask)
        logits = self.out(target_features)  # BatchSize x TargetLen x VocabSize
        return logits

In [18]:
def lm_cross_entropy(pred, target):
    """
    pred - BatchSize x TargetLen x VocabSize
    target - BatchSize x TargetLen
    """
    pred_flat = pred.view(-1, pred.shape[-1])  # BatchSize*TargetLen x VocabSize
    target_flat = target.view(-1)  # BatchSize*TargetLen
    return F.cross_entropy(pred_flat, target_flat, ignore_index=0)


In [19]:
my_transf_model = ABCModel(tokenizer.vocab_size(),
                                256,
                                v_TransformerEncoder(
                                    n_layers=3,
                                    embeddingSize=256,
                                    n_heads=16,
                                    dim_feedforward=512,
                                    dropout=0.1),
                                emb_dropout=0.1)

def get_params_number(model):
    return sum(t.numel() for t in model.parameters())

print('Количество параметров', get_params_number(my_transf_model))

Количество параметров 1640436


In [20]:
from torch.utils.data import DataLoader

device = torch.device('cuda')
model = my_transf_model.to(device)

criterion = lm_cross_entropy

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=0)

lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,patience=20,factor=0.5,verbose=True)

batch_size = 1
train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
val_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [21]:
import copy
import datetime

best_val_loss = float('inf')
best_epoch_i = 0
best_model = copy.deepcopy(model)

In [None]:
epoch_n = 30

for epoch_i in range(epoch_n):
    epoch_start = datetime.datetime.now()
    print('Эпоха {}'.format(epoch_i))

    model.train()
    mean_train_loss = 0
    train_batches_n = 0
    for batch_i, elem in enumerate(train_dataloader):
        
        batch_x = elem['features'].to(device)
        batch_y = elem['target'].to(device)

        pred = model(batch_x)
        loss = criterion(pred, batch_y)

        model.zero_grad ()
        loss.backward()

        optimizer.step()

        mean_train_loss += float(loss)
        train_batches_n += 1

    mean_train_loss /= train_batches_n
    lr_scheduler.step(mean_train_loss)
    
    print('Эпоха: {} итераций, {:0.2f} сек'.format(train_batches_n,
                                                   (datetime.datetime.now() - epoch_start).total_seconds()))
    print('Среднее значение функции потерь на обучении', mean_train_loss)

Эпоха 0


In [None]:
model.eval()
mean_val_loss = 0
val_batches_n = 0

with torch.no_grad():
    for batch_i, elem in enumerate(val_dataloader):
        batch_x = elem['features'].to(device)
        batch_y = elem['target'].to(device)

        pred = model(batch_x)
        loss = criterion(pred, batch_y)

        mean_val_loss += float(loss)
        val_batches_n += 1

mean_val_loss /= val_batches_n
print('Среднее значение функции потерь на валидации', mean_val_loss)

In [None]:
!nvidia-smi

In [None]:
(keys, notes) = read_abc(r".\yandex-music-generation-contest\testset\abc\1.abc")
print(notes)
bars = notes.split(" | ")
print(bars)
notes_tokens = [tokenizer.encode(i + " | ") for i in bars]
print(notes_tokens)

In [None]:
(keys, notes) = read_abc(r".\yandex-music-generation-contest\testset\abc\1.abc")

t = tokenizer.encode(notes)
t = torch.tensor(t).unsqueeze(0).to(device)
print(t)
g = []
for i in range(100):
    output = model(t)[0,-1].argmax()
    t = torch.cat([t, output.unsqueeze(0).unsqueeze(0)], dim=1)
    g += [int(t[-1][-1])]
    
print(t)

In [None]:
tokenizer.decode(list(t))

In [None]:

notes_tokens = [tokenizer.encode(i + " | ") for i in bars]

In [None]:
class v_MusicCritic(nn.Module):
    def __init__(self, vocab_size, embedding_size, num_latents, latent_dim, backbone, emb_dropout=0.0):
        super().__init__()
        self.embedding_size = embedding_size
        self.embeddings = nn.Embedding(vocab_size, embedding_size, padding_idx=0)
        self.emb_dropout = nn.Dropout(emb_dropout)
        self.backbone = backbone
        self.latents = nn.Parameter(torch.randn(num_latents, latent_dim))
        self.out = nn.Linear(embedding_size + 2, 1)
    
    def forward(self, offsets, note_ids, durations):
        """
            note_ids - BatchSize x ChunkSize
        """
        
        note_embs = self.embeddings(note_ids)  # BatchSize x ChunkSize x EmbSize
        offsets = torch.Tensor(offsets)
        durations = torch.Tensor(durations)
        durations = durations.unsqueeze(1)
        durations = durations.unsqueeze(0)
        offset = offset.unsqueeze(1)
        offset = offset.unsqueeze(0)
        embs = torch.cat([note_embs, durations], 2)
        embs = torch.cat([note_embs, offset], 2)
        
        #note_embs = self.emb_dropout(note_embs)

        # BatchSize x TargetLen x EmbSize
        target_features = embs
        target_features = self.backbone(embs)
        logits = self.out(target_features)  # BatchSize x TargetLen x 1
        return logits

In [None]:
import torch

embs = torch.Tensor([ [ [1,2,3], [4,5,6] ] ])
offset = torch.Tensor([0, 0])
offset = offset.unsqueeze(1)
offset = offset.unsqueeze(0)

torch.cat([embs, offset], 2)