In [None]:
!pip install datasets transformers
!pip install miditok
!pip install symusic

In [1]:
from miditok import MMM, TokenizerConfig
from miditok.classes import Event, TokSequence
from miditok.pytorch_data import DatasetTok, DataCollator
from miditok.constants import MIDI_INSTRUMENTS, MMM_DENSITY_BINS_MAX, TIME_SIGNATURE
from miditok.midi_tokenizer import MIDITokenizer
from miditok.utils import compute_ticks_per_bar, compute_ticks_per_beat

import numpy as np
from symusic import Note, Score, Tempo, TimeSignature, Track

from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Glob results come back in a filesystem-dependent order. Sort them so the 30-file
# jazz subset, and the splits derived from both lists, are the same on every run.
jazz_midi_paths = sorted(Path("jazz-chunked").glob("**/*.mid"))[:30]
ym_midi_paths = sorted(Path("../ym-test/chunks").glob("**/*.mid"))

len(jazz_midi_paths), len(ym_midi_paths)

(30, 21)

In [3]:
# Split MIDI paths into train/valid sets

from random import shuffle

def split_midi_paths_train_valid(midi_paths, valid_ratio=0.1, seed=None):
    """
    Randomly split a collection of MIDI paths into training and validation lists.

    The input is copied before shuffling, so the caller's collection keeps its
    original order and re-running the split does not reshuffle an already
    shuffled list.

    :param midi_paths: items (typically file paths) to split.
    :param valid_ratio: fraction of the items assigned to the validation list; the
        validation size is ``round(len(midi_paths) * valid_ratio)``. (default: 0.1)
    :param seed: seed of a private random generator used for the shuffle, making the
        split reproducible. With ``None`` the module-level ``random`` state is used.
        (default: ``None``)
    :return: tuple ``(train_paths, valid_paths)``.
    """
    from random import Random  # function-scope import: only `shuffle` is imported by the cell

    shuffled_paths = list(midi_paths)
    if seed is None:
        shuffle(shuffled_paths)
    else:
        Random(seed).shuffle(shuffled_paths)

    num_files_valid = round(len(shuffled_paths) * valid_ratio)
    midi_paths_valid = shuffled_paths[:num_files_valid]
    midi_paths_train = shuffled_paths[num_files_valid:]
    return midi_paths_train, midi_paths_valid

# Split each genre's file list independently (default 10% validation).
(jazz_midi_paths_train, jazz_midi_paths_valid) = split_midi_paths_train_valid(
    jazz_midi_paths
)
(ym_midi_paths_train, ym_midi_paths_valid) = split_midi_paths_train_valid(
    ym_midi_paths
)

# Show the size of every resulting list.
tuple(
    len(paths)
    for paths in (
        jazz_midi_paths_train,
        jazz_midi_paths_valid,
        ym_midi_paths_train,
        ym_midi_paths_valid,
    )
)

(27, 3, 19, 2)

In [4]:
# Multitrack tokenizer configuration; see the MidiTok documentation for the other
# available parameters.
config = TokenizerConfig(
    use_programs=True,
    use_chords=True,
    use_pitch_intervals=True,
    num_velocities=16,
)

# Use the MMM tokenizer to turn MIDI files into token strings.
# TOKENIZER_NAME = MuMIDI # alternative: use the MuMIDI tokenizer
TEXTER_NAME = MMM
texter = TEXTER_NAME(config)

# Register one genre token per dataset in the MIDI tokenizer's vocabulary.
for genre_token in ("Genre_Jazz", "Genre_Ym"):
    texter.add_to_vocab(genre_token)

In [5]:
from transformers import AutoTokenizer

# Load the pretrained "gpt2" tokenizer. Later cells use it to encode the
# space-joined MIDI token strings produced by `texter`.
tokenizer = AutoTokenizer.from_pretrained("gpt2")

vocab.json: 100%|██████████| 1.04M/1.04M [00:00<00:00, 1.91MB/s]
merges.txt: 100%|██████████| 456k/456k [00:00<00:00, 956kB/s]
tokenizer.json: 100%|██████████| 1.36M/1.36M [00:00<00:00, 2.88MB/s]


In [8]:
tokenizer.encode("Hello, my dog is cute")

[15496, 11, 616, 3290, 318, 13779]

In [25]:
# Round-trip one training file: MIDI -> MidiTok token strings -> space-joined text
# -> GPT-2 ids -> decoded text.
sample_midi = Score(jazz_midi_paths_train[0])
sample_text = " ".join(texter(sample_midi).tokens)
t = tokenizer.encode(sample_text)
tokenizer.decode(t)

