# create dataset

In [1]:
# ------------------------------------------------------------------------------------------------
# improt library ที่ใช้ทั้หมด
# ------------------------------------------------------------------------------------------------

import os
import ast
import torch
from torch.utils.data import Dataset
import pandas as pd
import torchaudio
import librosa
import numpy as np
import math
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# ------------------------------------------------------------------------------------------------

In [2]:
print(f"pandas version: {pd.__version__}")
print(f"Pytorch version: {torch.__version__}")
print(f"Is CUDA available? : {torch.cuda.is_available()}")
print(f"Number of CUDA devices: {torch.cuda.device_count()}")

pandas version: 2.2.2
Pytorch version: 2.3.1+cu121
Is CUDA available? : True
Number of CUDA devices: 1


In [3]:
exp = [2, 2, 1]  # เปลี่ยนไปตาม experiment

path = f"../data/exp{exp[0]}-{exp[1]}/{exp[0]}-{exp[1]}-{exp[2]}"
csv_train = f"../data/exp{exp[0]}-{exp[1]}/{exp[0]}-{exp[1]}-{exp[2]}/file_names_MT_exp{exp[0]}-{exp[1]}-{exp[2]}_train.csv"
csv_valid = f"../data/exp{exp[0]}-{exp[1]}/{exp[0]}-{exp[1]}-{exp[2]}/file_names_MT_exp{exp[0]}-{exp[1]}-{exp[2]}_validate.csv"
csv_test = f"../data/exp{exp[0]}-{exp[1]}/{exp[0]}-{exp[1]}-{exp[2]}/file_names_MT_exp{exp[0]}-{exp[1]}-{exp[2]}_test.csv"

In [4]:
RANGE_NOTE_ON = 128
RANGE_NOTE_OFF = 128
RANGE_VEL = 32
RANGE_TIME_SHIFT = 100

note_on_token = [f'<Event type: note_on, value:{j}>' for j in range(0, RANGE_NOTE_ON)]
note_off_token = [f'<Event type: note_off, value:{j}>' for j in range(0, RANGE_NOTE_OFF)]
time_token = [f'<Event type: time_shift, value: {i}>' for i in range(RANGE_TIME_SHIFT)]
velocity = [f'<Event type: velocity, value: {i}>' for i in range(RANGE_VEL)]
all_note = note_on_token + note_off_token + time_token + velocity+ ["<SOS>", "<EOS>", "<PAD>"]
n_note = len(all_note)
n_note

391

In [5]:
# ------------------------------------------------------------------------------------------------
# สร้าง class เพื่อเตรียม Dataset
# ------------------------------------------------------------------------------------------------


