In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; later cells read the dataset manifests,
# audio files and the Tokenize_Kor.py helper from paths under this mount point.
drive.mount('/content/drive')

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
!pip install datasets -U
!pip install jiwer -U
!pip3 install torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install transformers -U
!pip install transformers[torch]
!pip install accelerate -U

In [None]:
from huggingface_hub import notebook_login
# Prompt for Hugging Face Hub credentials. The training cell sets
# push_to_hub=True, so this presumably has to succeed before training starts.
notebook_login()

In [None]:
from transformers import Wav2Vec2ForCTC
from transformers import Wav2Vec2Processor
# Checkpoint to start from, and the name used later as the Trainer output_dir.
model_id = 'hongseongpil/Wav2Vec2.0_zeroth_Ko'
repo_name = "hongseongpil/wav2vec2-vocals"

# The processor and the CTC model are both loaded from the same checkpoint.
processor = Wav2Vec2Processor.from_pretrained(model_id)
# NOTE(review): output_attentions=True is forwarded to the model configuration,
# so forward passes also return attention maps - confirm this is wanted during
# training/evaluation, where it adds to the outputs the Trainer has to handle.
model = Wav2Vec2ForCTC.from_pretrained(
    model_id,
    output_attentions=True,
)

In [None]:
from datasets import load_dataset , Dataset
# Load the plain-text manifests as a dataset with a single "text" column.
# Each line is later split into "<audio path> <transcript>" by prepare_dataset.
dataset = load_dataset(
    "text",
    data_files={
        "train": ["/content/drive/MyDrive/Colab Notebooks/High_Pitch_Dataset/train.txt"],
        "test": "/content/drive/MyDrive/Colab Notebooks/High_Pitch_Dataset/test.txt",
    },
)

In [None]:
# Copy the Tokenize_Kor.py helper from Drive into /content (presumably the
# notebook's working directory) so the next cell can import `Tokenize_Kor`.
!cp "/content/drive/MyDrive/Colab Notebooks/Tokenize_Kor.py" "/content"

In [None]:
import soundfile as sf
from Tokenize_Kor import decompose_tokens
def prepare_dataset(batch, processor, type):
    """Turn one manifest line into model inputs and CTC label ids.

    Args:
        batch: A single example whose ``'text'`` field holds
            ``"<audio path> <transcript>"``, separated by whitespace.
        processor: Object that is called on the waveform to produce
            ``input_values`` and whose ``tokenizer`` encodes the label string.
        type: Split name supplied by the caller; currently unused. The name
            shadows the builtin but is kept because callers pass it by keyword.

    Returns:
        The same ``batch`` mapping with ``'input_values'`` and ``'labels'`` added.

    Raises:
        ValueError: If the line contains no transcript after the audio path
            (the two-way unpacking of the split fails).
    """
    audio_path, transcript = batch['text'].split(maxsplit=1)
    labels = "".join(decompose_tokens(transcript))

    # The first two characters of the listed path are dropped before it is
    # appended to the dataset root (presumably a leading "./" - confirm against
    # the manifest files).
    audio_input, sample_rate = sf.read(
        "/content/drive/MyDrive/Colab Notebooks/High_Pitch_Dataset/" + audio_path[2:]
    )

    # soundfile returns a (frames, channels) array for multi-channel files;
    # average the channels so the processor always receives a 1-D waveform
    # instead of interpreting the second axis as a batch.
    if audio_input.ndim > 1:
        audio_input = audio_input.mean(axis=1)

    # Pass the rate actually read from the file rather than a hard-coded 16000,
    # so a file recorded at a different rate is not silently treated as 16 kHz.
    batch['input_values'] = processor(
        audio_input, sampling_rate=sample_rate, return_tensors="pt"
    ).input_values[0]

    # Encode labels with the tokenizer directly; this is what the deprecated
    # `as_target_processor()` context delegated to.
    batch["labels"] = processor.tokenizer(labels).input_ids
    return batch

In [None]:
import os
# Preprocess both splits the same way: every manifest line becomes
# `input_values` + `labels`, and the raw "text" column is dropped afterwards.
# The split name is forwarded to prepare_dataset through its `type` argument.
for split_name in ("train", "test"):
    dataset[split_name] = dataset[split_name].map(
        prepare_dataset,
        fn_kwargs={"processor": processor, 'type': split_name},
        remove_columns=["text"],
        num_proc=os.cpu_count(),
    )

In [None]:
import torch
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union