'Track_Start Program_-1 NoteDensity_4 Bar_Start PitchDrum_35 Velocity_79 Duration_0.2.8 PitchDrum_49 Velocity_87 Duration_0.2.8 PitchDrum_51 Velocity_95 Duration_0.2.8 TimeShift_0.6.8 PitchDrum_38 Velocity_87 Duration_0.2.8 TimeShift_0.2.8 PitchDrum_38 Velocity_79 Duration_0.2.8 PitchDrum_42 Velocity_103 Duration_0.2.8 PitchDrum_51 Velocity_119 Duration_0.2.8 TimeShift_0.6.8 PitchDrum_51 Velocity_63 Duration_0.2.8 TimeShift_0.2.8 PitchDrum_38 Velocity_79 Duration_0.2.8 PitchDrum_51 Velocity_103 Duration_0.2.8 TimeShift_0.6.8 PitchDrum_51 Velocity_87 Duration_0.2.8 TimeShift_0.2.8 PitchDrum_38 Velocity_79 Duration_0.4.8 PitchDrum_42 Velocity_103 Duration_0.2.8 PitchDrum_51 Velocity_111 Duration_0.2.8 TimeShift_0.6.8 PitchDrum_51 Velocity_55 Duration_0.2.8 Bar_End Bar_Start PitchDrum_35 Velocity_79 Duration_0.2.8 PitchDrum_38 Velocity_79 Duration_0.2.8 PitchDrum_51 Velocity_103 Duration_0.2.8 TimeShift_1.0.8 PitchDrum_38 Velocity_79 Duration_0.2.8 PitchDrum_42 Velocity_103 Duration_0.2.8

In [28]:
"""Dataset classes to be used with PyTorch when training a model."""
from __future__ import annotations

import json
from abc import ABC
from copy import deepcopy
from pathlib import Path
from typing import TYPE_CHECKING, Any

from symusic import Score
from torch import LongTensor, randint
from torch.utils.data import Dataset
from tqdm import tqdm

from miditok.constants import MIDI_FILES_EXTENSIONS

if TYPE_CHECKING:
    from collections.abc import Callable, Mapping, Sequence

    from miditok import MIDITokenizer


def split_seq_in_subsequences(
    seq: Sequence[Any], min_seq_len: int, max_seq_len: int
) -> list[Sequence[Any]]:
    r"""
    Split a sequence of token ids into consecutive subsequences.

    The sequence is cut every ``max_seq_len`` elements. The last chunk is kept only
    if at least ``min_seq_len`` elements remain, so that
    ``min_seq_len <= len(sub_seq) <= max_seq_len`` (assuming
    ``min_seq_len <= max_seq_len``). Each subsequence is returned as a
    ``LongTensor``.

    :param seq: sequence of integer ids to split.
    :param min_seq_len: minimum sequence length; a shorter tail is discarded.
    :param max_seq_len: maximum sequence length, must be at least 1.
    :return: list of subsequences.
    :raises ValueError: if ``max_seq_len`` is smaller than 1.
    """
    if max_seq_len < 1:
        # A non-positive chunk size can never advance through the sequence.
        msg = "max_seq_len must be a positive integer"
        raise ValueError(msg)

    sub_seq = []
    for start in range(0, len(seq), max_seq_len):
        # Strictly fewer than `min_seq_len` elements left: the tail is too short.
        # A tail of exactly `min_seq_len` elements is kept.
        if len(seq) - start < min_seq_len:
            break
        sub_seq.append(LongTensor(seq[start : start + max_seq_len]))

    return sub_seq