class NoteDataset(Dataset):

    # กำหนด init ของคลาส ระบุคุณสมบัติเบื้องต้น ------------------------------------------
    def __init__(
        self,
        annotations_file,  # csv รวมชื่อไฟล์เสียง
        audio_dir,  # โฟล์เดอร์ของไฟล์
        transformation,  # รูปแบบการแปลงเป็นเมลสเปคโตรแกรม
        target_sample_rate,  # sample rate ปกติ
        time_length,  # ความยาวของเวลาที่จะตัด
        all_note,  # ลิสท์ของ target ทั้งหมดที่เป็นไปได้
        max_length,  # ความยาวของไฟล์ที่ยาวที่สุด
    ):
        # กำหนดค่าให้ตัวแปรในคลาสด้วยตัวแปรที่รับเข้ามา
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.transformation = transformation
        self.target_sample_rate = target_sample_rate
        self.time_length = time_length  # ตอนนี้ตั้งเป็น 100

        self.all_note = all_note
        self.max_length = max_length

    # -----------------------------------------------------------------------------

    # เป็น method ที่ถ้าเราใช้ len() จะทำให้ได้จำนวนของข้อมูลที่เรามีอยู่ โดยดูจากจำนวนข้อมูลในไฟล์ csv
    def __len__(self):
        return len(self.annotations)

    # -----------------------------------------------------------------------------

    # เป็น method ที่กำหนดการเข้าถึงข้อมูลคลาสนี้ ว่าจะเข้าถึงยังไง ทำอะไรกับข้อมูลที่จะได้ไปบ้าง -----
    def __getitem__(self, index):
        audio_sample_path = self._get_audio_sample_path(
            index
        )  # สร้าง path สำหรับดึงไฟล์ wav แต่ละไฟล์
        label = self._get_audio_sample_label(index)
        label_tensor = self._targetTensor(label)  # คือ label ที่ใส่ eos
        input_tensor = self._inputTensor(label)  # คือ label ที่ใส่ sos
        signal, sr = torchaudio.load(audio_sample_path)

        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(
            signal
        )  # signal -> (num_channels, samples) -> (2, 16000) -> (1, 16000)
        melsignal = self.transformation(signal)  # ไฟล์โน๊ต 3 ตัว ขนาด [1, 1024, 188]
        # print(melsignal.shape)
        # เปลี่ยนจากที่มีหลาย batch ในแกนแรกให้เป็น 1 batch และแปลงขนาดของ Mel-spectrogram จาก (จำนวน batch, n_mel, เวลา) เป็น (1, เวลา, n_mel)
        # signal_tensor = melsignal.reshape(1, melsignal.shape[2], melsignal.shape[1])
        sequences = self.split_melspectrogram(melsignal)
        # print(sequences.shape)
        signal_tensor = torch.stack(
            sequences
        )  # เอาลิสต์ของ tensor ที่ทำการแบ่งก่อนแล้ว ให้มาเป็น tensor ก้อนใหญ่ ๆ โดยจะมีมิติเพิ่มเข้ามา 1 มิติ (ตอนนี้ข้อมูลมี 4 มิติแล้ว)
        # print(signal_tensor.shape)
        signal_tensor = torch.flatten(
            signal_tensor, start_dim=1, end_dim=-1
        )  # ทำให้อยู่ในรูปแบบของเวกเตอร์ 1 มิติ
        return signal_tensor, input_tensor, label_tensor, label

    # -----------------------------------------------------------------------------

    # เป็น method ที่เอาไว้รวม folder กับ path ที่ได้จาก csv ให้เป็น file path --------------
    def _get_audio_sample_path(self, index):
        path = os.path.join(
            self.audio_dir, self.annotations.iloc[index, 0]  # ใช้ os จะรวมแบบมี / ให้
        )  # เอาชื่อโฟล์เดอร์ในตัวแปร audio_dir มารวมกับ ชื่อไฟล์ที่ได้จากคอลัม 0 ในไฟล์ csv
        return path

    # -----------------------------------------------------------------------------

    # เป็น method ที่เอาไว้เอาค่า label ของแต่ละไฟล์เสียงมา --------------------------------
    def _get_audio_sample_label(self, index):
        label = self.annotations.iloc[index, 1]  # เอาลาเบลจากไฟล์ csv คอลัม 1 มา
        label = ast.literal_eval(
            label
        )  # แยก str ให้ออกมาเป็น list ของตัวเลข : '[1, 2]' -> [1, 2]
        # label = [all_note.index(lab) for lab in label]
        return label

    # -----------------------------------------------------------------------------

    # เป็น method ที่เอาไว้เปลี่ยน label ที่เป็น list ให้เป็น tensor และเพิ่ม eos เข้าไปด้วย ------
    def _targetTensor(self, label):
        pitch_indexes = [
            int(pitch) for pitch in label
        ]  # วนให้แน่ใจว่า label เราเป็นลิสต์ตัวเลขแน่ ๆ
        pitch_indexes.insert(0, int(self.all_note.index("<SOS>")))
        pitch_indexes.append(int(self.all_note.index("<EOS>")))  # เพิ่ม EOS เข้าไปท้ายลิสต์
        return torch.LongTensor(pitch_indexes)  # เปลี่ยน list ให้เป็น tensor

    # -----------------------------------------------------------------------------

    # เป็น method ที่สร้าง tensor ที่มี [0, 0, ..., 0, 1 , 0, ..., 0] ตามจำนวนของโน๊ต -----
    def _inputTensor(self, label):
        labelwithsos = [all_note.index("<SOS>")]  # เอา sos ใส่ไว้หน้าสุดของ list
        for note in label:  # วนโน๊ตตาม label
            labelwithsos.append(note)  # เพิ่มโน๊ตลงลิสต์ที่มี sos อยู่
        labelwithsos.append(all_note.index("<EOS>"))
        # สร้าง tensor ขนาด จำนวน label แถว 1 หลัก และหลักนั้นเป็น [0 , ..., 0] ที่ยาวเท่าจำนวน class ที่เป็นไปได้
        tensor = torch.zeros(len(labelwithsos), 1, n_note)
        for li in range(len(labelwithsos)):  # วนตามจำนวนของ label
            note = labelwithsos[li]  # เอาเลข class ของ label มาใส่ไว้ในตัวแปร note
            tensor[li][0][
                note
            ] = 1  # เปลี่ยน [0 , ..., 0] แต่ละอัน ให้เป็น 1 ในตำแหน่งที่เป็น class นั้น ๆ
        return tensor

    # -----------------------------------------------------------------------------

    # เป็น method ที่เอาไว้ทำให้สัญญาณที่เอาเข้ามาถูกแบ่งด้วย sample rate ที่เท่ากัน -------------
    def _resample_if_necessary(self, signal, sr):
        if (
            sr != self.target_sample_rate
        ):  # ถ้า sample rate ที่ได้จากการโหลดเสียงและแปลงมาไม่เท่ากับ sample rate ที่ตั้งไว้
            resampler = torchaudio.transforms.Resample(
                sr, self.target_sample_rate
            )  # ให้แปลงใหม่ด้วยค่า sample rate ที่ตั้งไว้
            signal = resampler(signal)
        return signal

    # -----------------------------------------------------------------------------

    # เป็น method ที่เอาไว้ทำให้ Channel ของสัญญาณเสียง จาก 2 เป็น 1 channels ------------
    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

    # -----------------------------------------------------------------------------

    # ไม่ได้ใช้ _cut_if_necessary แล้ว
    # ไม่ได้ใช้ _right_pad_if_necessary แล้ว
    # ไม่ได้ทำ onset ที่หา Attack แล้ว

    # เป็น method ที่ใช้แบ่ง mel-spectrogram ของไฟล์ที่ยาวออกเป็นหลาย ๆ ภาพ ตาม time length ที่กำหนดไว้
    def split_melspectrogram(self, signal):
        # seq_len = math.ceil(signal.size(1) / self.time_length) # คำนวนว่าจะได้กี่ก้อน จริง ๆ ไม่จำเป็น
        total_size = signal.size(
            2
        )  # เอาความยาวเวลาทั้งหมดของไฟล์นั้น ๆ มา อยู่ตน. 1 เพราะรูปร่างคือ (1, เวลา, n_mel)
        split_size = self.time_length  # เอาความยาวเวลาที่ต้องการแบ่งมาเก็บในตัวแปร
        remainder = (
            total_size % split_size
        )  # คำนวนว่าจะเหลือเศษเวลาที่ไม่ครบตามที่กำหนดอยู่เท่าไหร่
        # print(total_size, seq_len, remainder)
        split_tensors = torch.split(
            signal, split_size, dim=2
        )  # แบ่ง mel-spectrogram ออกตามเวลาที่กำหนด

        # ถ้าเหลือเศษ จะต้อง padding ก้อนเศษให้มีขนาดเท่า ๆ กับก้อนอื่น ๆ
        if remainder > 0:  # เช็คว่าเหลือเศษไหม
            last_tensor = split_tensors[-1]  # ถ้าเหลือก็เลือกเอาก้อนสุดท้ายที่เป็นก้อนเศษมา
            padding_size = split_size - last_tensor.size(
                2
            )  # คำนวนว่าต้อง padding อีกเท่าไหร่
            # กำหนดว่าจะทำ padding ที่มิติไหนบ้าง โดยไม่ทำกับมิติที่ 0 แต่ทำกับมิติที่ 3 ตามจำนวน padding size ที่คำนวนไว้
            last_tensor = torch.nn.functional.pad(last_tensor, (0 , padding_size))
            split_tensors = split_tensors[:-1] + (
                last_tensor,
            )  # เอาอันที่ padding มาแทนก้อนเศษอันสุดท้าย
        # print(split_tensors.shape)
        return split_tensors

    # -----------------------------------------------------------------------------


