In [None]:
# Pre-trained checkpoint to fine-tune; later cells pass this identifier to
# AutoConfig, AutoFeatureExtractor and AutoModelForCTC.
model_checkpoint = "facebook/wav2vec2-large-xlsr-53"

# Per-device training batch size (handed to TrainingArguments further down)
batch_size = 32

In [None]:
%%capture
# Install necessary libraries
!pip install datasets    # Library for using and sharing datasets
!pip install transformers==4.11.3   # Hugging Face's transformer models
!pip install librosa    # Library for audio and music processing
!pip install jiwer    # Calculate word error rate (WER)
!pip install ipywidgets   # Interactive HTML widgets for Jupyter notebooks
!pip install torch   # PyTorch library for deep learning



In [None]:
from huggingface_hub import notebook_login

notebook_login()  # Log in to Hugging Face from within the notebook

In [None]:
%%capture
!apt install git-lfs # Installs Git Large File Storage extension

In [None]:
from datasets import load_dataset

# Load the Samromur, Malromur or Althingi ASR dataset from Hugging Face's dataset hub.
# Exactly one `load_dataset` line should be active; keep the other two commented out.
dataset = load_dataset("language-and-voice-lab/samromur_asr")
#dataset = load_dataset("language-and-voice-lab/malromur_asr")
#dataset = load_dataset("language-and-voice-lab/althingi_asr")


In [None]:
# Display the loaded dataset (the bare last expression is rendered by the notebook)
dataset

In [None]:

# Move the 'normalized_text' value over to the key 'text'
def rename_normalized_text(example):
    """Re-key the entry stored under 'normalized_text' as 'text'.

    The mapping is changed in place and the same object is returned.
    """
    example['text'] = example.pop('normalized_text')
    return example

# Apply the transformation to the dataset
# (remove_columns additionally asks `map` to drop 'normalized_text' from its output)
dataset = dataset.map(rename_normalized_text, remove_columns=['normalized_text'])

# Check the new dataset
print(dataset)

In [None]:
# Remove unnecessary columns from the dataset
# (none of these four columns is referenced by any later cell in this notebook)
dataset = dataset.remove_columns(["speaker_id", "gender", "age", "duration"])

In [None]:
# Function to display random examples from the dataset
import random

import pandas as pd


def show_random_elements(dataset, num_examples=10):
    """Render `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: Object supporting `len()` and indexing with a list of integer
            positions; the indexed result is handed to `pandas.DataFrame`.
        num_examples: Number of distinct rows to show.

    Raises:
        AssertionError: If `num_examples` is larger than `len(dataset)`.
    """
    # Imported here so this cell is self-contained: nothing earlier in the
    # notebook brings `HTML` into scope.
    from IPython.display import HTML, display

    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."

    # random.sample draws without replacement, so the indices are unique
    # without a manual retry loop.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
# Display 10 random elements from the training set after removing the 'audio' and 'audio_id' columns
# (remove_columns is applied to the argument only; `dataset` itself is not reassigned here)
show_random_elements(dataset["train"].remove_columns(["audio", "audio_id"]), num_examples=10)

In [None]:
# Define a list of special characters to ignore in the transcriptions
import re

# Raw string: the backslashes are meant for the regex engine. In a normal
# string literal, sequences such as `\,` are invalid escapes and raise a warning.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:\"\“\%\‘\”\�]'


def remove_special_characters(batch):
    """Clean the transcription stored in batch["text"].

    Removes every character matched by `chars_to_ignore_regex`, lower-cases the
    result and appends a single trailing space.

    Args:
        batch: Mapping with a string under the key "text"; modified in place.

    Returns:
        The same mapping, with "text" replaced by the cleaned string.
    """
    batch["text"] = re.sub(chars_to_ignore_regex, '', batch["text"]).lower() + " "
    return batch

In [None]:
# Apply 'remove_special_characters' to the dataset, one example at a time
dataset = dataset.map(remove_special_characters)

In [None]:
# Display random elements from the cleaned training split
# (num_examples is left at the function's default of 10)
show_random_elements(dataset["train"].remove_columns(["audio", "audio_id"]))

In [None]:
# Collect the distinct characters that occur in a batch of transcriptions
def extract_all_chars(batch):
  """Join all texts of the batch with spaces and list the unique characters.

  Returns a dict of two single-element lists: "vocab" wraps the list of
  distinct characters, "all_text" wraps the joined string.
  """
  joined = " ".join(batch["text"])
  unique_chars = list(set(joined))
  return dict(vocab=[unique_chars], all_text=[joined])

In [None]:
# Run 'extract_all_chars' over the dataset to collect the characters used in each split
vocabs = dataset.map(
  extract_all_chars,  # Function to apply
  batched=True,       # Hand the function batches of examples rather than single examples
  batch_size=-1,      # Batch size (-1 means to use the full dataset as one batch)
  keep_in_memory=True, # Keep all batches in memory while working
  remove_columns=dataset.column_names["train"] # Remove the original columns (names taken from the train split)
)

