In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; every dataset path configured below sits under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import shutil

# Loading the dataset

In [None]:
# Root folders on the mounted Drive. Change these two values to relocate the
# raw corpus or the generated train/test split; every path below derives from them.
CSD_ROOT = '/content/drive/MyDrive/CSD'
DATASET_ROOT = '/content/drive/MyDrive/CSD_Dataset'

# Raw corpus, one sub-tree per language.
ENGLISH_AUDIO_DIR = f'{CSD_ROOT}/english/wav'
KOREAN_AUDIO_DIR = f'{CSD_ROOT}/korean/wav'
ENGLISH_LYRICS_DIR = f'{CSD_ROOT}/english/lyric'
KOREAN_LYRICS_DIR = f'{CSD_ROOT}/korean/lyric'

# Destination folders for the train/test split built in the next section.
TRAIN_AUDIO_DIR = f'{DATASET_ROOT}/train/audio'
TRAIN_LYRICS_DIR = f'{DATASET_ROOT}/train/lyrics'
TEST_AUDIO_DIR = f'{DATASET_ROOT}/test/audio'
TEST_LYRICS_DIR = f'{DATASET_ROOT}/test/lyrics'

# Creating the Data File Structure

In [None]:
# Make sure the four destination folders exist before anything is copied.
# shutil.copy treats a missing destination as a file name (or fails when the
# parent is missing too), so a fresh Drive would not end up with a folder of songs.
for target_dir in (TRAIN_AUDIO_DIR, TRAIN_LYRICS_DIR, TEST_AUDIO_DIR, TEST_LYRICS_DIR):
  os.makedirs(target_dir, exist_ok=True)

# The first half of the English recordings (in filename order) goes to the
# training split. `file_list` is kept because the next cell copies the other half.
file_list = sorted(os.listdir(ENGLISH_AUDIO_DIR))
for song in file_list[:len(file_list)//2]:
  shutil.copy(os.path.join(ENGLISH_AUDIO_DIR, song), TRAIN_AUDIO_DIR)

In [None]:
# Second half of the English recordings (from the listing made in the
# previous cell) goes to the test split.
split_at = len(file_list) // 2
for song in file_list[split_at:]:
  source_path = os.path.join(ENGLISH_AUDIO_DIR, song)
  shutil.copy(source_path, TEST_AUDIO_DIR)

In [None]:
# First half of the Korean recordings, in filename order, goes to training.
# `file_list` is kept because the next cell copies the other half.
file_list = sorted(os.listdir(KOREAN_AUDIO_DIR))
split_at = len(file_list) // 2
for song in file_list[:split_at]:
  source_path = os.path.join(KOREAN_AUDIO_DIR, song)
  shutil.copy(source_path, TRAIN_AUDIO_DIR)

In [None]:
# Second half of the Korean recordings (from the listing made in the
# previous cell) goes to the test split.
split_at = len(file_list) // 2
for song in file_list[split_at:]:
  source_path = os.path.join(KOREAN_AUDIO_DIR, song)
  shutil.copy(source_path, TEST_AUDIO_DIR)

In [None]:
# First half of the English lyric files, in filename order, goes to training.
# `file_list` is kept because the next cell copies the other half.
file_list = sorted(os.listdir(ENGLISH_LYRICS_DIR))
split_at = len(file_list) // 2
for song in file_list[:split_at]:
  source_path = os.path.join(ENGLISH_LYRICS_DIR, song)
  shutil.copy(source_path, TRAIN_LYRICS_DIR)

In [None]:
# Second half of the English lyric files (from the listing made in the
# previous cell) goes to the test split.
split_at = len(file_list) // 2
for song in file_list[split_at:]:
  source_path = os.path.join(ENGLISH_LYRICS_DIR, song)
  shutil.copy(source_path, TEST_LYRICS_DIR)

In [None]:
# First half of the Korean lyric files, in filename order, goes to training.
# `file_list` is kept because the next cell copies the other half.
file_list = sorted(os.listdir(KOREAN_LYRICS_DIR))
split_at = len(file_list) // 2
for song in file_list[:split_at]:
  source_path = os.path.join(KOREAN_LYRICS_DIR, song)
  shutil.copy(source_path, TRAIN_LYRICS_DIR)

In [None]:
# Second half of the Korean lyric files (from the listing made in the
# previous cell) goes to the test split.
split_at = len(file_list) // 2
for song in file_list[split_at:]:
  source_path = os.path.join(KOREAN_LYRICS_DIR, song)
  shutil.copy(source_path, TEST_LYRICS_DIR)

In [None]:
# Install the Hugging Face `datasets` package (imported below as `datasets`).
!pip install datasets



In [None]:
# Install `evaluate`, used later to load the WER and BLEU metrics.
!pip install evaluate



In [None]:
# Install `jiwer` — presumably the backend the "wer" metric needs; confirm.
!pip install jiwer



In [None]:
# Upgrade `accelerate` — presumably required by the transformers trainer used below; confirm.
!pip install accelerate -U



# Preparing the dataset for Fine-Tuning

In [None]:
import transformers
import datasets

In [None]:
def create_output_labels(path):
  """Read a lyrics file and flatten it into a single-line transcript.

  Every line that ends with a newline has that newline replaced by one
  space; a final line without a trailing newline is appended unchanged.

  Args:
    path: Location of the lyrics text file.

  Returns:
    The file's lines joined into one string ('' for an empty file).
  """
  pieces = []
  # The context manager closes the handle even if reading fails, and the
  # explicit encoding pins the decoding instead of relying on the platform default.
  with open(path, 'r', encoding='utf-8') as fd:
    for line in fd:
      if line.endswith('\n'):
        pieces.append(line[:-1] + ' ')
      else:
        pieces.append(line)

  return ''.join(pieces)

## Creating the Training and Testing datasets

In [None]:
# Read every training transcript, in filename order, into `output_labels`
# (consumed by the next cell when the training dataset is built).
train_lyrics_filenames = sorted(os.listdir(TRAIN_LYRICS_DIR))
train_lyrics_paths = [os.path.join(TRAIN_LYRICS_DIR, fname) for fname in train_lyrics_filenames]
output_labels = [create_output_labels(lyrics_path) for lyrics_path in train_lyrics_paths]

In [None]:
# Pair each training recording with its transcript; both lists are in filename order.
# NOTE(review): the first sorted directory entry is dropped by the [1:] slice —
# presumably a stray non-song entry in the train audio folder; confirm, because
# the test split below applies no such slice.
train_audio_filenames = sorted(os.listdir(TRAIN_AUDIO_DIR))
train_audio_paths = [os.path.join(TRAIN_AUDIO_DIR, fname) for fname in train_audio_filenames[1:]]

train_columns = {"audio": train_audio_paths, 'lyrics': output_labels}
train_dataset = datasets.Dataset.from_dict(train_columns)
# Have the "audio" column decoded as audio at a 16 kHz sampling rate.
train_dataset = train_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))