# ------------------------------------------------------------------------------------------------
# ทำให้ข้อมูลเป็น batch
# ------------------------------------------------------------------------------------------------

# def create_data_loader(data, batch_size):

#     signal_list, input_list, labeltensor_list, labelori_list = [], [], [], [] # สร้าง list เปล่าไว้
#     num_batch = math.ceil(len(data) / batch_size) # คำนวนหาจำนวน batch ที่ต้องมี
#     for i in range(len(data)): # วนเรียกเอาข้อมูลทีละชุด ๆ
#         signal, input_ten, label_ten, label_ori = data[i] # เอาข้อมูลทุกอันของ sample ที่ i ออกมา
#         signal_list.append(signal) # เพิ่มลง list
#         input_list.append(input_ten) # เพิ่มลง list
#         labeltensor_list.append(label_ten) # เพิ่มลง list
#         labelori_list.append(label_ori) # เพิ่มลง list

#     train_data_loader = [] # ลิสต์ของข้อมูลที่จะเอาไปเทรน
#     start = 0
#     for batch in range(1, num_batch+1): # วนตามจำนวน batch ที่คำนวนได้
#         # รวม tensor ใน list ตั้งแต่ตำแหน่งต้น batch ถึงท้าย batch ให้เป็น tensor ก้อนใหญ่ ๆ ก้อนเดียว และเติมค่าที่ตั้งไว้ในส่วนที่ไม่มีข้อมูลเพื่อให้ทุก batch มีค่าเท่ากัน
#         signal_batch = pad_sequence(signal_list[start:batch_size*batch], padding_value=-1,batch_first=True)
#         onehot_batch = pad_sequence(input_list[start:batch_size*batch], padding_value=0,batch_first=True)
#         labeltensor_batch = pad_sequence(labeltensor_list[start:batch_size*batch], padding_value=14,batch_first=True) #เปลี่ยน padding เป็น 14
#         each_batch = (signal_batch, onehot_batch, labeltensor_batch)
#         train_data_loader.append(each_batch) # เพิ่มก้อน batch เข้าไปในลิสท์ของข้อมูลที่จะเอาไปเทรน
#         start = batch_size*batch
#     return train_data_loader


def collate_fn(batch):
    signal_list, input_list, labeltensor_list, labelori_list = [], [], [], []

    # Extract data from each sample in the batch
    for signal, input_ten, label_ten, label_ori in batch:
        signal_list.append(signal)
        input_list.append(input_ten)
        labeltensor_list.append(label_ten)
        labelori_list.append(label_ori)

    # Pad the sequences in each list
    signal_batch = pad_sequence(
        signal_list, padding_value=-1, batch_first=False
    )  # เดิม batch_first=True
    onehot_batch = pad_sequence(
        input_list, padding_value=0, batch_first=False
    )  # เดิม batch_first=True
    labeltensor_batch = pad_sequence(
        labeltensor_list, padding_value=all_note.index("<PAD>"), batch_first=False
    )  # เดิม batch_first=True

    return signal_batch, onehot_batch, labeltensor_batch

In [6]:
# ------------------------------------------------------------------------------------------------
# ทดสอบดึงข้อมูล
# ------------------------------------------------------------------------------------------------

if __name__ == "__main__":  # เป็นการตั้งเื่อนไขว่า ถ้ารักไฟล์นี้เป็นไฟล์หลัก จะทำการรันโค้ดด้านล่างต่อไปนี้

    # กำหนดค่าตัวแปรต่าง ๆ
    ANNOTATIONS_FILE = csv_train
    AUDIO_DIR = path
    SAMPLE_RATE = 16000
    BATCH_SIZE = 64
    # NUM_SAMPLES = 22050
    time_length = 100
    max_length = 60
    batch_size = 10

    # กำหนดค่าในการแปลงข้อมูลเป็น mel-spectrogram
    mel_spectrogram = torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=1024,
        hop_length=512,
        n_mels=1024,  # 256 ผิด 2, 512 ผิด 1
    )

    # แปลงเป็น mel และทำให้เป็นข้อมูลก้อนนึงใหญ่ ๆ ที่ยังไม่แบ่ง batch
    notedata = NoteDataset(
        ANNOTATIONS_FILE,
        AUDIO_DIR,
        mel_spectrogram,
        SAMPLE_RATE,
        time_length,
        all_note,
        max_length,
    )

    print(f"There are {len(notedata)} samples in the dataset.")
    print("NoteDataset : ", notedata[0][0].shape)
    print("-----------------------------------------------")

    # แสดงขนาดของข้อมูลในแต่ละ sample
    for i in range(len(notedata)):
        print("-----------------------------------------------")
        signal, input_ten, label, _ = notedata[i]
        print("signal shape : ", signal.shape)
        print("input  : ", input_ten)
        print("label : ", label)
        print("-----------------------------------------------")
        break

    train_dataloader = DataLoader(
        notedata, batch_size=BATCH_SIZE, collate_fn=collate_fn
    )

    print(
        "Num of train_data_loader: ", len(train_dataloader), " batches"
    )  # แสดงจำนวน batch

    # แสดงขนาดของข้อมูลในแต่ละ batch
    for i, batch in enumerate(train_dataloader):
        signals, input_tensor, target_tensor = batch
        print("Batch :", i + 1)
        print("Signals Shape:", signals.shape)
        print("input_tensor (Onehots) Shape:", input_tensor.shape)
        print("Labels Shape:", target_tensor.shape)
        print("Labels:", target_tensor)
        print("-----------------------------------------------")
        if i == 3:
            break