In [None]:
# Merge the vocabularies of the train and test sets into one list of unique characters.
# Sorting makes the order, and therefore the integer ids assigned from it in the
# next cell, the same on every run; the iteration order of a set of strings can
# change from one interpreter session to the next.
vocab_list = sorted(set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]))

In [None]:
# Give every character its position in `vocab_list` as an integer id
vocab_dict = dict(zip(vocab_list, range(len(vocab_list))))
# Show the resulting mapping
vocab_dict

In [None]:
# Re-key the id of the space character under "|" and drop the " " entry
vocab_dict["|"] = vocab_dict.pop(" ")

In [None]:
# Append the special tokens "[UNK]" and "[PAD]"; each receives the current
# size of the dictionary as its id
for special_token in ("[UNK]", "[PAD]"):
    vocab_dict[special_token] = len(vocab_dict)
len(vocab_dict)

In [None]:
# Serialize the vocabulary dictionary to JSON and write it to vocab.json
import json
vocab_json = json.dumps(vocab_dict)
with open('vocab.json', 'w') as vocab_file:
    vocab_file.write(vocab_json)

In [None]:
# Fetch the configuration that belongs to the model checkpoint
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_checkpoint)

if config.tokenizer_class is None:
    # No tokenizer class recorded: remember the model type and discard the configuration
    tokenizer_type = config.model_type
    config = None
else:
    # A tokenizer class is recorded: keep the configuration, no explicit tokenizer type
    tokenizer_type = None

In [None]:
# Load the tokenizer using the configuration and tokenizer type determined in the previous cell
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(
  "./",  # Load from the current directory (vocab.json was written there above)
  config=config,  # Use the configuration obtained previously (may be None)
  tokenizer_type=tokenizer_type,  # Use the tokenizer type determined earlier (may be None)
  unk_token="[UNK]",  # Define the unknown token
  pad_token="[PAD]",  # Define the padding token
  word_delimiter_token="|",  # Define the word delimiter token
)

In [None]:
# Derive the repository name from the checkpoint identifier
model_checkpoint_name = model_checkpoint.rsplit("/", 1)[-1]  # Part after the last "/"
repo_name = model_checkpoint_name + "-name-of-dataset"  # Checkpoint name plus a dataset suffix

In [None]:
# Show the 'audio' entry of the first record in the training split
dataset["train"][0]["audio"]

In [None]:
# Selecting a random example from the training dataset and playing its audio
import IPython.display as ipd
import numpy as np
import random

# Pick a valid index. randrange excludes its upper bound, whereas
# random.randint(0, len(...)) includes it and could index one past the end.
rand_int = random.randrange(len(dataset["train"]))

# Fetch the example once and reuse it for both the text and the audio
example = dataset["train"][rand_int]

# Print the text corresponding to the randomly selected example
print(example["text"])

# Play the audio of the randomly selected example.
# The audio data is converted to a numpy array and played back at the sampling
# rate stored with the clip.
ipd.Audio(data=np.asarray(example["audio"]["array"]), autoplay=True, rate=example["audio"]["sampling_rate"])

In [None]:
# Generating a random index, then printing the corresponding target text, input array shape, and sampling rate
import numpy as np
import random

# Pick a valid index. randrange excludes its upper bound, whereas
# random.randint(0, len(...)) includes it and could index one past the end.
rand_int = random.randrange(len(dataset["train"]))

# Fetch the example once instead of re-indexing the dataset for every field
example = dataset["train"][rand_int]

# Print the target text of the randomly selected example
print("Target text:", example["text"])

# Print the shape of the audio array of the randomly selected example
# The audio data is converted to a numpy array for this
print("Input array shape:", np.asarray(example["audio"]["array"]).shape)

# Print the sampling rate of the audio of the randomly selected example
print("Sampling rate:", example["audio"]["sampling_rate"])

In [None]:
# Loading the feature extractor associated with the model checkpoint
from transformers import AutoFeatureExtractor

# Initialize the feature extractor from the same checkpoint identifier as the model.
# It is combined with the tokenizer into a processor in the next cell.
feature_extractor = AutoFeatureExtractor.from_pretrained(model_checkpoint)

In [None]:
# Create a processor combining the tokenizer and feature extractor
from transformers import Wav2Vec2Processor

# Later cells use this one object for both the audio (prepare_dataset, data collator)
# and the text side (labels, decoding in compute_metrics)
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
# Convert one example into the inputs and labels used for training
def prepare_dataset(batch):
    """Add model inputs and labels to a single example.

    Reads batch["audio"] (its "array" and "sampling_rate" entries) and
    batch["text"]; stores "input_values", "input_length" and "labels" in the
    same mapping and returns it.
    """
    clip = batch["audio"]

    # Run the processor on the waveform and keep the first entry of its input_values
    features = processor(clip["array"], sampling_rate=clip["sampling_rate"])
    input_values = features.input_values[0]
    batch["input_values"] = input_values

    # Number of values in the processed input; used later to filter by length
    batch["input_length"] = len(input_values)

    # Inside this context the processor call is applied to the target text;
    # its input_ids become the labels
    with processor.as_target_processor():
        encoded_text = processor(batch["text"])
        batch["labels"] = encoded_text.input_ids
    return batch


