In [2]:
import os
import re
import json
import random
import numpy as np
from dataclasses import dataclass
from pathlib import Path

import torch
from datasets import load_metric, Dataset, Audio
from transformers import ( Wav2Vec2CTCTokenizer, 
                           Wav2Vec2FeatureExtractor, 
                           Wav2Vec2Processor, 
                           Wav2Vec2ForCTC, 
                           TrainingArguments, 
                           Trainer ) 

import IPython.display as ipd

from typing import Generator, Any

In [3]:
current_dir = os.getcwd()


def check_exists_path(path):
    """Create directory ``path`` (and any missing parents) if it does not exist yet.

    A relative ``path`` is resolved against ``current_dir`` (the working
    directory captured when this cell ran) for both the lookup and the
    creation, so the two can never disagree. An absolute ``path`` is used as
    given.

    Args:
        path: Directory to ensure, as a string or path-like object.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # os.path.join discards `current_dir` when `path` is absolute, whereas plain
    # string concatenation would nest the absolute path under the working directory.
    target_dir = os.path.join(current_dir, path)
    # exist_ok=True keeps the call idempotent and avoids a check-then-create race.
    os.makedirs(target_dir, exist_ok=True)

# Мультиязычная модель Speech Recognition
## Загрузка датасетов

In [4]:
# Matches any single character that is neither a word character (\w) nor whitespace (\s).
non_alphanum_chars_regexp = re.compile(r"[^\w\s]", flags=re.IGNORECASE)


def remove_special_characters(text: str) -> str:
    """Return ``text`` lower-cased, with every non-word, non-whitespace character removed.

    Args:
        text: Raw text to normalise.

    Returns:
        The normalised text; whitespace (including newlines) is left untouched.
    """
    without_specials = non_alphanum_chars_regexp.sub('', text)
    return without_specials.lower()

In [5]:
def samples_generator(split_name: str) -> Generator[dict[str, Any], None, None]:
    """Yield one ``{"audio", "transcription"}`` record per ``.wav`` file of a split.

    Audio files are looked up as ``./data/{split_name}_audio/<subdir>/<name>.wav``;
    the transcription of each one is read from
    ``./data/{split_name}_transcription/<subdir>/<name>.txt``.

    Args:
        split_name: Prefix of the data directories to read.

    Yields:
        A dict with ``"audio"`` (path to the ``.wav`` file, as a string) and
        ``"transcription"`` (text normalised by ``remove_special_characters``
        with surrounding whitespace stripped).

    Raises:
        FileNotFoundError: If an audio file has no matching transcription file.
    """
    base_path = Path(f"./data/{split_name}_audio/")
    file_path_pattern = "*/*.wav"
    # glob() returns entries in filesystem order; sorting makes the sample order
    # reproducible between runs and machines.
    for path in sorted(base_path.glob(file_path_pattern)):
        file_path_template = f"{path.parent.name}/{path.stem}.{{ext}}"

        audio_path = f"./data/{split_name}_audio/{file_path_template.format(ext='wav')}"
        text_path = f"./data/{split_name}_transcription/{file_path_template.format(ext='txt')}"

        # Explicit encoding: the platform default is not UTF-8 everywhere, which
        # would corrupt non-ASCII transcriptions.
        with open(text_path, "r", encoding="utf-8") as f:
            transcription = f.read()
        # remove_special_characters keeps whitespace, so strip the trailing newline
        # of the file here; otherwise "\n" ends up in the labels and the vocabulary.
        transcription = remove_special_characters(transcription).strip()

        yield {"audio": audio_path, "transcription": transcription}

def load_dataset(split_name: str) -> Dataset:
    """Build the dataset of one split and cast its audio column.

    Args:
        split_name: Split prefix forwarded to ``samples_generator``.

    Returns:
        A ``Dataset`` generated from ``samples_generator`` whose ``"audio"``
        column is cast to ``Audio(sampling_rate=16000)``.
    """
    samples = Dataset.from_generator(
        samples_generator,
        gen_kwargs={"split_name": split_name},
    )
    audio_feature = Audio(sampling_rate=16000)
    return samples.cast_column("audio", audio_feature)

In [6]:
# Load the three splits in the order train, test, valid.
train_dataset, test_dataset, validation_dataset = (
    load_dataset(split_name) for split_name in ("train", "test", "valid")
)

In [6]:
# Sanity check: pick one training example at random, print its transcription
# and render an audio player for it.
rand_int = random.randint(0, len(train_dataset) - 1)
random_sample = train_dataset[rand_int]

print(f"{rand_int}: {random_sample['transcription']}")
ipd.Audio(data=random_sample["audio"]["array"], rate=16000)

925: hi ive lost my bank card and please stop all transactions i dont want anybody using my card for anything i dont know where it is can you do this for me thank you


## Предобработка данных

In [7]:
def chars_generator():
    """Yield every character of every transcription, split by split.

    The splits are visited in the order train, test, validation; within a
    split the transcriptions are visited in dataset order.
    """
    for split in (train_dataset, test_dataset, validation_dataset):
        for text in split["transcription"]:
            yield from text

# Character-level vocabulary: each distinct character gets an id following sorted order.
chars = set(chars_generator())
vocab = dict(zip(sorted(chars), range(len(chars))))

# The special tokens take the next two free ids, after the regular characters.
vocab["[UNK]"] = len(vocab)
vocab["[PAD]"] = len(vocab)

# Persist the mapping as vocab.json inside the model directory.
vocab_path = "./models/multilingual"
check_exists_path(vocab_path)
with open(f'{vocab_path}/vocab.json', 'w') as vocab_file:
    vocab_file.write(json.dumps(vocab))

In [None]:
# Tokenizer loaded from the directory that holds vocab.json; a plain space is
# used as the word delimiter token.
tokenizer = Wav2Vec2CTCTokenizer.from_pretrained(
    "./models/multilingual/",
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token=" ",
)

# Feature extractor for single-channel features at 16 kHz, with normalisation
# and attention masks enabled.
feature_extractor = Wav2Vec2FeatureExtractor(
    feature_size=1,
    sampling_rate=16000,
    padding_value=0.0,
    do_normalize=True,
    return_attention_mask=True,
)

# The processor bundles both objects behind one interface.
processor = Wav2Vec2Processor(
    feature_extractor=feature_extractor,
    tokenizer=tokenizer,
)

In [9]:
def prepare_dataset(batch):
    """Convert one raw example into model inputs and CTC label ids.

    Args:
        batch: A single example holding an ``"audio"`` dict (with ``"array"``
            and ``"sampling_rate"``) and a ``"transcription"`` string.

    Returns:
        The same ``batch`` mapping with ``"input_values"``, ``"input_length"``
        and ``"labels"`` added.
    """
    audio = batch["audio"]

    # batched output is "un-batched"
    batch["input_values"] = processor(audio["array"], sampling_rate=audio["sampling_rate"]).input_values[0]
    batch["input_length"] = len(batch["input_values"])

    # Encode the text with the tokenizer directly instead of going through the
    # deprecated `as_target_processor()` context manager, which only redirected
    # the processor call to this same tokenizer.
    batch["labels"] = processor.tokenizer(batch["transcription"]).input_ids
    return batch

# Replace the raw columns of every split with the model-ready ones produced by
# `prepare_dataset`. NOTE: the names are rebound, so this cell cannot be re-run
# without reloading the raw datasets first.
train_dataset, test_dataset, validation_dataset = (
    split.map(prepare_dataset, remove_columns=split.column_names)
    for split in (train_dataset, test_dataset, validation_dataset)
)

## Дообучение модели

In [15]:
@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that dynamically pads the inputs and labels it receives.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for processing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'` (default): Pad to the longest sequence in the batch (or no padding if
              only a single sequence is provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'`: No padding (i.e., can output a batch with sequences of
              different lengths).
    """

    processor: Wav2Vec2Processor
    padding: bool | str = True

    def __call__(self, features: list[dict[str, list[int] | torch.Tensor]]) -> dict[str, torch.Tensor]:
        """Pad a list of examples into a single batch.

        Args:
            features: Examples carrying ``"input_values"`` and ``"labels"`` entries.

        Returns:
            The padded input batch with an added ``"labels"`` tensor in which
            every padded label position is set to ``-100``.
        """
        # split inputs and labels since they have to be of different lengths and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            return_tensors="pt",
        )
        # Pad the label ids with the tokenizer directly instead of going through
        # the deprecated `as_target_processor()` context manager, which only
        # redirected `processor.pad` to this same tokenizer.
        labels_batch = self.processor.tokenizer.pad(
            label_features,
            padding=self.padding,
            return_tensors="pt",
        )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch


data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# Word-error-rate metric object; `compute_metrics` calls its `compute` method.
# NOTE(review): `load_metric` is deprecated in recent `datasets` releases in favour of
# the separate `evaluate` package — confirm against the installed version.
wer_metric = load_metric("wer")

def compute_metrics(pred):
    """Compute the word error rate for a set of predictions.

    Args:
        pred: Object exposing ``predictions`` (logits whose last axis is
            arg-maxed into token ids) and ``label_ids`` (reference ids in which
            ignored positions are marked with ``-100``).

    Returns:
        ``{"wer": value}`` where ``value`` is what ``wer_metric.compute`` returns.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Swap the -100 markers for the pad token id so the labels can be decoded.
    # np.where builds a new array, leaving the caller's `pred.label_ids` intact
    # (the in-place assignment used before silently modified it).
    label_ids = np.where(pred.label_ids == -100, processor.tokenizer.pad_token_id, pred.label_ids)

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
# Load the pretrained "facebook/wav2vec2-xls-r-300m" checkpoint into a CTC model.
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-xls-r-300m",
    # Output vocabulary and loss settings, taken from the processor's tokenizer.
    vocab_size=len(processor.tokenizer),
    pad_token_id=processor.tokenizer.pad_token_id,
    ctc_loss_reduction="mean",
    # Every dropout-style option is set to zero; only time masking stays at 0.05.
    attention_dropout=0.0,
    hidden_dropout=0.0,
    feat_proj_dropout=0.0,
    layerdrop=0.0,
    mask_time_prob=0.05,
)

# NOTE(review): newer transformers releases expose this as `freeze_feature_encoder`
# and deprecate the name used here — confirm against the installed version.
model.freeze_feature_extractor()

training_args = TrainingArguments(
    output_dir="./models/multilingual/",
    # Batch similarly long samples together. `prepare_dataset` already stores each
    # sample's length in the "input_length" column; pointing the Trainer at it
    # avoids recomputing the lengths (the default column name is "length").
    group_by_length=True,
    length_column_name="input_length",
    # 4 samples per device x 4 accumulation steps = 16 samples per optimizer step.
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    eval_accumulation_steps=8,
    evaluation_strategy="steps",
    num_train_epochs=30,
    gradient_checkpointing=True,
    fp16=True,
    # Save, evaluate and log on the same 100-step cadence.
    save_steps=100,
    eval_steps=100,
    logging_steps=100,
    learning_rate=3e-4,
    warmup_steps=300,
    save_total_limit=4,
)

# Wire model, data, collation and metrics together. The validation split is
# used for evaluation during training; the feature extractor is passed in the
# `tokenizer` slot.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=validation_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

In [None]:
# Run the fine-tuning loop.
trainer.train()

# Persist only the model weights (state dict).
# NOTE(review): the multilingual weights are written under "./models/en" — confirm
# this directory is intended.
model_path = "./models/en"
check_exists_path(model_path)
torch.save(obj=model.state_dict(), f=f"{model_path}/multilingual_model_state.pth")