There are 600 samples in the dataset.
NoteDataset :  torch.Size([3, 102400])
-----------------------------------------------
-----------------------------------------------
signal shape :  torch.Size([3, 102400])
input  :  tensor([[[0., 0., 0.,  ..., 1., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        ...,

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 1., 0.]]])
label :  tensor([388, 387,  53, 355, 181, 355, 387,  53, 355, 181, 355, 387,  49, 355,
        177, 355, 387,  56, 355, 184, 355, 387,  48, 355, 176, 355, 387,  52,
        355, 180, 355, 387,  48, 355, 176, 355, 387,  59, 355, 187, 389])
-----------------------------------------------
Num of train_data_loader:  10  batches




Batch : 1
Signals Shape: torch.Size([3, 64, 102400])
input_tensor (Onehots) Shape: torch.Size([51, 64, 1, 391])
Labels Shape: torch.Size([51, 64])
Labels: tensor([[388, 388, 388,  ..., 388, 388, 388],
        [387, 387, 387,  ..., 387, 387, 387],
        [ 53,  57,  56,  ...,  58,  53,  54],
        ...,
        [390, 390, 355,  ..., 390, 390, 390],
        [390, 390, 178,  ..., 390, 390, 390],
        [390, 390, 389,  ..., 390, 390, 390]])
-----------------------------------------------
Batch : 2
Signals Shape: torch.Size([3, 64, 102400])
input_tensor (Onehots) Shape: torch.Size([51, 64, 1, 391])
Labels Shape: torch.Size([51, 64])
Labels: tensor([[388, 388, 388,  ..., 388, 388, 388],
        [387, 387, 387,  ..., 387, 387, 387],
        [ 56,  59,  57,  ...,  51,  49,  58],
        ...,
        [390, 390, 355,  ..., 390, 390, 390],
        [390, 390, 179,  ..., 390, 390, 390],
        [390, 390, 389,  ..., 390, 390, 390]])
-----------------------------------------------
Batch : 3
Sign

# Network

In [7]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math

In [8]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PAD_IDX = all_note.index("<PAD>")


# helper Module that adds positional encoding to the token embedding to introduce a notion of word order.
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout: float, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(-torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer("pos_embedding", pos_embedding)

    def forward(self, token_embedding: Tensor):
        # print("token_embedding shape:", token_embedding.shape)
        # print("pos_embedding shape:", self.pos_embedding.shape)
        # print(f"Positional encoding input shape: {token_embedding.shape}")
        return self.dropout(
            token_embedding + self.pos_embedding[: token_embedding.size(0), :]
        )


# helper Module to convert tensor of input indices into corresponding tensor of token embeddings
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)


# Seq2Seq Network
class Seq2SeqTransformer(nn.Module):
    def __init__(
        self,
        num_encoder_layers: int,
        num_decoder_layers: int,
        emb_size: int,
        nhead: int,
        #  src_vocab_size: int,
        tgt_vocab_size: int,
        dim_feedforward: int = 512,
        dropout: float = 0.1,  # 0.1
    ):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = Transformer(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        # self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(
        self,
        src: Tensor,
        trg: Tensor,
        src_mask: Tensor,
        tgt_mask: Tensor,
        src_padding_mask: Tensor,
        tgt_padding_mask: Tensor,
        memory_key_padding_mask: Tensor,
    ):
        # print('signal tensor')
        # src_emb = self.positional_encoding(self.src_tok_emb(src))
        src_emb = self.positional_encoding(src)
        # print('target tensor')
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        # tgt_emb = self.positional_encoding(trg)
        outs = self.transformer(
            src_emb,
            tgt_emb,
            src_mask,
            tgt_mask,
            None,
            src_padding_mask,
            tgt_padding_mask,
            memory_key_padding_mask,
        )
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor, src_key_padding_mask):
        return self.transformer.encoder(
            self.positional_encoding(src),
            src_mask,
            src_key_padding_mask=src_key_padding_mask,
        )

    def decode(
        self,
        tgt: Tensor,
        memory: Tensor,
        tgt_mask: Tensor,
        tgt_key_padding_mask,
        memory_key_padding_mask,
    ):
        print(tgt.shape)
        print(self.positional_encoding(tgt).shape)
        print(memory.shape)
        print(tgt_mask.shape)
        return self.transformer.decoder(
            self.positional_encoding(tgt),
            memory,
            tgt_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )

In [9]:
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = (
        mask.float()
        .masked_fill(mask == 0, float("-inf"))
        .masked_fill(mask == 1, float(0.0))
    )
    return mask


def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)
    # print(src.shape, tgt.shape)
    src_padding_mask = (src[:, :, 0] == PAD_IDX).transpose(
        0, 1
    )  # เพราะคอนแรก src มีขนาด torch.Size([22, 128, 10240]) ต้องการแค่ [22, 128]
    tgt_padding_mask = (tgt == PAD_IDX).transpose(
        0, 1
    )  # tgt ใช้ขนาด torch.Size([6, 128]) ก็พอ
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask


#   src_mask: shape [22, 22] filled with False.
#   tgt_mask: shape [6, 6] with -inf above the diagonal to mask future tokens.
#   src_padding_mask: shape [128, 22] indicating padding positions in src.
#   tgt_padding_mask: shape [128, 6] indicating padding positions in tgt.

# train

initial ค่าต่าง ๆ

In [10]:
import pandas as pd
import numpy as np
import evaluate
import wandb

import torch
import torchaudio
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchmetrics.functional.classification import multiclass_accuracy
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction

# import numpy as np

# from NoteDataset import NoteDataset, n_note, all_note, create_data_loader
# from Network import EncoderRNN, DecoderRNN
# from early_stop import EarlyStopping

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
print(f"pandas version: {pd.__version__}")
print(f"Pytorch version: {torch.__version__}")
print(f"Is CUDA available? : {torch.cuda.is_available()}")
print(f"Number of CUDA devices: {torch.cuda.device_count()}")

