In [1]:
from copy import deepcopy
from pathlib import Path
from random import shuffle

from miditok import TokenizerConfig
from miditok.pytorch_data import DatasetMIDI, DataCollator
from miditok.utils import split_files_for_training
from miditok.data_augmentation import augment_dataset
from torch import Tensor, argmax
from torch.utils.data import DataLoader
from torch.cuda import is_available as cuda_available, is_bf16_supported
from torch.backends.mps import is_available as mps_available
from transformers import AutoModelForCausalLM, MistralConfig, Trainer, TrainingArguments, GenerationConfig
from transformers.trainer_utils import set_seed
from tqdm import tqdm
import torch
from symusic import Score
from hello_remi import ChordREMI

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Tokenizer settings: three special tokens, tempo tokens enabled (32 tempo
# values over 50-200, log-scaled per `log_tempo`), and no program tokens.
TOKENIZER_PARAMS = dict(
    special_tokens=["PAD", "BOS", "EOS"],
    use_tempos=True,
    use_programs=False,  # no multitrack here
    num_tempos=32,
    tempo_range=(50, 200),  # (min_tempo, max_tempo)
    log_tempo=True,
    # Make sure the configuration carries the chord-token switch.
    # NOTE(review): TokenizerConfig may already collect unknown keyword
    # arguments under its own `additional_params` attribute, which would nest
    # this dict one level deeper - confirm ChordREMI reads the flag from where
    # it actually ends up.
    additional_params=dict(use_chord_tokens=True),
)
config = TokenizerConfig(**TOKENIZER_PARAMS)

# Build the tokenizer from that configuration
tokenizer = ChordREMI(config)

In [None]:
# Show the tokenizer length (used as the model's vocab_size further down) as a
# quick sanity check, then write the tokenizer to a JSON file so that a later
# cell or session can reload it from disk instead of rebuilding it.
print(len(tokenizer))
tokenizer.save("tokenizer_exp_02.json")

In [2]:
# Rebuild the tokenizer from the saved JSON parameters file rather than from
# TOKENIZER_PARAMS, and print its length to confirm what was loaded.
tokenizer = ChordREMI(params="tokenizer_exp_02.json")
print(len(tokenizer))

362


In [None]:
# Split MIDI paths into train/valid/test sets (80% / 15% / 5%).
# NOTE(review): `midi_paths` is not defined in any cell shown before this one;
# it must already hold the list of source MIDI files - confirm where it is built.
#
# Seed the RNGs and shuffle a *sorted copy*, so that the split is identical on
# every run and re-running this cell neither reshuffles nor reorders
# `midi_paths` itself (an unseeded in-place shuffle would move files between
# subsets from one run to the next).
set_seed(444)  # same value as the `seed` given to TrainingArguments
shuffled_paths = sorted(midi_paths)
shuffle(shuffled_paths)

total_num_files = len(shuffled_paths)
num_files_valid = round(total_num_files * 0.15)
num_files_test = round(total_num_files * 0.05)
midi_paths_valid = shuffled_paths[:num_files_valid]
midi_paths_test = shuffled_paths[num_files_valid:num_files_valid + num_files_test]
midi_paths_train = shuffled_paths[num_files_valid + num_files_test:]

# Chunk MIDIs and perform data augmentation on each subset independently
for files_paths, subset_name in (
    (midi_paths_train, "train"), (midi_paths_valid, "valid"), (midi_paths_test, "test")
):

    # Split the MIDIs into chunks of roughly 1024 tokens, with 2 overlapping
    # bars between consecutive chunks; chunks are written to exp_<subset>_02.
    subset_chunks_dir = Path(f"exp_{subset_name}_02")
    split_files_for_training(
        files_paths=files_paths,
        tokenizer=tokenizer,
        save_dir=subset_chunks_dir,
        max_seq_len=1024,
        num_overlap_bars=2,
    )

    # Perform data augmentation on the chunked files of this subset
    augment_dataset(
        subset_chunks_dir,
        pitch_offsets=[-12, 12],
        velocity_offsets=[-4, 4],
        duration_offsets=[-0.5, 0.5],
    )


In [None]:
# Collect the chunked files of every subset: all "*.mid" files first, then all
# "*.midi" files, searched recursively inside the subset's directory.
midi_paths_train, midi_paths_valid, midi_paths_test = (
    [
        midi_file
        for pattern in ("**/*.mid", "**/*.midi")
        for midi_file in Path(chunks_dir).glob(pattern)
    ]
    for chunks_dir in ("exp_train_02", "exp_valid_02", "exp_test_02")
)

# Arguments shared by the three datasets
kwargs_dataset = dict(
    max_seq_len=1024,
    tokenizer=tokenizer,
    bos_token_id=tokenizer["BOS_None"],
    eos_token_id=tokenizer["EOS_None"],
)
dataset_train, dataset_valid, dataset_test = (
    DatasetMIDI(subset_paths, **kwargs_dataset)
    for subset_paths in (midi_paths_train, midi_paths_valid, midi_paths_test)
)

In [None]:
# Seed the random number generators *before* the model is instantiated.
# The Trainer only applies its `seed` when it is constructed, which happens
# after this cell, so without this call the randomly initialised weights would
# differ on every run. 444 matches the `seed` given to TrainingArguments.
set_seed(444)

# Creates the model: a small Mistral-style causal LM whose vocabulary size and
# special-token ids are taken from the tokenizer.
model_config = MistralConfig(
    vocab_size=len(tokenizer),
    hidden_size=512,
    intermediate_size=2048,
    num_hidden_layers=6,
    num_attention_heads=8,
    num_key_value_heads=4,
    sliding_window=256,
    max_position_embeddings=2048,
    pad_token_id=tokenizer['PAD_None'],
    bos_token_id=tokenizer['BOS_None'],
    eos_token_id=tokenizer['EOS_None'],
)
model = AutoModelForCausalLM.from_config(model_config)

# Sanity check: the tokenizer length that was used as `vocab_size` above
print(len(tokenizer))

In [None]:
def compute_metrics(eval_pred):
    """
    Compute next-token accuracy with plain PyTorch (no external metrics library).

    The labels are an unshifted copy of the inputs (the collator is created
    with ``copy_inputs_as_labels=True``) and the causal LM shifts them inside
    its loss, so the prediction made at position ``t`` is for the token at
    position ``t + 1``. Predictions and labels are therefore aligned by one
    step before being compared; comparing them position by position would
    score each prediction against the wrong token.

    :param eval_pred: pair ``(predictions, labels)``. ``predictions`` holds
        predicted token ids (see ``preprocess_logits``), ``labels`` the target
        ids, both with the sequence on the last axis. Labels equal to ``-100``
        mark padding and are ignored.
    :return: dictionary with a single ``"accuracy"`` entry, ``0.0`` when no
        position is left to score.
    """
    predictions, labels = eval_pred
    predictions = torch.as_tensor(predictions)
    labels = torch.as_tensor(labels)

    # Align: drop the last prediction (nothing follows it) and the first label
    # (nothing predicts it).
    shifted_predictions = predictions[..., :-1]
    shifted_labels = labels[..., 1:]

    # Keep only the positions whose target is a real token, not padding (-100)
    scored = shifted_labels != -100
    num_scored = int(scored.sum().item())
    if num_scored == 0:
        return {"accuracy": 0.0}

    num_correct = int(
        (shifted_predictions[scored] == shifted_labels[scored]).sum().item()
    )
    return {"accuracy": num_correct / num_scored}

def preprocess_logits(logits: Tensor, _: Tensor) -> Tensor:
    """
    Reduce the logits to predicted ids before evaluation accumulates them.

    Only the index of the highest score along the last axis is kept, instead
    of the full score vector, which greatly lowers memory usage during
    evaluation and keeps training tractable.

    :param logits: scores produced by the model; the last axis is reduced.
    :param _: second argument supplied by the caller, unused here.
    :return: tensor (long dtype) of arg-max indices along the last axis.
    """
    return logits.argmax(dim=-1)

# Pick the device and the mixed-precision mode for the Trainer:
# - no CUDA: full precision everywhere;
# - CUDA with bf16 support: bf16 for training and evaluation;
# - CUDA without bf16 support: fp16 for training and evaluation.
USE_CUDA = cuda_available()
USE_MPS = not USE_CUDA and mps_available()
if not USE_CUDA:
    FP16 = FP16_EVAL = BF16 = BF16_EVAL = False
elif is_bf16_supported():
    BF16 = BF16_EVAL = True
    FP16 = FP16_EVAL = False
else:
    BF16 = BF16_EVAL = False
    FP16 = FP16_EVAL = True

training_config = TrainingArguments(
    # Positional arguments, following the TrainingArguments signature:
    # output_dir, overwrite_output_dir, do_train, do_eval, do_predict and the
    # evaluation strategy.
    "runs", False, True, True, False, "steps",
    per_device_train_batch_size=256,
    per_device_eval_batch_size=256,
    gradient_accumulation_steps=3,
    eval_accumulation_steps=None,
    eval_steps=200,
    learning_rate=1e-4,
    weight_decay=0.01,
    max_grad_norm=3.0,
    lr_scheduler_type="cosine_with_restarts",
    warmup_ratio=0.3,
    log_level="debug",
    logging_strategy="steps",
    logging_steps=20,
    save_strategy="steps",
    save_steps=200,  # same cadence as eval_steps, needed by load_best_model_at_end
    save_total_limit=5,
    # Only force the CPU when no accelerator is present. Passing
    # `not USE_CUDA` alone would also force the CPU on machines that only
    # have an MPS device, leaving USE_MPS without any effect.
    no_cuda=not (USE_CUDA or USE_MPS),
    seed=444,
    fp16=FP16,
    fp16_full_eval=FP16_EVAL,
    bf16=BF16,
    bf16_full_eval=BF16_EVAL,
    load_best_model_at_end=True,
    label_smoothing_factor=0.,
    optim="adamw_torch",
    report_to=["tensorboard"],
    gradient_checkpointing=True,
    max_steps=20000,
)

# The collator pads with the PAD token id and copies the inputs as labels.
collator = DataCollator(tokenizer["PAD_None"], copy_inputs_as_labels=True)
trainer = Trainer(
    model=model,
    args=training_config,
    data_collator=collator,
    train_dataset=dataset_train,
    eval_dataset=dataset_valid,
    compute_metrics=compute_metrics,
    callbacks=None,
    preprocess_logits_for_metrics=preprocess_logits,
)

# Checkpoint to resume from. The path is built with pathlib so that it is
# valid on every platform ("runs\checkpoint-6000" contains an invalid "\c"
# escape and only names a sub-directory on Windows).
checkpoint_path = str(Path("runs") / "checkpoint-6000")
if Path(checkpoint_path).is_dir():
    resume_from = checkpoint_path
else:
    # Keeps the notebook runnable from a clean state instead of failing on a
    # checkpoint that has not been produced yet.
    print(f"Checkpoint '{checkpoint_path}' not found - training from scratch.")
    resume_from = None

# Training
train_result = trainer.train(resume_from_checkpoint=resume_from)
# Saves the final model. No tokenizer is handed to the Trainer, so the
# tokenizer is NOT saved here - it is stored separately in its own JSON file.
trainer.save_model()
trainer.log_metrics("train", train_result.metrics)
trainer.save_metrics("train", train_result.metrics)
trainer.save_state()