In [None]:
# Read every test transcript, in filename order. This rebinds `output_labels`,
# so it now holds the test transcripts used by the next cell.
test_lyrics_filenames = sorted(os.listdir(TEST_LYRICS_DIR))
test_lyrics_paths = [os.path.join(TEST_LYRICS_DIR, fname) for fname in test_lyrics_filenames]
output_labels = [create_output_labels(lyrics_path) for lyrics_path in test_lyrics_paths]

In [None]:
# Pair each test recording with its transcript; both lists are in filename order.
test_audio_filenames = sorted(os.listdir(TEST_AUDIO_DIR))
test_audio_paths = [os.path.join(TEST_AUDIO_DIR, fname) for fname in test_audio_filenames]

test_columns = {"audio": test_audio_paths, 'lyrics': output_labels}
test_dataset = datasets.Dataset.from_dict(test_columns)
# Have the "audio" column decoded as audio at a 16 kHz sampling rate.
test_dataset = test_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))

In [None]:
# Display the training dataset summary (columns and row count).
train_dataset

Dataset({
    features: ['audio', 'lyrics'],
    num_rows: 100
})

In [None]:
# Display the test dataset summary (columns and row count).
test_dataset

Dataset({
    features: ['audio', 'lyrics'],
    num_rows: 100
})

# Creating the Whisper Feature Extractor and Processor

In [None]:
from transformers import WhisperFeatureExtractor

# Feature extractor from the "openai/whisper-small" checkpoint; prepare_dataset uses it to turn audio arrays into model inputs.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

In [None]:
from transformers import WhisperTokenizer

# Tokenizer from the same checkpoint; used to encode lyrics into label ids and to decode ids when computing metrics.
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small")

In [None]:
from transformers import WhisperProcessor

# Processor from the same checkpoint; the data collator reads its `feature_extractor` and `tokenizer` attributes.
processor = WhisperProcessor.from_pretrained("openai/whisper-small")

# Creating Batches for Model Training.

In [None]:
def prepare_dataset(batch):
    """Add model inputs and target token ids to one dataset example.

    Reads the decoded clip from ``batch["audio"]`` and the transcript from
    ``batch["lyrics"]``, using the notebook-level ``feature_extractor`` and
    ``tokenizer``.

    Returns:
        The same ``batch`` mapping with ``input_features`` and ``labels`` added.
    """
    # The clip carries its sample array and the rate it was decoded at.
    clip = batch["audio"]
    waveform, rate = clip["array"], clip["sampling_rate"]

    # Log-Mel input features for this clip; only the first (single) entry is kept.
    extracted = feature_extractor(waveform, sampling_rate=rate)
    batch["input_features"] = extracted.input_features[0]

    # Transcript encoded as label token ids.
    encoded_lyrics = tokenizer(batch["lyrics"])
    batch["labels"] = encoded_lyrics.input_ids
    return batch

In [None]:
# Run prepare_dataset over every training example, using 4 worker processes.
train_dataset = train_dataset.map(prepare_dataset, num_proc=4)