pandas version: 2.2.2
Pytorch version: 2.3.1+cu121
Is CUDA available? : True
Number of CUDA devices: 1


In [12]:
torch.manual_seed(0)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(all_note)  # ตามจำนวนโน๊ตทั้งหมด
EMB_SIZE = 1024  # เดิม 512 มันควรเป็นขนาดหลัง flatten ไหมนะ ?
NHEAD = 4  # 8
FFN_HID_DIM = 512
BATCH_SIZE = 32  # ควรหารด้วย nhead ลงตัว
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

MODEL_SAVE_PATH = f"../model_ref_MT/model/transformer_exp{exp[0]}-{exp[1]}-{exp[2]}.pth"
ANNOTATIONS_FILE_train = csv_train
ANNOTATIONS_FILE_valid = csv_valid
ANNOTATIONS_FILE_test = csv_test
AUDIO_DIR = path
SAMPLE_RATE = 16000
NUM_SAMPLES = 22050
time_length = 1  # ลดจาก 100 เหลือ 10 ทำให้ต้องลด emb_size ด้วย
max_length = 60
# batch_size = 10
n_fft = 1024
hop_length = 512
n_mels = 1024
name_wandb = f"transformer exp{exp[0]}-{exp[1]}-{exp[2]} ref MusicTransformation"

transformer = Seq2SeqTransformer(
    NUM_ENCODER_LAYERS,
    NUM_DECODER_LAYERS,
    EMB_SIZE,
    NHEAD,
    # SRC_VOCAB_SIZE,
    TGT_VOCAB_SIZE,
    FFN_HID_DIM,
)

# instantiating our dataset object and create data loader
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=1024
)  # จะแก้พารามิเตอร์ n_fft=1024 -> 512, hop_length=512 -> 256


for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(
    transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9
)



โหลดข้อมูล และ padding

In [13]:
from torch.nn.utils.rnn import pad_sequence


# helper function to club together sequential operations
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input

    return func


# function to add BOS/EOS and create tensor for input sequence indices
# def tensor_transform(token_ids: List[int]):
#     return torch.cat((torch.tensor([BOS_IDX]),
#                       torch.tensor(token_ids),
#                       torch.tensor([EOS_IDX])))

# ``src`` and ``tgt`` language text transforms to convert raw strings into tensors indices
# text_transform = {}
# for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
#     text_transform[ln] = sequential_transforms(token_transform[ln], #Tokenization
#                                                vocab_transform[ln], #Numericalization
#                                                tensor_transform) # Add BOS/EOS and create tensor


# function to collate data samples into batch tensors
# def collate_fn(batch):
#     src_batch, tgt_batch = [], []
#     for src_sample, tgt_sample in batch:
#         src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
#         tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))

#     src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
#     tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
#     return src_batch, tgt_batch

In [14]:
# os.environ["WANDB_API_KEY"] = '12e003611d0725d54cbf144b4f1fdc949930bf5b'

# wandb.init(
#     # set the wandb project where this run will be logged
#     project="detect pitch",
#     name=name_wandb,
#     # track hyperparameters and run metadata
#     config={
#         "learning_rate": 0.0001,
#         "architecture": "transformer",
#         "dataset": "midi on",
#         "epochs": 50,
#         "sample_rate": 16000,
#         "n_fft": 1024,
#         "hop_length": 512,
#         "n_mels": 1024,
#         'nhead': NHEAD,
#         "time length": time_length,
#         "emb_size": EMB_SIZE,
#         "batch_size": BATCH_SIZE,
#     },
# )

In [15]:
# def train_epoch(model, optimizer):
#     model.train()
#     losses = 0
#     noteseq_train = NoteDataset(
#         ANNOTATIONS_FILE_train,
#         AUDIO_DIR,
#         mel_spectrogram,
#         SAMPLE_RATE,
#         time_length,
#         all_note,
#         max_length,
#     )
#     train_dataloader = DataLoader(
#         noteseq_train, batch_size=BATCH_SIZE, collate_fn=collate_fn
#     )

#     for src, _, tgt in train_dataloader:  # อันที่ 2 เป็น input tensor ขนาด [127, 6, 1, 15]
#         src = src.to(DEVICE)
#         tgt = tgt.to(DEVICE)
#         # print(src.shape)
#         # print("1 shape:", tgt.shape) # ขนาด torch.Size([6, 128])
#         # Prepare tgt_input (excluding the last token)
#         tgt_input = tgt[:-1, :]
#         # print("2 shape:", tgt_input.shape) # torch.Size([5, 128])
#         # print(src.size(1), tgt_input.size(1)) # 128 128
#         src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
#             src, tgt_input
#         )
#         # print("src_mask:", src_mask.shape) # torch.Size([220, 220])
#         # print("tgt_mask:", tgt_mask.shape) # torch.Size([5, 5])
#         # print("src_padding_mask:", src_padding_mask.shape) # torch.Size([128, 220])
#         # print("tgt_padding_mask:", tgt_padding_mask.shape) # torch.Size([128, 5])

#         logits = model(
#             src,
#             tgt_input,
#             src_mask,
#             tgt_mask,
#             src_padding_mask,
#             tgt_padding_mask,
#             src_padding_mask,
#         )

#         optimizer.zero_grad()
#         # Prepare tgt_out (excluding the first token)
#         tgt_out = tgt[1:, :]  # เดิม tgt[1:, :]
#         # print("tgt_input:", tgt_input.shape)
#         # print("tgt_out:", tgt_out.shape)
#         loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
#         loss.backward()

#         optimizer.step()
#         losses += loss.item()

#     return losses / len(list(train_dataloader))


# def evaluate(model):
#     model.eval()
#     losses = 0

#     noteseq_valid = NoteDataset(
#         ANNOTATIONS_FILE_valid,
#         AUDIO_DIR,
#         mel_spectrogram,
#         SAMPLE_RATE,
#         time_length,
#         all_note,
#         max_length,
#     )

#     val_dataloader = DataLoader(
#         noteseq_valid, batch_size=BATCH_SIZE, collate_fn=collate_fn
#     )