@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that dynamically pads a list of CTC examples into one batch.

    Audio inputs and label ids have different lengths and need different
    padding, so they are padded separately: inputs with the processor's
    feature extractor, labels with its tokenizer. Padded label positions are
    then replaced by -100.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data. Its ``feature_extractor``
            and ``tokenizer`` attributes are used for padding.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'` (default): Pad to the longest sequence in the batch (or no padding if
              only a single sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the ``input_values`` to a multiple of the provided value.
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            If set will pad the ``labels`` to a multiple of the provided value.
    """

    # Forward reference (string) so that defining the class does not require
    # the transformers import to have been executed first.
    processor: "Wav2Vec2Processor"
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad ``features`` and return a batch with ``input_values`` and ``labels``.

        Args:
            features: Examples, each providing ``"input_values"`` and ``"labels"``.

        Returns:
            The padded input batch with an added ``"labels"`` tensor in which
            padded positions are set to -100.
        """
        # Split inputs and labels since they have different lengths and need
        # different padding methods.
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.feature_extractor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        # Pad labels with the tokenizer directly instead of going through the
        # deprecated `as_target_processor()` context manager.
        labels_batch = self.processor.tokenizer.pad(
            label_features,
            padding=self.padding,
            max_length=self.max_length_labels,
            pad_to_multiple_of=self.pad_to_multiple_of_labels,
            return_tensors="pt",
        )

        # Replace padding with -100 so those positions are ignored by the loss.
        labels = labels_batch["input_ids"].masked_fill(labels_batch["attention_mask"].ne(1), -100)
        batch["labels"] = labels
        return batch

In [None]:
# Collator that pads each training batch to its longest example.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

try:
    # `load_metric` is only importable from older `datasets` releases; the
    # install cell upgrades `datasets`, so the import may fail on a fresh run.
    from datasets import load_metric
    cer_metric = load_metric("cer",trust_remote_code=True)
except ImportError:
    # Fall back to jiwer (installed by the setup cell) and expose the same
    # `compute(predictions=..., references=...)` call that compute_metrics uses.
    import jiwer

    class _JiwerCer:
        """Minimal character-error-rate metric backed by ``jiwer.cer``."""

        def compute(self, predictions, references):
            # jiwer takes the reference text first, then the hypothesis.
            # NOTE(review): values may differ marginally from the hub "cer"
            # metric if jiwer's default text transforms differ - confirm.
            return jiwer.cer(list(references), list(predictions))

    cer_metric = _JiwerCer()

In [None]:
import numpy as np
def compute_metrics(pred):
    """Compute the character error rate for one evaluation pass.

    Args:
        pred: Object with ``predictions`` (logits, or a tuple whose first item
            is the logits) and ``label_ids`` (label ids with -100 marking
            positions to ignore).

    Returns:
        ``{"cer": value}`` where ``value`` is whatever ``cer_metric.compute``
        returns for the decoded predictions and references.
    """
    pred_logits = pred.predictions
    # When the model returns extra outputs (e.g. attentions), the predictions
    # arrive as a tuple; the logits are expected to be its first element.
    if isinstance(pred_logits, tuple):
        pred_logits = pred_logits[0]
    pred_ids = np.argmax(pred_logits, axis=-1)

    # Map the ignore index back to the pad token on a copy, so the caller's
    # label array is not modified as a side effect.
    label_ids = np.where(pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids)

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)
    cer = cer_metric.compute(predictions=pred_str, references=label_str)
    return {"cer": cer}

In [None]:
from transformers import TrainingArguments
from transformers import Trainer
from transformers import Wav2Vec2ForCTC

import inspect

cpuCount = os.cpu_count()

# The evaluation-schedule argument is named `eval_strategy` in newer
# transformers releases and `evaluation_strategy` in older ones; pick whichever
# the installed version accepts so the cell works on both.
_training_arg_names = inspect.signature(TrainingArguments.__init__).parameters
_eval_strategy_key = "eval_strategy" if "eval_strategy" in _training_arg_names else "evaluation_strategy"

training_args = TrainingArguments(
    output_dir=repo_name,
    group_by_length=True,
    per_device_train_batch_size=8,
    num_train_epochs=30,
    fp16=True,
    gradient_checkpointing=True,
    # Evaluate, checkpoint and log on the same 500-step schedule.
    save_steps=500,
    eval_steps=500,
    logging_steps=500,
    learning_rate=1e-4,
    weight_decay=0.005,
    warmup_steps=1000,
    save_total_limit=2,
    push_to_hub=True,
    **{_eval_strategy_key: "steps"},
)

# Freeze the convolutional feature encoder. `freeze_feature_extractor` is the
# deprecated name of the same operation, used only when the newer one is absent.
if hasattr(model, "freeze_feature_encoder"):
    model.freeze_feature_encoder()
else:
    model.freeze_feature_extractor()

# Likewise, Trainer's `tokenizer` argument is called `processing_class` in newer
# releases; pass the feature extractor under the name that is supported.
_trainer_arg_names = inspect.signature(Trainer.__init__).parameters
_processing_key = "processing_class" if "processing_class" in _trainer_arg_names else "tokenizer"

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    **{_processing_key: processor.feature_extractor},
)

# The model is loaded with output_attentions=True; excluding "attentions" from
# the outputs gathered during evaluation keeps them out of the accumulated
# predictions, so the metric function receives the logits alone.
trainer.train(ignore_keys_for_eval=["attentions"])