In [1]:
from huggingface_hub import notebook_login

# Show the Hugging Face login widget. The training arguments further down set
# push_to_hub=True, so an authenticated Hub session is needed before training.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [2]:
import os
from datasets import load_dataset, Dataset
import jsonlines

In [3]:
# NOTE: despite its name, this is the path of the JSONL manifest that read_jsonl()
# parses for 'audio' / 'transcript' entries, not a directory of audio files.
path_to_audio_files = "../novice/asr.jsonl"

def read_jsonl(file_path):
    """Load the 'audio' and 'transcript' fields of every record in a JSONL file.

    Args:
        file_path: Path of the JSONL file; each record must provide the keys
            'audio' and 'transcript'.

    Returns:
        A tuple ``(audio_list, transcript_list)`` of two lists in file order,
        where position ``k`` of both lists comes from the same record.
    """
    # Single pass over the file: collect (audio, transcript) pairs per record
    with jsonlines.open(file_path) as reader:
        pairs = [(record['audio'], record['transcript']) for record in reader]

    # Split the pairs back into the two parallel lists callers expect
    audio_list = [audio for audio, _ in pairs]
    transcript_list = [transcript for _, transcript in pairs]
    return audio_list, transcript_list


# Parse the manifest into two parallel lists that follow the line order of the file
audio_data, transcript_data = read_jsonl(path_to_audio_files)

In [4]:
# Map each manifest 'audio' entry to its transcript. zip() pairs the two parallel
# lists element-wise and stops at the shorter one, exactly like the
# multi-iterable map() it replaces.
audio_transcript_dict = dict(zip(audio_data, transcript_data))

In [5]:
import numpy as np
import re

In [6]:
# os.listdir() returns entries in arbitrary, filesystem-dependent order, so sort
# explicitly. Lexicographic order (audio_0, audio_1, audio_10, ...) is the row
# order the "audiofolder" dataset shows in the output further down.
# NOTE(review): confirm that ordering for the installed `datasets` version, since
# the transcripts are attached to the dataset rows purely by position.
audio_file_names = sorted(os.listdir("../novice/audio"))
# The first run of digits in a file name is that clip's index (audio_10.wav -> 10)
indexes = [int(re.findall(r'\d+', filename)[0]) for filename in audio_file_names]

In [7]:
# Reorder the transcripts to follow audio_file_names: indexing with the list of
# indexes picks transcript_data[k] for the file whose name carries the number k.
# NOTE(review): assumes record k of the manifest describes audio_k.wav — confirm.
transcript_correct_ordering = np.array(transcript_data)[indexes]

In [8]:
# Load the audio files under the folder with the "audiofolder" builder; the
# result exposes a "train" split, which the next step indexes into.
dataset = load_dataset("audiofolder", data_dir= "../novice/audio")

Resolving data files:   0%|          | 0/3500 [00:00<?, ?it/s]

In [9]:
# Attach the transcripts as a new column. Values are matched to rows by position
# only, so transcript_correct_ordering must already be in the dataset's row order.
# NOTE: `dataset` is rebound here from the loaded object to its "train" split, so
# this cell cannot be re-run on its own without re-running the load cell first.
dataset = dataset['train'].add_column(name = "transcript", column = transcript_correct_ordering)

In [10]:
# Spot-check the first three rows: decoded audio (path, array, sampling rate)
# next to the transcript that was attached to each row.
dataset[:3]

{'audio': [{'path': '/home/jupyter/novice/audio/audio_0.wav',
   'array': array([0., 0., 0., ..., 0., 0., 0.]),
   'sampling_rate': 16000},
  {'path': '/home/jupyter/novice/audio/audio_1.wav',
   'array': array([ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          -9.15527344e-05, -6.10351562e-05, -3.05175781e-05]),
   'sampling_rate': 16000},
  {'path': '/home/jupyter/novice/audio/audio_10.wav',
   'array': array([ 0.00000000e+00,  0.00000000e+00,  9.15527344e-05, ...,
           1.92260742e-03,  1.43432617e-03, -1.83105469e-04]),
   'sampling_rate': 16000}],
 'transcript': ['Heading is one five zero, target is green commercial aircraft, tool to deploy is electromagnetic pulse.',
  'Heading is two six zero, target is black, white, and yellow commercial aircraft, tool to deploy is surface-to-air missiles.',
  'Heading is one five five, target is silver, red, and green helicopter, tool to deploy is machine gun.']}