#     for src, _, tgt in val_dataloader:
#         src = src.to(DEVICE)
#         tgt = tgt.to(DEVICE)
#         tgt_input = tgt[:-1, :]
#         src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
#             src, tgt_input
#         )
#         logits = model(
#             src,
#             tgt_input,
#             src_mask,
#             tgt_mask,
#             src_padding_mask,
#             tgt_padding_mask,
#             src_padding_mask,
#         )
#         tgt_out = tgt[1:, :]
#         # print(logits)
#         loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
#         # logits.reshape(-1, logits.shape[-1]) คือ flatten logits ได้เป็น [768, 15] โดย 768 มาจาก 6*128 (sequence length * batch size)
#         losses += loss.item()

#     return losses / len(list(val_dataloader))

In [16]:
# from timeit import default_timer as timer

# NUM_EPOCHS = 50

# for epoch in range(1, NUM_EPOCHS + 1):
#     start_time = timer()
#     train_loss = train_epoch(transformer, optimizer)
#     # end_time = timer()
#     val_loss = evaluate(transformer)
#     end_time = timer()
#     print(
#         (
#             f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "
#             f"Epoch time = {(end_time - start_time):.3f}s"
#         )
#     )
#     # torch.save(transformer.state_dict(), MODEL_SAVE_PATH)
#     # log metrics to wandb
#     wandb.log(
#         {
#             "train loss": train_loss,
#             "valid loss": val_loss,
#             # "exact score": results_eval["exact"],
#             # "BLEU score": results_eval["bleu"],
#             # "avg gen len": results["gen_len"],
#         }
#     )
# torch.save(transformer.state_dict(), MODEL_SAVE_PATH)
# wandb.finish()

# evaluate

In [17]:
import torch
import torchaudio
import torch.nn as nn
from torch.utils.data import DataLoader
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction

import numpy as np
import evaluate

# from notedataset import NoteDataset, n_note, all_note, create_data_loader
# from network import EncoderRNN, DecoderRNN

In [18]:
# SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(all_note)  # ตามจำนวนโน๊ตทั้งหมด
EMB_SIZE = 1024  # เดิม 512 มันควรเป็นขนาดหลัง flatten ไหมนะ ?
# NHEAD = 1  # 8
FFN_HID_DIM = 512
# BATCH_SIZE = 1
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

MODEL_SAVE_PATH = f"../model_ref_MT/model/transformer_exp{exp[0]}-{exp[1]}-{exp[2]}.pth"
ANNOTATIONS_FILE_train = csv_train
ANNOTATIONS_FILE_valid = csv_valid
ANNOTATIONS_FILE_test = csv_test
AUDIO_DIR = path
SAMPLE_RATE = 16000
NUM_SAMPLES = 22050
time_length = 1  # ลดจาก 100 เหลือ 10 ทำให้ต้องลด emb_size ด้วย
max_length = 60
# batch_size = 10
n_fft = 1024
hop_length = 512
n_mels = 1024

transformer_eval = Seq2SeqTransformer(
    NUM_ENCODER_LAYERS,
    NUM_DECODER_LAYERS,
    EMB_SIZE,
    NHEAD,
    # SRC_VOCAB_SIZE,
    TGT_VOCAB_SIZE,
    FFN_HID_DIM,
)



In [19]:
def get_exact_score(preds, labels):
    # l = [label.strip() for label in labels]
    exact = evaluate.load("exact_match")
    candidates = [",".join(map(str, pred)) for pred in preds]
    references = [",".join(map(str, label)) for label in labels]
    r = exact.compute(predictions=candidates, references=references, ignore_case=True)
    correct_samples = sum(1 for pred, ref in zip(candidates, references) if pred.lower() == ref.lower())
    print(correct_samples)
    return r["exact_match"]


def get_each_sample_score(preds, labels):
    # l = [label.strip() for label in labels]
    exact = evaluate.load("exact_match")
    # candidates = [','.join(map(str, pred)) for pred in preds]
    # references = [ ','.join(map(str, label)) for label in labels]
    r = exact.compute(predictions=preds, references=labels, ignore_case=True)
    return r["exact_match"]

def cut_to_eos(lst):
    if '<EOS>' in lst:
        index = lst.index('<EOS>') + 1
        return lst[:index]
    return lst

def cut_eos(lst):
    if '<EOS>' in lst:
        index = lst.index('<EOS>')
        return lst[:index]
    return lst

def remove_entries(lst):
    remove_time = [item for item in lst if not item.startswith('<Event type: time_shift,')]
    remove_velo = [item for item in remove_time if not item.startswith('<Event type: velocity,')]
    only_note_on = [item for item in remove_velo if not item.startswith('<Event type: note_off,')]
    return only_note_on

In [20]:
# transformer_eval.load_state_dict(torch.load(MODEL_SAVE_PATH))
# transformer_eval.to(DEVICE)  # Move the model to the appropriate device if necessary

# transformer_eval.eval()
# # src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)

# mel_spectrogram = torchaudio.transforms.MelSpectrogram(
#     sample_rate=SAMPLE_RATE,
#     n_fft=1024,
#     hop_length=512,
#     n_mels=1024
# )

# noteseq_test = NoteDataset(ANNOTATIONS_FILE_test,
#                         AUDIO_DIR,
#                         mel_spectrogram,
#                         SAMPLE_RATE,
#                         time_length,
#                         all_note,
#                         max_length
#                         )
# test_dataloader = DataLoader(noteseq_test, batch_size=128, collate_fn=collate_fn)

# preds_all = []
# labels_all = []
# sample_score_all = 0
# for src, _, tgt in test_dataloader:
#     src = src.to(DEVICE)
#     tgt = tgt.to(DEVICE)
#     tgt_input = tgt[:-1, :]
#     src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
#     logits = transformer_eval(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)
#     tgt_out = tgt[1:, :]
#     _, indices = torch.max(logits, dim=2)
#     # แปลง index เป็นคำ
#     words = [all_note[idx] for idx in indices.squeeze().tolist()]
#     target = [all_note[idx] for idx in tgt_out.squeeze().tolist()]
#     sample_score = get_each_sample_score(words, target)
#     sample_score_all += sample_score
#     preds_all.extend(words)
#     labels_all.extend(target)
#     print(words)
#     print(target)
#     print(f"sample score : {sample_score}")
#     # print(" ".join([all_note[int(x)] for x in list(tgt_tokens.cpu().numpy())]).replace("sos", "").replace("eos", ""))
#     print('-'*30)

