In [None]:
!pip install transformers

In [None]:
!pip install datasets
!pip install pydub
!pip install soundfile
!pip install torchaudio
!pip install librosa
!pip install jiwer

In [None]:
import os

from datasets import load_dataset, load_metric

In [None]:
common_voice_train = load_dataset("common_voice", "de", split="train")
common_voice_validation = load_dataset("common_voice", "de", split="validation")
common_voice_test = load_dataset("common_voice", "de", split="test")

In [None]:
common_voice_validation = load_dataset("common_voice", "de", split="validation")
common_voice_test_half = load_dataset("common_voice", "de", split="test[:7794]")

In [None]:
common_voice_train = common_voice_train.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])
common_voice_validation = common_voice_validation.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])
common_voice_test = common_voice_test.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])

In [None]:
common_voice_validation = common_voice_validation.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])
common_voice_test_half = common_voice_test_half.remove_columns(["accent", "age", "client_id", "down_votes", "gender", "locale", "segment", "up_votes"])

In [None]:
print(common_voice_train.shape)
print(common_voice_validation.shape)
print(common_voice_test.shape)

In [None]:
import re
chars_to_keep = '[^A-Za-zäüö ]+'

def remove_special_characters(batch):
    batch["sentence"] = re.sub(chars_to_keep, '', batch["sentence"]).lower() + " "
    return batch

In [None]:
common_voice_train = common_voice_train.map(remove_special_characters)
common_voice_validation = common_voice_validation.map(remove_special_characters)
common_voice_test = common_voice_test.map(remove_special_characters)

In [None]:
common_voice_test_half = common_voice_test_half.map(remove_special_characters)
common_voice_validation = common_voice_validation.map(remove_special_characters)

In [None]:
def extract_all_chars(batch):
  all_text = " ".join(batch["sentence"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

In [None]:
vocab_train = common_voice_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_train.column_names)
vocab_validation = common_voice_validation.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_validation.column_names)
vocab_test = common_voice_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=common_voice_test.column_names)

In [None]:
vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0])| set(vocab_validation["vocab"][0]))

In [None]:
vocab_dict = {v: k for k, v in enumerate(vocab_list)}
vocab_dict

In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

In [None]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)

In [None]:
import json
with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

# Build Processor

In [None]:
import torch
from transformers import Wav2Vec2CTCTokenizer, Wav2Vec2FeatureExtractor, Wav2Vec2Processor

tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
common_voice_validation[0]

In [None]:
common_voice_test[0]

In [None]:
import soundfile as sf

def speech_file_to_array_fn(batch):
    speech_array, sampling_rate = sf.load(batch["path"])
    batch["speech"] = speech_array[0].numpy()
    batch["sampling_rate"] = sampling_rate
    batch["target_text"] = batch["sentence"]
    return batch

In [None]:
def speech_file_to_array_subsample_fn(batch):
    
    #dirty Hack to get in running in different processes
    import torchaudio
    import librosa
    import numpy as np
    
    speech_array, sampling_rate = torchaudio.load(batch["path"])
    batch["speech"] = speech_array
    batch["speech"] = librosa.resample(np.asarray(batch["speech"]), sampling_rate, 16_000)
    batch["sampling_rate"] = 16_000
    batch["target_text"] = batch["sentence"]
    return batch

In [None]:
print(common_voice_validation.shape)

common_voice_validation = common_voice_validation.map(speech_file_to_array_subsample_fn, remove_columns=common_voice_validation.column_names, num_proc=8)

In [None]:
common_voice_test_half = common_voice_test_half.map(speech_file_to_array_subsample_fn, remove_columns=common_voice_test_half.column_names, num_proc=8)

In [None]:
import IPython.display as ipd
import numpy as np
import random

rand_int = random.randint(0, len(common_voice_test_half_prep)-1)

ipd.Audio(data=np.asarray(common_voice_test_half_prep[rand_int]["input_values"]), autoplay=True, rate=16000)

In [None]:
common_voice_validation.save_to_disk("D:\Master\Data\common_voice_validation")