def split_dataset_to_subsequences(
    files_paths: Sequence[Path | str],
    out_dir: Path | str,
    min_seq_len: int,
    max_seq_len: int,
    one_token_stream: bool = True,
) -> None:
    """
    Split a dataset of JSON token files into subsequences saved as new JSON files.

    Each input file is loaded, the value of its ``"ids"`` entry is split with
    ``split_seq_in_subsequences`` and every subsequence is written to
    ``out_dir / f"{file_name}_{i}.json"``. The written file holds a copy of the
    original JSON content with ``"ids"`` replaced by the subsequence.

    :param files_paths: list of files of tokens to split.
    :param out_dir: output directory to save the subsequences; created if missing.
    :param min_seq_len: minimum sequence length.
    :param max_seq_len: maximum sequence length.
    :param one_token_stream: give False if the token files contains multiple tracks,
        i.e. the first dimension of the value of the "ids" entry corresponds to several
        tracks. Otherwise, leave True. (default: True)
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    for file_path in files_paths:
        # Accept plain strings too, as the signature promises.
        file_path = Path(file_path)
        with file_path.open() as json_file:
            tokens = json.load(json_file)

        # Split sequence(s)
        if one_token_stream:
            subseqs = split_seq_in_subsequences(tokens["ids"], min_seq_len, max_seq_len)
        else:
            subseqs = [
                subseq
                for track_seq in tokens["ids"]
                for subseq in split_seq_in_subsequences(
                    track_seq, min_seq_len, max_seq_len
                )
            ]

        # Save subsequences
        for i, subseq in enumerate(subseqs):
            # Tensors are not JSON serializable: convert to a plain list first.
            subseq_ids = subseq.tolist() if hasattr(subseq, "tolist") else list(subseq)
            # Write the split content, not the original unsplit `tokens`.
            new_tok = {**tokens, "ids": subseq_ids}
            path = out_dir / f"{file_path.name}_{i}.json"
            with path.open("w") as outfile:
                json.dump(new_tok, outfile)


class _DatasetABC(Dataset, ABC):
    r"""
    Abstract ``Dataset`` class.

    It holds samples (and optionally labels) and implements the basic magic methods.

    :param samples: sequence of input samples. It can directly be data, or a paths to
        files to be loaded.
    :param labels: sequence of labels associated with the samples. (default: ``None``)
    :param sample_key_name: name of the dictionary key containing the sample data when
        iterating the dataset. (default: ``"input_ids"``)
    :param labels_key_name: name of the dictionary key containing the labels data when
        iterating the dataset. (default: ``"labels"``)
    """

    def __init__(
        self,
        samples: Sequence[Any] | None = None,
        labels: Sequence[Any] | None = None,
        sample_key_name: str = "input_ids",
        labels_key_name: str = "labels",
    ) -> None:
        if samples is not None and labels is not None and len(samples) != len(labels):
            msg = "The number of samples must be the same as the number of labels"
            raise ValueError(msg)
        self.samples = samples if samples is not None else []
        self.labels = labels
        self.sample_key_name = sample_key_name
        self.labels_key_name = labels_key_name
        self.__iter_count = 0

    def reduce_num_samples(self, num_samples: int) -> None:
        r"""
        Reduce the size of the dataset, by keeping `num_samples` samples.

        :param num_samples: number of samples to keep. They will be randomly picked.
        """
        idx = randint(0, len(self), (num_samples,))
        self.samples = [self.samples[id_] for id_ in idx.tolist()]
        if self.labels is not None:
            self.labels = [self.labels[id_] for id_ in idx.tolist()]

    def __len__(self) -> int:
        return len(self.samples)

    def __getitem__(self, idx: int) -> Mapping[str, Any]:
        item = {self.sample_key_name: self.samples[idx]}
        if self.labels is not None:
            item[self.labels_key_name] = self.labels[idx]

        return item

    def __iter__(self) -> _DatasetABC:
        return self

    def __next__(self) -> Mapping[str, Any]:
        if self.__iter_count >= len(self):
            self.__iter_count = 0
            raise StopIteration

        self.__iter_count += 1
        return self[self.__iter_count - 1]

    def __repr__(self) -> str:
        return self.__str__()

    def __str__(self) -> str:
        return "No data loaded" if len(self) == 0 else f"{len(self.samples)} samples"


class GenreDatasetTok(_DatasetABC):
    r"""
    ``Dataset`` loading MIDI files as genre-prefixed sequences of text-token ids.

    Each MIDI file is tokenized by ``texter``. The resulting token strings are joined
    with single spaces and encoded with ``tokenizer.encode``. ``genre_token_ids`` is
    prepended to the encoded ids, and the result is cut into subsequences by
    ``split_seq_in_subsequences`` using ``min_seq_len`` and ``max_seq_len``. The
    subsequences are stored in RAM.

    For example, with ``min_seq_len = 50`` and ``max_seq_len = 100``:
    * a sequence of 670 ids will be split into 6 subsequences of 100 ids plus one
    subsequence of 70 ids;
    * a sequence of 620 ids will be split into 6 subsequences of 100 ids, the
    last 20 ids will be discarded.

    Files whose suffix is not in ``MIDI_FILES_EXTENSIONS`` are skipped and reported
    in a single warning (loading JSON token files is not supported here).

    Additionally, you can use the `func_to_get_labels` argument to provide a method
    allowing to use labels (one label per file).

    :param genre_token_ids: ids prepended to the encoded ids of every file (or of
        every track, when ``texter`` returns several sequences).
    :param files_paths: list of paths to files to load.
    :param min_seq_len: minimum sequence length (in num of ids)
    :param max_seq_len: maximum sequence length (in num of ids)
    :param texter: MIDI tokenizer called on each ``Score``; it must return an object
        with a ``tokens`` attribute, or a list of such objects.
    :param tokenizer: text tokenizer whose ``encode`` method maps the space-joined
        token strings to a list of ids.
    :param one_token_stream: kept for backward compatibility and not used: whether
        a file yields one or several sequences is detected from what ``texter``
        returns. (default: ``True``)
    :param func_to_get_labels: a function to retrieve the label of a file. The method
        must take two positional arguments: the first is the loaded ``Score``, the
        second is the path to the file just loaded. The method must return an
        integer which correspond to the label id. (default: ``None``)
    :param sample_key_name: name of the dictionary key containing the sample data when
        iterating the dataset. (default: ``"input_ids"``)
    :param labels_key_name: name of the dictionary key containing the labels data when
        iterating the dataset. (default: ``"labels"``)
    :raises ValueError: if ``texter`` or ``tokenizer`` is ``None``.
    """

    def __init__(
        self,
        genre_token_ids: Sequence[int],
        files_paths: Sequence[Path],
        min_seq_len: int,
        max_seq_len: int,
        texter: MIDITokenizer | None,
        tokenizer: Any,
        one_token_stream: bool = True,
        func_to_get_labels: Callable[[Score | Sequence, Path], int] | None = None,
        sample_key_name: str = "input_ids",
        labels_key_name: str = "labels",
    ) -> None:
        import warnings  # function-scope import: not among this cell's imports

        if texter is None or tokenizer is None:
            msg = "GenreDatasetTok needs both a `texter` and a `tokenizer`"
            raise ValueError(msg)

        genre_prefix = list(genre_token_ids)
        labels = None if func_to_get_labels is None else []
        samples = []
        skipped_files = []

        # Guard the description so an empty list of files does not raise IndexError.
        progress_desc = (
            f"Loading data: {files_paths[0].parent}"
            if len(files_paths) > 0
            else "Loading data"
        )
        for file_path in tqdm(
            files_paths,
            desc=progress_desc,
            miniters=int(len(files_paths) / 20),
            maxinterval=480,
        ):
            # Only MIDI files can be loaded. Skipping explicitly avoids reusing the
            # ids of the previously loaded file for an unsupported one.
            if file_path.suffix not in MIDI_FILES_EXTENSIONS:
                skipped_files.append(file_path)
                continue

            midi = Score(file_path)
            label = (
                None
                if func_to_get_labels is None
                else func_to_get_labels(midi, file_path)
            )

            # Normalize to a list of token sequences (one per stream/track).
            tok_seqs = texter(midi)
            if not isinstance(tok_seqs, list):
                tok_seqs = [tok_seqs]

            for tok_seq in tok_seqs:
                text_midi = " ".join(tok_seq.tokens)
                # Concat genre token
                # NOTE(review): the prefix is added once per sequence before
                # splitting, so only the first subsequence of a long sequence
                # starts with the genre ids -- confirm this is intended.
                tokens_ids = genre_prefix + list(tokenizer.encode(text_midi))

                # Cut tokens in samples of appropriate length
                subseqs = split_seq_in_subsequences(tokens_ids, min_seq_len, max_seq_len)
                samples += subseqs
                if label is not None:
                    labels += [label] * len(subseqs)

        if skipped_files:
            warnings.warn(
                f"Skipped {len(skipped_files)} file(s) that are not MIDI files, "
                f"first one: {skipped_files[0]}",
                stacklevel=2,
            )

        if labels is not None:
            labels = LongTensor(labels)
        super().__init__(
            samples,
            labels,
            sample_key_name=sample_key_name,
            labels_key_name=labels_key_name,
        )

In [29]:
def _build_genre_dataset(genre_name, files_paths):
    """Build a ``GenreDatasetTok`` for ``files_paths`` prefixed with the encoded ``genre_name``."""
    return GenreDatasetTok(
        genre_token_ids=tokenizer.encode(genre_name),
        files_paths=files_paths,
        min_seq_len=50,
        max_seq_len=1022,
        texter=texter,
        tokenizer=tokenizer,
    )


# One train and one validation dataset per genre, all built with the same settings.
jazz_dataset_train = _build_genre_dataset("Genre_Jazz", jazz_midi_paths_train)
jazz_dataset_valid = _build_genre_dataset("Genre_Jazz", jazz_midi_paths_valid)
ym_dataset_train = _build_genre_dataset("Genre_Ym", ym_midi_paths_train)
ym_dataset_valid = _build_genre_dataset("Genre_Ym", ym_midi_paths_valid)

Loading data: jazz-chunked/195_LittlePixieG_cleaned: 100%|██████████| 27/27 [00:00<00:00, 84.29it/s]
Loading data: jazz-chunked/195_LittlePixieG_cleaned: 100%|██████████| 3/3 [00:00<00:00, 41.48it/s]
Loading data: ../ym-test/chunks: 100%|██████████| 19/19 [00:00<00:00, 108.90it/s]
Loading data: ../ym-test/chunks: 100%|██████████| 2/2 [00:00<00:00, 102.24it/s]


In [30]:
# Compare the first token id of the first training sample of each genre.
# NOTE(review): the recorded output shows the same id for both genres, so the
# first id of a sample alone does not tell the two genres apart.
jazz_dataset_train[0]['input_ids'][0], ym_dataset_train[0]['input_ids'][0]

(tensor(13746), tensor(13746))

In [31]:
# Concat datasets
# The dataset classes defined in this notebook do not implement `+`, so the
# operator comes from torch's `Dataset` base class (presumably returning a
# `ConcatDataset` -- confirm against the installed torch version).
dataset_train = jazz_dataset_train + ym_dataset_train
dataset_valid = jazz_dataset_valid + ym_dataset_valid

In [32]:

# collator = DataCollator(
#     tokenizer["PAD_None"], tokenizer["BOS_None"], tokenizer["EOS_None"], copy_inputs_as_labels=True
# )

TypeError: 'GPT2TokenizerFast' object is not subscriptable

In [33]:
from torch.utils.data import DataLoader

# Wrap both datasets in loaders that use the DataLoader defaults, then collect
# every batch they yield into plain lists.
data_loader_train = DataLoader(dataset=dataset_train)
data_loader_valid = DataLoader(dataset=dataset_valid)
train_tokenized_songs = list(data_loader_train)
valid_tokenized_songs = list(data_loader_valid)

In [34]:
# make custom dataset
import torch
from torch.utils.data import Dataset, DataLoader

class MidiDataset(Dataset):
    """
    Dataset over pre-collected batches, returning truncated ``input_ids`` tensors.

    :param tokenized_songs: sequence of dicts whose ``'input_ids'`` entry is a 2-D
        tensor indexed as ``[batch, position]``.
    :param max_length: maximum number of positions kept per item. (default: 510)
    """

    def __init__(self, tokenized_songs, max_length=510):
        # Original author's note: with max_length=512, BOS and EOS tokens were added
        # at the front and back, the length became 514 and an error was raised, so
        # 510 is used for now. Needs debugging!!
        self.tokenized_songs = tokenized_songs
        self.max_length = max_length

    def __len__(self):
        """Return the number of stored batches."""
        return len(self.tokenized_songs)

    def __getitem__(self, idx):
        """Return ``{'input_ids': tensor}`` for the batch at ``idx``, truncated to ``max_length``."""
        batched_ids = self.tokenized_songs[idx]['input_ids']
        truncated_ids = batched_ids[:, : self.max_length].clone().detach()
        # Drop only the leading batch dimension (when it has size 1). A bare
        # `squeeze()` would also collapse a length-1 sequence into a 0-d tensor.
        return {'input_ids': truncated_ids.squeeze(0)}

In [35]:
train_dataset = MidiDataset(train_tokenized_songs)
eval_dataset = MidiDataset(valid_tokenized_songs)

In [36]:
train_dataset[0]['input_ids'].shape, eval_dataset[0]['input_ids'].shape

(torch.Size([510]), torch.Size([510]))

In [None]:
# # Test our data_collator
# out = collator([train_dataset[i] for i in range(5)])

# for key in out:
#     print(f"{key} shape: {out[key].shape}")

# print(f"out {out}")

In [None]:
# Training

In [None]:
from transformers import Trainer, TrainingArguments

# first create a custom trainer to log prediction distribution
class CustomTrainer(Trainer):
    """
    ``Trainer`` subclass kept as an extension point for the evaluation loop.

    No extra logging is implemented yet: the evaluation output of the parent class
    is returned unchanged.
    """

    def evaluation_loop(
        self,
        dataloader,
        description,
        prediction_loss_only=None,
        ignore_keys=None,
        metric_key_prefix="eval",
    ):
        """Run the parent evaluation loop and return its output as is."""
        return super().evaluation_loop(
            dataloader,
            description,
            prediction_loss_only=prediction_loss_only,
            ignore_keys=ignore_keys,
            metric_key_prefix=metric_key_prefix,
        )

In [None]:
from transformers import AutoConfig, GPT2LMHeadModel

context_length = 1024 # Feel free to experiment with other context lengths.

# Change this based on size of the data
n_layer=6
n_head=4
n_emb=1024

# `tokenizer` is the GPT-2 text tokenizer, which cannot be indexed with MidiTok token
# names such as "PAD_None" (TypeError: not subscriptable). Read the special token
# ids from its attributes instead. GPT-2 has no dedicated padding token, so the
# end-of-text token is reused for padding.
config = AutoConfig.from_pretrained(
    "gpt2",
    vocab_size=len(tokenizer),
    n_positions=context_length,
    n_layer=n_layer,
    n_head=n_head,
    pad_token_id=tokenizer.eos_token_id,
    bos_token_id=tokenizer.bos_token_id,
    eos_token_id=tokenizer.eos_token_id,
    n_embd=n_emb
)

model = GPT2LMHeadModel(config)
model

In [None]:
# Build the keyword arguments for the trainer.
from argparse import Namespace

# Directory that receives the training outputs.
output_path = "models"
# Single interval (in steps) shared by logging, evaluation and checkpointing.
steps = 100

# NOTE(review): this rebinds the notebook-level name `config`, which an earlier
# cell uses for the model configuration.
config = dict(
    output_dir=output_path,
    num_train_epochs=30,  # Change freely; the author set 30 epochs and relied on early stopping.
    per_device_train_batch_size=32,
    per_device_eval_batch_size=16,
    evaluation_strategy="steps",
    save_strategy="steps",
    eval_steps=steps,
    logging_steps=steps,
    logging_first_step=True,
    save_total_limit=5,
    save_steps=steps,
    lr_scheduler_type="cosine",
    learning_rate=5e-4,
    warmup_ratio=0.01,
    weight_decay=0.01,
    seed=1,
    load_best_model_at_end=True,
    # metric_for_best_model="eval_loss",  # change this to select the best model by another metric (eval_loss is the default)
    # report_to="wandb",
)

# Attribute-style view of the same settings (e.g. `args.seed`).
args = Namespace(**config)

In [None]:
from transformers import set_seed
# Apply the `seed` value from the training settings.
# NOTE(review): this cell runs after the cells that shuffle the file lists and
# first instantiate the model, so those steps are not covered by this seed.
set_seed(args.seed)

In [None]:
from transformers import Trainer, TrainingArguments
from transformers import EarlyStoppingCallback

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# `config` is unpacked as a mapping into `TrainingArguments` below, so it cannot also
# be the model configuration. Re-create the model from the configuration stored on
# the existing `model` instance instead.
model = GPT2LMHeadModel(model.config)
model.to(device)

train_args = TrainingArguments(**config)

trainer = CustomTrainer(
    model=model,
    tokenizer=tokenizer,
    # `Trainer` takes the collator as `data_collator`. The collator needs the id
    # used for padding; GPT-2 has no padding token, so the end-of-text id is used.
    data_collator=DataCollator(
        pad_token_id=tokenizer.eos_token_id, copy_inputs_as_labels=True
    ),
    args=train_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=5)],  # Change the early-stopping patience freely
)

In [None]:
# Train the model.
trainer.train()

In [None]:
# The model did not return a loss from the inputs, only the following keys: logits,past_key_values. For reference, the inputs it received are input_ids,attention_mask.
# If training fails with the error above, subclass Trainer and override the method that computes the loss.

# Below is an example of overriding the loss computation.
# class CustomTrainer(Trainer):
#     def compute_loss(self, model, inputs, return_outputs=False):
#         labels = inputs.pop("labels")
#         outputs = model(**inputs)
#         logits = outputs.logits
#         loss_fct = torch.nn.CrossEntropyLoss()
#         loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
#         return (loss, outputs) if return_outputs else loss



In [37]:
model(train_dataset[0]['input_ids'].unsqueeze(0).to(device))

NameError: name 'model' is not defined