# print("-- test set --")
# print(f"avg sample score : {sample_score_all/len(test_dataloader)}")
# print(f"exact score: {get_exact_score(preds_all, labels_all)}")

In [21]:
print("cut time")
transformer_eval.load_state_dict(torch.load(MODEL_SAVE_PATH))
transformer_eval.to(DEVICE)
transformer_eval.eval()

mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=1024
)

noteseq_test = NoteDataset(
    ANNOTATIONS_FILE_test,
    AUDIO_DIR,
    mel_spectrogram,
    SAMPLE_RATE,
    time_length,
    all_note,
    max_length,
)
test_dataloader = DataLoader(noteseq_test, batch_size=BATCH_SIZE, collate_fn=collate_fn)

preds_all = []
labels_all = []
sample_score_all = 0

for src, _, tgt in test_dataloader:
    src = src.to(DEVICE)
    tgt = tgt.to(DEVICE)
    tgt_input = tgt[:-1, :]
    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
    logits = transformer_eval(
        src,
        tgt_input,
        src_mask,
        tgt_mask,
        src_padding_mask,
        tgt_padding_mask,
        src_padding_mask,
    )
    tgt_out = tgt[1:, :]
    _, indices = torch.max(logits, dim=2)

    for i in range(indices.size(1)):
        words = [all_note[idx] for idx in indices[:, i].tolist()]
        target = [all_note[idx] for idx in tgt_out[:, i].tolist()]
        # sample_score = get_each_sample_score(words,target)
        # sample_score_all += sample_score
        words = cut_to_eos(words)
        target = cut_to_eos(target)
        words = remove_entries(words)
        target = remove_entries(target)

        
        preds_all.append(words)
        labels_all.append(target)
        print(words)
        print(target)
        # print(f"sample score : {sample_score}")
        print("-" * 30)

print("-- test set --")
# print(f"avg sample score : {sample_score_all / len(preds_all)}")
print(f"exact score: {get_exact_score(preds_all, labels_all)}")

cut time




['<Event type: note_on, value:56>', '<Event type: note_on, value:55>', '<Event type: note_on, value:56>', '<Event type: note_on, value:59>', '<EOS>']
['<Event type: note_on, value:56>', '<Event type: note_on, value:55>', '<Event type: note_on, value:56>', '<Event type: note_on, value:59>', '<EOS>']
------------------------------
['<Event type: note_on, value:50>', '<Event type: note_on, value:56>', '<Event type: note_on, value:57>', '<EOS>']
['<Event type: note_on, value:50>', '<Event type: note_on, value:56>', '<Event type: note_on, value:57>', '<EOS>']
------------------------------
['<Event type: note_on, value:50>', '<Event type: note_on, value:51>', '<Event type: note_on, value:48>', '<EOS>']
['<Event type: note_on, value:50>', '<Event type: note_on, value:51>', '<Event type: note_on, value:48>', '<EOS>']
------------------------------
['<Event type: note_on, value:49>', '<Event type: note_on, value:54>', '<Event type: note_on, value:58>', '<Event type: note_on, value:55>', '<Even

In [22]:
print("cut time and eos")
transformer_eval.load_state_dict(torch.load(MODEL_SAVE_PATH))
transformer_eval.to(DEVICE)
transformer_eval.eval()

mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=1024
)

noteseq_test = NoteDataset(
    ANNOTATIONS_FILE_test,
    AUDIO_DIR,
    mel_spectrogram,
    SAMPLE_RATE,
    time_length,
    all_note,
    max_length,
)
test_dataloader = DataLoader(noteseq_test, batch_size=BATCH_SIZE, collate_fn=collate_fn)

preds_all = []
labels_all = []
sample_score_all = 0

for src, _, tgt in test_dataloader:
    src = src.to(DEVICE)
    tgt = tgt.to(DEVICE)
    tgt_input = tgt[:-1, :]
    src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
    logits = transformer_eval(
        src,
        tgt_input,
        src_mask,
        tgt_mask,
        src_padding_mask,
        tgt_padding_mask,
        src_padding_mask,
    )
    tgt_out = tgt[1:, :]
    _, indices = torch.max(logits, dim=2)

    for i in range(indices.size(1)):
        words = [all_note[idx] for idx in indices[:, i].tolist()]
        target = [all_note[idx] for idx in tgt_out[:, i].tolist()]
        # sample_score = get_each_sample_score(words,target)
        # sample_score_all += sample_score
        words = cut_eos(words)
        target = cut_eos(target)
        words = remove_entries(words)
        target = remove_entries(target)

        
        preds_all.append(words)
        labels_all.append(target)
        print(words)
        print(target)
        # print(f"sample score : {sample_score}")
        print("-" * 30)

print("-- test set --")
# print(f"avg sample score : {sample_score_all / len(preds_all)}")
print(f"exact score: {get_exact_score(preds_all, labels_all)}")

cut time and eos