In [11]:
# Hold out 20% of the rows for evaluation. The fixed seed makes the shuffle, and
# therefore the train/test membership, reproducible across kernel restarts.
dataset = dataset.train_test_split(test_size=0.2, shuffle=True, seed=42)

In [12]:
# Show the resulting splits with their columns and row counts
dataset

DatasetDict({
    train: Dataset({
        features: ['audio', 'transcript'],
        num_rows: 2800
    })
    test: Dataset({
        features: ['audio', 'transcript'],
        num_rows: 700
    })
})

In [13]:
from transformers import WhisperProcessor

# Pretrained Whisper processor configured for English transcription. Later cells
# use its two parts: processor.feature_extractor (audio) and processor.tokenizer (text).
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-medium", language="english", task="transcribe"
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [14]:
def prepare_dataset(example):
    """Turn one raw dataset row into processor outputs plus the clip duration.

    Args:
        example: A row with an ``"audio"`` dict (providing ``"array"`` and
            ``"sampling_rate"``) and a ``"transcript"`` string.

    Returns:
        Whatever ``processor(...)`` returns for this row, with an additional
        ``"input_length"`` entry holding the clip duration in seconds.
    """
    audio = example["audio"]
    # Use the clip's own sampling rate instead of a hard-coded 16000: the
    # duration is then correct for any input, and the processor is told the
    # real rate of the waveform it receives.
    sampling_rate = audio["sampling_rate"]

    example = processor(
        audio=audio["array"],
        sampling_rate=sampling_rate,
        text=example["transcript"],
    )

    # compute input length of audio sample in seconds
    example["input_length"] = len(audio["array"]) / sampling_rate

    return example


In [15]:
# Inspect the column schema of the training split before the audio column is cast
dataset["train"].features

{'audio': Audio(sampling_rate=None, mono=True, decode=True, id=None),
 'transcript': Value(dtype='string', id=None)}

In [16]:
from datasets import Audio

# Sampling rate configured on the processor's feature extractor; the audio
# column is cast to this rate in the next step.
sampling_rate = processor.feature_extractor.sampling_rate

In [17]:
# Re-declare the "audio" column with the feature extractor's sampling rate so the
# column's declared rate matches what the processor is configured for.
dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate))

In [18]:
# Confirm that the audio feature now carries the new sampling rate
dataset["train"].features

{'audio': Audio(sampling_rate=16000, mono=True, decode=True, id=None),
 'transcript': Value(dtype='string', id=None)}

In [19]:
# Run prepare_dataset over every row of both splits. The original columns of the
# train split are removed, so only what prepare_dataset returns is kept.
# num_proc=1 keeps the mapping in a single process.
dataset = dataset.map(
    prepare_dataset, remove_columns=dataset.column_names["train"], num_proc=1
)

Map:   0%|          | 0/2800 [00:00<?, ? examples/s]

Map:   0%|          | 0/700 [00:00<?, ? examples/s]

In [20]:
# Upper bound (exclusive), in seconds, on the clip duration kept by the filter
max_input_length = 30.0


def is_audio_in_length_range(length):
    """Tell whether a clip duration is strictly below ``max_input_length``.

    Args:
        length: Clip duration in seconds.

    Returns:
        True when ``length`` is smaller than ``max_input_length``, else False.
    """
    return max_input_length > length

In [21]:
# Keep only the training rows whose "input_length" passes is_audio_in_length_range.
# Only the train split is filtered here; the test split is left as it is.
dataset["train"] = dataset["train"].filter(
    is_audio_in_length_range,
    input_columns=["input_length"],
)

Filter:   0%|          | 0/2800 [00:00<?, ? examples/s]

In [22]:
# Check the remaining columns and the row count after filtering
dataset["train"]

Dataset({
    features: ['input_features', 'labels', 'input_length'],
    num_rows: 2800
})