In [None]:
common_voice_test_half.save_to_disk("D:\Master\Data\common_voice_test_half")

In [None]:
from datasets import load_from_disk

common_voice_validation = load_from_disk("D:\Master\Data\cv_val_sa")

In [None]:
from datasets import load_from_disk

common_voice_test_half = load_from_disk("D:\Master\Data\cv_test_half_sa")

In [None]:
import IPython.display as ipd
import numpy as np
import random

rand_int = random.randint(0, len(common_voice_validation)-1)

ipd.Audio(data=np.asarray(common_voice_validation[rand_int]["input_values"]), rate=16000)

In [None]:
import IPython.display as ipd
import numpy as np
import random
rand_int = random.randint(0, len(common_voice_test_half)-1)

print("Target text:", common_voice_test_half[rand_int]["target_text"])
print("Input array shape:", np.asarray(common_voice_test_half[rand_int]["speech"]).shape)
print("Sampling rate:", common_voice_test_half[rand_int]["sampling_rate"])

In [None]:
def prepare_dataset(batch):
    # check that all files have the correct sampling rate
    #assert (
    #    len(set(batch["sampling_rate"])) == 1
    #), f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."

    batch["input_values"] = processor(batch["speech"], sampling_rate=batch["sampling_rate"]).input_values
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["target_text"]).input_ids
        
    return batch

In [None]:
common_voice_test_half = common_voice_test_half.map(prepare_dataset, remove_columns=common_voice_test_half.column_names)

In [None]:
common_voice_test_half.save_to_disk("D:\Master\Data\cv_test_half_pro")

In [None]:
common_voice_validation = common_voice_validation.map(prepare_dataset, remove_columns=common_voice_validation.column_names)

In [None]:
common_voice_validation.save_to_disk("D:\Master\Data\cv_val_pro")

# Load processed datasets

In [None]:
from datasets import load_from_disk

test_sampled_pro = load_from_disk("E:/Master/data/test_sampled_pro")
train_sampled_pro = load_from_disk("E:/Master/data/train_sampled_pro")

In [None]:
columns_to_return = common_voice_validation_prep.column_names

common_voice_validation_prep.set_format(type='torch', columns=columns_to_return)

columns_to_return = common_voice_test_half_prep.column_names

common_voice_test_half_prep.set_format(type='torch', columns=columns_to_return)

In [None]:
import transformers

# Get logging
transformers.logging.set_verbosity(10)

In [None]:
import torch
import numpy as np

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        batch.to_device("cuda:0")
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )
            labels_batch.to_device("cuda:0")
        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
from datasets import load_metric
wer_metric = load_metric("wer")

In [None]:
def compute_metrics(pred):
    pred_logits = pred.predictions
    pred_ids = np.argmax(pred_logits, axis=-1)

    pred.label_ids[pred.label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    # we do not want to group tokens when computing the metrics
    label_str = processor.batch_decode(pred.label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
from transformers import Wav2Vec2ForCTC

model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-large-xlsr-53", 
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.1,
    gradient_checkpointing=True, 
    ctc_loss_reduction="mean", 
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer)
)

In [None]:
#model.to('cuda:0')
model.freeze_feature_extractor()

In [None]:
#import torch
torch.cuda.is_available()

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir="./wav2vec2-large-xlsr-ger-chris",
  group_by_length=False,
  per_device_train_batch_size=32,
  gradient_accumulation_steps=2,
  evaluation_strategy="steps",
  fp16=True,
  num_train_epochs=30,
  save_steps=400,
  eval_steps=400,
  logging_steps=400,
  learning_rate=3e-4,
  warmup_steps=500,
  save_total_limit=10,
)

In [None]:
from transformers import Trainer

#model= model.to('cuda:0')
tokenizer=processor.feature_extractor

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_sampled_pro,
    eval_dataset=test_sampled_pro,
    tokenizer=tokenizer,
)

In [None]:
from datetime import datetime

print("STARTED")
# datetime object containing current date and time
now = datetime.now()
 
print("now =", now)

# dd/mm/YY H:M:S
dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
print("date and time =", dt_string)	

In [None]:
trainer.train()