['<Event type: note_on, value:56>', '<Event type: note_on, value:55>', '<Event type: note_on, value:56>', '<Event type: note_on, value:59>']
['<Event type: note_on, value:56>', '<Event type: note_on, value:55>', '<Event type: note_on, value:56>', '<Event type: note_on, value:59>']
------------------------------
['<Event type: note_on, value:50>', '<Event type: note_on, value:56>', '<Event type: note_on, value:57>']
['<Event type: note_on, value:50>', '<Event type: note_on, value:56>', '<Event type: note_on, value:57>']
------------------------------
['<Event type: note_on, value:50>', '<Event type: note_on, value:51>', '<Event type: note_on, value:48>']
['<Event type: note_on, value:50>', '<Event type: note_on, value:51>', '<Event type: note_on, value:48>']
------------------------------
['<Event type: note_on, value:49>', '<Event type: note_on, value:54>', '<Event type: note_on, value:58>', '<Event type: note_on, value:55>', '<Event type: note_on, value:58>', '<Event type: note_on, va

In [23]:
# print("original")
# transformer_eval.load_state_dict(torch.load(MODEL_SAVE_PATH))
# transformer_eval.to(DEVICE)
# transformer_eval.eval()

# mel_spectrogram = torchaudio.transforms.MelSpectrogram(
#     sample_rate=SAMPLE_RATE, n_fft=1024, hop_length=512, n_mels=1024
# )

# noteseq_test = NoteDataset(
#     ANNOTATIONS_FILE_test,
#     AUDIO_DIR,
#     mel_spectrogram,
#     SAMPLE_RATE,
#     time_length,
#     all_note,
#     max_length,
# )
# test_dataloader = DataLoader(noteseq_test, batch_size=BATCH_SIZE, collate_fn=collate_fn)

# preds_all = []
# labels_all = []
# sample_score_all = 0

# for src, _, tgt in test_dataloader:
#     src = src.to(DEVICE)
#     tgt = tgt.to(DEVICE)
#     tgt_input = tgt[:-1, :]
#     src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)
#     logits = transformer_eval(
#         src,
#         tgt_input,
#         src_mask,
#         tgt_mask,
#         src_padding_mask,
#         tgt_padding_mask,
#         src_padding_mask,
#     )
#     tgt_out = tgt[1:, :]
#     _, indices = torch.max(logits, dim=2)

#     for i in range(indices.size(1)):
#         words = [all_note[idx] for idx in indices[:, i].tolist()]
#         target = [all_note[idx] for idx in tgt_out[:, i].tolist()]
#         # sample_score = get_each_sample_score(words,target)
#         # sample_score_all += sample_score
#         words = cut_to_eos(words)
#         target = cut_to_eos(target)
        
#         preds_all.append(words)
#         labels_all.append(target)
#         print(words)
#         print(target)
#         # print(f"sample score : {sample_score}")
#         print("-" * 30)

# print("-- test set --")
# # print(f"avg sample score : {sample_score_all / len(preds_all)}")
# print(f"exact score: {get_exact_score(preds_all, labels_all)}")

# end

In [24]:
# function to generate output sequence using greedy algorithm
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(DEVICE)  # src.shape should be [src_seq_len, 1, 1024]
    src_mask = src_mask.to(
        DEVICE
    )  # src_mask.shape should be [src_seq_len, src_seq_len]
    src_padding_mask = (src[:, :, 0] == all_note.index("pad")).transpose(
        0, 1
    )  # src_padding_mask.shape should be [1, src_seq_len]
    print(f"src shape : {src.shape}")
    print(f"src_mask shape : {src_mask.shape}")
    print(f"src_padding_mask shape : {src_padding_mask.shape}")
    memory = model.encode(src, src_mask, src_padding_mask)
    ys = (
        torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    )  # ys.shape starts as [1, 1]
    # ys = torch.tensor([[all_note.index("sos")]], device=src.device)
    # print(ys)
    for i in range(max_len - 1):
        print(f"Step {i}:")
        print(f"ys shape: {ys.shape}")  # ys shape: torch.Size([1, 1])

        memory = memory.to(DEVICE)  # memory.shape should be [src_seq_len, 1, 1024]
        print(
            f"memory shape: {memory.shape}"
        )  # memory shape: torch.Size([188, 1, 1024])

        tgt_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(
            DEVICE
        )  # tgt_mask.shape will be [current_seq_len, current_seq_len]
        print(f"tgt_mask shape : {tgt_mask.shape}")

        tgt_padding_mask = (
            torch.zeros(ys.size(0), ys.size(0)).type(torch.bool).to(DEVICE)
        )  # tgt_padding_mask.shape will be [1, current_seq_len]
        print(f"tgt_padding_mask shape : {tgt_padding_mask.shape}")

        out = model.decode(ys, memory, tgt_mask, tgt_padding_mask, src_padding_mask)
        out = out.transpose(0, 1)  # out.shape should be [current_seq_len, 1, 1024]
        print(f"out shape: {out.shape}")  # out shape: torch.Size([1, 1, 1024])

        prob = model.generator(out[:, -1])  # prob.shape should be [1, vocab_size]
        print(f"prob shape: {prob.shape}")  # prob shape: torch.Size([1, 15])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        # Debugging prints
        print(f"next_word: {next_word}")

        # ys = torch.cat([ys, torch.tensor([[next_word]], device=src.device)], dim=0)
        ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        # print(f"ys shape: {ys.shape}") # ys shape: torch.Size([2, 1])
        if next_word == all_note.index("<EOS>"):
            break
    return ys


# actual function to translate input sentence into target language
def translate(model: torch.nn.Module):
    model.eval()
    # src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    noteseq_test = NoteDataset(
        ANNOTATIONS_FILE_test,
        AUDIO_DIR,
        mel_spectrogram,
        SAMPLE_RATE,
        time_length,
        all_note,
        max_length,
    )
    test_dataloader = DataLoader(noteseq_test, batch_size=1, collate_fn=collate_fn)

    for src, _, tgt in test_dataloader:
        src = src.to(DEVICE)
        num_tokens = src.shape[0]
        # print(num_tokens)
        # src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
        src_mask = generate_square_subsequent_mask(src.size(0)).to(DEVICE)
        tgt_tokens = greedy_decode(
            model, src, src_mask, max_len=10, start_symbol=all_note.index("sos")
        ).flatten()  # max_len=num_tokens + 5
        print(tgt)
        print(tgt_tokens.shape)
        # print(" ".join([all_note[int(x)] for x in list(tgt_tokens.cpu().numpy())]).replace("sos", "").replace("eos", ""))
        print("-" * 30)

    return None

In [25]:
# print(translate(transformer))