In [23]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate prepared examples into one padded batch of tensors.

    The audio features and the label token ids are padded separately, through
    ``processor.feature_extractor.pad`` and ``processor.tokenizer.pad``
    respectively, and then merged into a single batch.
    """

    # Object exposing `.feature_extractor` and `.tokenizer`, each with a `pad` method
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Build a batch from a list of examples.

        Args:
            features: Examples holding ``"input_features"`` (only the first
                element of each is used) and ``"labels"`` (token ids).

        Returns:
            The padded audio batch with a ``"labels"`` entry added, in which
            padded positions are set to -100.
        """
        # Audio side: take the first element of every example and let the
        # feature extractor assemble the tensors.
        audio_items = [
            {"input_features": item["input_features"][0]} for item in features
        ]
        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")

        # Text side: pad the token id sequences to a common length.
        token_items = [{"input_ids": item["labels"]} for item in features]
        padded = self.processor.tokenizer.pad(token_items, return_tensors="pt")

        # Positions outside the attention mask are padding; mark them with -100
        # so they are ignored when the loss is computed.
        token_ids = padded["input_ids"]
        is_padding = padded.attention_mask.ne(1)
        labels = token_ids.masked_fill(is_padding, -100)

        # When every sequence starts with the bos token, drop that first column,
        # since the token is added again later on.
        starts_with_bos = labels[:, 0] == self.processor.tokenizer.bos_token_id
        if starts_with_bos.all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [24]:
# Collator instance that is handed to the trainer as `data_collator`
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [25]:
import evaluate

# "wer" metric from the `evaluate` library; compute_metrics calls metric.compute()
metric = evaluate.load("wer")

In [26]:
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

# Text normaliser applied to predictions and references before the normalised
# WER is computed in compute_metrics
normalizer = BasicTextNormalizer()


def compute_metrics(pred):
    """Compute the orthographic and the normalised word error rate.

    Args:
        pred: Object exposing ``predictions`` (predicted token ids) and
            ``label_ids`` (reference token ids in which -100 marks positions
            to be ignored).

    Returns:
        Dict with ``"wer_ortho"`` (WER on the decoded text as-is) and ``"wer"``
        (WER after normalisation, skipping samples whose normalised reference
        is empty), both scaled by 100.
    """
    pred_ids = pred.predictions

    # Replace -100 with the pad token id so the references can be decoded.
    # np.where builds a new array, so pred.label_ids is left untouched for the
    # caller instead of being overwritten in place.
    label_ids = np.where(
        pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids
    )

    # decode the token ids to text, dropping special tokens
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

    # compute orthographic wer
    wer_ortho = 100 * metric.compute(predictions=pred_str, references=label_str)

    # compute normalised WER, evaluating only the samples whose normalised
    # reference is non-empty (predictions and references are filtered together)
    normalised_pairs = [
        (normalizer(prediction), normalizer(reference))
        for prediction, reference in zip(pred_str, label_str)
    ]
    kept_pairs = [
        (prediction, reference)
        for prediction, reference in normalised_pairs
        if len(reference) > 0
    ]
    pred_str_norm = [prediction for prediction, _ in kept_pairs]
    label_str_norm = [reference for _, reference in kept_pairs]

    wer = 100 * metric.compute(predictions=pred_str_norm, references=label_str_norm)

    return {"wer_ortho": wer_ortho, "wer": wer}

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the pretrained weights from the same "openai/whisper-medium" checkpoint
# the processor was created from
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-medium")



In [None]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
# (gradient_checkpointing=True is set in the training arguments)
model.config.use_cache = False

# Wrap generate() so that every call uses language="english" and
# task="transcribe", and re-enables the cache for generation.
# NOTE: re-running this cell wraps the already wrapped method once more.
model.generate = partial(
    model.generate, language="english", task="transcribe", use_cache=True
)

In [None]:
!pip install transformers[torch]

In [None]:
from transformers import Seq2SeqTrainingArguments

# Training configuration. With max_steps, eval_steps and save_steps all set to
# 500, evaluation and checkpointing each happen once, at the final step.
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-medium-unholy-trinity",  # name on the HF Hub
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    lr_scheduler_type="constant_with_warmup",
    warmup_steps=50,
    max_steps=500,  # increase to 4000 if you have your own GPU or a Colab paid plan
    gradient_checkpointing=True,  # model.config.use_cache is disabled for this reason
    fp16=True,
    fp16_full_eval=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=16,
    predict_with_generate=True,  # evaluate on generated sequences (compute_metrics decodes them)
    generation_max_length=225,
    save_steps=500,
    eval_steps=500,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",  # the "wer" key returned by compute_metrics
    greater_is_better=False,  # a lower WER is better
    push_to_hub=True,  # relies on the Hub login done at the top of the notebook
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, both splits, the collator and the metric function together.
# The whole processor is passed in the `tokenizer` slot.
# NOTE(review): presumably so it is saved with the checkpoints — confirm for
# the installed transformers version.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor,
)

In [None]:
# Start fine-tuning with the configuration defined in training_args
trainer.train()