In [None]:
# Process the dataset using the 'prepare_dataset' function in parallel
# Remove the original columns (names taken from the train split) to reduce memory and
# storage consumption, and use 4 worker processes.
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names["train"], num_proc=4)

In [None]:
# Maximum clip duration, in seconds, that is kept for training.
# This name was used below without ever being defined, so the cell failed with
# a NameError on a fresh kernel. Longer clips need more memory per batch; tune
# this value to the available hardware.
max_input_length_in_sec = 10.0

# Filter the training data to only include examples with an input length less than the maximum length.
# "input_length" counts samples, so the limit in seconds is multiplied by the sampling rate.
dataset["train"] = dataset["train"].filter(
    lambda x: x < max_input_length_in_sec * processor.feature_extractor.sampling_rate,
    input_columns=["input_length"]
)


In [None]:
# Define a custom data collator for handling padding of input data
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

import torch


@dataclass
class DataCollatorCTCWithPadding:
    """Pad a list of prepared examples into one batch.

    Inputs and labels are padded separately because they have different
    lengths; padded label positions are set to -100.

    Attributes are documented on the fields below. Calling the instance with a
    list of feature dicts (each holding "input_values" and "labels") returns
    the padded input batch with an added "labels" tensor.
    """

    # Processor whose `pad` and `as_target_processor` methods do the padding.
    # Written as a forward-reference string so that defining this class does
    # not depend on the import order of earlier cells.
    processor: "Wav2Vec2Processor"
    # Padding strategy forwarded to `processor.pad` for inputs and labels
    padding: Union[bool, str] = True
    # Optional maximum length forwarded for the inputs
    max_length: Optional[int] = None
    # Optional maximum length forwarded for the labels
    max_length_labels: Optional[int] = None
    # Optional multiple to pad the inputs to
    pad_to_multiple_of: Optional[int] = None
    # Optional multiple to pad the labels to
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Split inputs and labels - they have different lengths and require different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # Pad the input features
        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Pad the label features (inside the target-processor context)
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Replace padding with -100 to ignore these tokens in loss computation
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
# Create the collator instance that is handed to the Trainer below
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# Load the Word Error Rate (WER) metric; compute_metrics below calls wer_metric.compute(...)
# NOTE(review): `datasets` is installed without a version pin in the install cell;
# confirm that the installed release still provides `load_metric`.
from datasets import load_metric
wer_metric = load_metric("wer")

In [None]:
def compute_metrics(pred):
    """
    Score predictions with the WER metric.

    Reads the logits from `pred.predictions` and the reference ids from
    `pred.label_ids` (which is modified in place) and returns {"wer": value}.
    """
    reference_ids = pred.label_ids
    # -100 marks padded label positions; put the tokenizer's pad id back so they can be decoded
    reference_ids[reference_ids == -100] = processor.tokenizer.pad_token_id

    # Highest-scoring token id along the last axis of the logits
    predicted_ids = np.argmax(pred.predictions, axis=-1)

    # Turn both id arrays into text; the references are decoded without grouping tokens
    hypotheses = processor.batch_decode(predicted_ids)
    references = processor.batch_decode(reference_ids, group_tokens=False)

    return {"wer": wer_metric.compute(predictions=hypotheses, references=references)}

In [None]:
# Initializing the pre-trained model for CTC
# AutoModelForCTC was used here without being imported anywhere in the notebook.
from transformers import AutoModelForCTC

model = AutoModelForCTC.from_pretrained(
    model_checkpoint,
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.1,
    ctc_loss_reduction="mean",
    # Pad id and vocabulary size come from the tokenizer built from vocab.json
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer)
)

In [None]:
# Setting up the training configuration
# TrainingArguments was used here without being imported anywhere in the notebook.
from transformers import TrainingArguments

training_args = TrainingArguments(
  output_dir=repo_name,
  group_by_length=True,
  per_device_train_batch_size=batch_size,
  gradient_accumulation_steps=2,
  evaluation_strategy="steps",
  num_train_epochs=6,
  gradient_checkpointing=True,
  fp16=False,
  # Save, evaluate and log on the same 400-step schedule
  save_steps=400,
  eval_steps=400,
  logging_steps=400,
  learning_rate=3e-4,
  warmup_steps=500,
  save_total_limit=2,
  push_to_hub=False,
)

In [None]:
# Initializing the trainer
from transformers import Trainer

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=dataset["train"],
    # The "test" split is used for the evaluations that run during training
    eval_dataset=dataset["test"],
    # The processor's feature extractor is what gets passed in the `tokenizer` argument
    tokenizer=processor.feature_extractor,
)

In [None]:
# Start the training process with the trainer configured above
trainer.train()

In [None]:
# Evaluate the model on the validation set
# NOTE: despite its name, `test_results` holds the result for the "validation" split;
# the "test" split was passed to the Trainer as eval_dataset.
test_results = trainer.evaluate(dataset["validation"])

In [None]:
# Display the evaluation results for the validation set (value returned by trainer.evaluate)
test_results