Map (num_proc=4):   0%|          | 0/100 [00:00<?, ? examples/s]

In [None]:
# Run prepare_dataset over every test example, using 4 worker processes.
test_dataset = test_dataset.map(prepare_dataset, num_proc=4)

Map (num_proc=4):   0%|          | 0/100 [00:00<?, ? examples/s]

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Pad a list of prepared examples into one training batch.

    Audio features and label ids are padded separately because they have
    different lengths and need different padding routines.

    Attributes:
        processor: Object exposing ``feature_extractor.pad``, ``tokenizer.pad``
            and ``tokenizer.bos_token_id``.
        decoder_start_token_id: Optional id of the token to strip from the
            front of the labels when every row starts with it. When left as
            ``None`` the tokenizer's ``bos_token_id`` is used, which is the
            original behaviour.
    """

    processor: Any
    # NOTE(review): for Whisper the encoded labels appear to start with the
    # decoder start token rather than the tokenizer's bos token, in which case
    # callers should pass model.config.decoder_start_token_id — confirm.
    decoder_start_token_id: Union[int, None] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Collate ``features`` into a dict of padded tensors.

        Args:
            features: Examples carrying ``"input_features"`` and ``"labels"``.

        Returns:
            The padded audio batch with a ``"labels"`` tensor added, in which
            padded positions are set to -100.
        """
        # Audio side: hand the features to the feature extractor's pad routine.
        audio_inputs = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(audio_inputs, return_tensors="pt")

        # Label side: pad the token id sequences to a common length.
        label_inputs = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_inputs, return_tensors="pt")

        # Positions outside the attention mask are padding; -100 excludes them from the loss.
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # If every row begins with the start token, drop it: it is added again later.
        start_id = self.decoder_start_token_id
        if start_id is None:
            start_id = self.processor.tokenizer.bos_token_id
        if (labels[:, 0] == start_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [None]:
# Collator instance handed to the trainer below; it pads audio features and labels using `processor`.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
# Upgrade TensorFlow — presumably for the TensorBoard reporting configured in the training arguments; confirm.
!pip install tensorflow -U



# Defining Evaluation Metrics

In [None]:
import evaluate

# The two text metrics consumed by compute_metrics: word error rate and BLEU.
metric1 = evaluate.load("wer")
metric2 = evaluate.load('bleu')

Downloading builder script:   0%|          | 0.00/5.94k [00:00<?, ?B/s]

Downloading extra modules:   0%|          | 0.00/1.55k [00:00<?, ?B/s]

Downloading extra modules:   0%|          | 0.00/3.34k [00:00<?, ?B/s]

# Computation of Evaluation Metrics

In [None]:
def compute_metrics(pred):
    """Score generated transcripts against the reference lyrics.

    Uses the notebook-level ``tokenizer``, ``metric1`` (WER) and ``metric2`` (BLEU).

    Args:
        pred: Evaluation output exposing ``predictions`` and ``label_ids``
            (arrays of token ids; ``label_ids`` uses -100 for ignored positions).

    Returns:
        Dict with ``"wer"`` (the WER metric's result multiplied by 100) and
        ``"bleu"`` (the ``'bleu'`` entry of the BLEU metric's result).
    """
    pred_ids = pred.predictions
    # Work on a copy so the caller's label array keeps its -100 markers.
    label_ids = pred.label_ids.copy()

    # -100 is not a real token id; replace it with the pad token id before decoding
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # decode both sides to text, dropping special tokens
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric1.compute(predictions=pred_str, references=label_str)
    bleu = metric2.compute(predictions=pred_str, references=label_str)

    return {"wer": wer, 'bleu': bleu['bleu']}

# Loading a Model Checkpoint to Fine-Tune

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the pretrained "openai/whisper-small" weights as the starting point for fine-tuning.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
# Clear two generation-related settings on the loaded model's config:
# no forced decoder ids and an empty list of suppressed tokens.
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

# Defining the Hyperparameters for Model Training

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    # --- output and reporting ---
    output_dir="./whisper-small-hi",  # change to a repo name of your choice
    push_to_hub=False,
    report_to=["tensorboard"],
    logging_steps=100,
    # --- optimisation ---
    learning_rate=1e-5,
    warmup_steps=100,
    max_steps=2000,
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    gradient_checkpointing=True,
    fp16=False,
    # --- evaluation: run every 100 steps, scoring generated text ---
    evaluation_strategy="steps",
    eval_steps=100,
    per_device_eval_batch_size=2,
    predict_with_generate=True,
    generation_max_length=225,
    # --- checkpointing: save every 100 steps, keep the lowest-WER model at the end ---
    save_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, the data, the collator and the metric function into one trainer.
# The test split doubles as the evaluation set.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    # NOTE(review): the feature extractor is passed under the `tokenizer` argument —
    # presumably so it is saved with each checkpoint; confirm.
    tokenizer=processor.feature_extractor,
)

# Training the Model and Evaluating on the Test Set after each checkpoint

In [None]:
# Start fine-tuning with the trainer configured above.
trainer.train()