# AMD at Work: Fine-tuning and Testing Cutting-Edge Speech Models

## Importing Modules

In [None]:
from transformers import Wav2Vec2CTCTokenizer, Wav2Vec2FeatureExtractor, Wav2Vec2Processor, TrainingArguments, Trainer, Wav2Vec2ForCTC, AutoFeatureExtractor, AutoModelForAudioClassification
from huggingface_hub import login
from unidecode import unidecode
import json
import re
import torch

import evaluate

from datasets import load_dataset, load_metric, DatasetDict
import numpy as np
import pandas as pd
import IPython.display as ipd

import random
from IPython.display import Audio, display

# Remove pandas' display limits so wide frames, long frames and long
# cell contents are rendered without truncation
for _display_option in ('display.max_columns', 'display.max_rows', 'display.max_colwidth'):
    pd.set_option(_display_option, None)

## Explore the google/fleurs dataset

In [None]:
# Load the "es_419" configuration of google/fleurs, requesting the
# train, test and validation splits under their own names

split_names = ('train', 'test', 'validation')
dataset = load_dataset(
    "google/fleurs",
    "es_419",
    split={name: name for name in split_names},
    trust_remote_code=True,
)

'''
Google/fleurs dataset contains some inconsistent audio examples depending on the
language selected. For the Spanish language, one workaround is to filter for those 
invalid records by noticing that the maximum value of the waveform is around 1e-4. 
For more information see the corresponding discussion on Hugging Face:
https://huggingface.co/datasets/google/fleurs/discussions/16
'''
def _has_audible_signal(example):
    # Keep a record only when its waveform peaks above the 1e-4 threshold
    return example['audio']['array'].max()>1e-4

dataset = dataset.filter(_has_audible_signal)
dataset

In [None]:
# Explore the first record of the train split (shown as the cell output)
dataset["train"][0]

In [None]:
# Two-way lookup tables between gender label names and their ids (ids are kept as strings)
labels = dataset["train"].features["gender"].names[:2] # first two gender class names
label2id = {name: str(idx) for idx, name in enumerate(labels)}
id2label = {str(idx): name for idx, name in enumerate(labels)}


# Listen to a handful of randomly drawn training examples
num_examples = 5
idx_list = []

for _ in range(num_examples):
    rand_idx = random.randint(0, len(dataset["train"])-1)
    example = dataset["train"][rand_idx] # the randomly selected record
    audio = example["audio"] # waveform and its sampling rate
    idx_list.append(rand_idx) # remember which records were drawn

    gender_id = str(example["gender"])
    gender_name = id2label[gender_id]

    print(f'Item: {rand_idx} | Label: {gender_name}={label2id[gender_name]}')
    print(f'Shape: {audio["array"].shape}, sampling rate: {audio["sampling_rate"]}')
    display(Audio(audio["array"], rate=audio["sampling_rate"]))
    print()

In [None]:
# Display the raw text transcription of each train record whose index is in idx_list
pd.DataFrame({'sentence':dataset['train'][idx_list]['raw_transcription']})

In [None]:
# Histogram of the duration of the audio records in the train split

sampling_rate = 16000

# duration (s) = number of waveform samples / sampling rate
duration_in_seconds = pd.Series(
    [len(record['audio']['array'])/sampling_rate for record in dataset['train']]
)

ax = duration_in_seconds.hist(rwidth = 0.8)

# Declutter the plot: no grid, and hide every spine except the bottom one
ax.grid(False)
for side in ('top', 'right', 'left'):
    ax.spines[side].set_visible(False)

ax.set_xlabel('Duration in seconds')
ax.set_ylabel('Frequency')
ax.set_title('Histogram of speech duration | Train split')


## Automatic Speech Recognition in Spanish

### Data Collator

In [None]:
class DataCollatorCTCWithPadding:
    """Collate CTC training examples into one padded batch.

    Each incoming feature is expected to provide ``"input_values"`` (the audio
    model input) and ``"labels"`` (the token ids of the transcription).

    Args:
        processor: object exposing ``pad(...)`` for the audio inputs and a
            ``tokenizer`` with ``pad(...)`` for the label ids (in this file a
            ``Wav2Vec2Processor``).
        padding: padding strategy forwarded to both ``pad`` calls.
    """

    def __init__(self, processor, padding = True):
        self.processor = processor
        self.padding = padding

    def __call__(self, features):
        """Return a batch with padded ``input_values`` and ``labels`` tensors.

        Padded label positions are set to -100.
        """
        # Inputs and labels have different lengths, so they are padded separately
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        # Pad the audio inputs
        batch = self.processor.pad(input_features, padding = self.padding, return_tensors = "pt")

        # Pad the label ids directly. Decoding the ids back to text and
        # re-tokenizing them must be avoided: CTC decoding groups repeated
        # tokens by default, so double letters ("ll", "rr", "cc", ...) would be
        # collapsed and the model would be trained on corrupted targets.
        labels_batch = self.processor.tokenizer.pad(label_features, padding = self.padding, return_tensors = "pt")

        # Replace padding with -100 to ignore
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)
        batch["labels"] = labels

        return batch

### Finetuning class

In [None]:
class ASRFineTuner:
    """Fine-tune a pretrained Wav2Vec2 checkpoint with a CTC head on Spanish speech.

    Constructing an instance runs the whole preparation pipeline:
      1. load the "es_419" configuration of ``dataset_name`` (train / test /
         validation) and drop the records whose waveform maximum is not above 1e-4;
      2. normalise the transcriptions and write the character vocabulary to
         ``vocab.json`` in the current working directory;
      3. build the tokenizer, feature extractor and processor;
      4. convert every record into ``input_values`` / ``labels``;
      5. load the pretrained model onto "cuda" and create the ``Trainer``.

    Args:
        pretrained_model_tag: checkpoint name passed to ``Wav2Vec2ForCTC.from_pretrained``.
        dataset_name: dataset name passed to ``load_dataset``.
        output_dir: output directory passed to ``TrainingArguments``.
        num_train_epochs: number of training epochs.
        learning_rate: learning rate passed to ``TrainingArguments``.
        batch_size: stored on the instance only.
            NOTE(review): the training arguments hard-code a per-device batch
            size of 4, so this value is currently not used - confirm intent.
    """

    # Characters removed from the transcriptions. Written as a raw string with
    # every character listed explicitly (and the hyphen escaped) so that no
    # accidental character range or invalid escape sequence is involved.
    _CHARS_TO_IGNORE_REGEX = r"[.,?!;:'\-=@$#<>\[\]_{}|&`~*/()+%0-9]"

    def __init__(self, pretrained_model_tag, dataset_name, output_dir, num_train_epochs = 5, learning_rate=3e-4, batch_size = 16):

        self.pretrained_model_tag = pretrained_model_tag
        self.dataset_name = dataset_name
        self.output_dir = output_dir
        self.num_train_epochs = num_train_epochs
        self.learning_rate = learning_rate
        self.batch_size = batch_size

        # Contains the vocabulary letters. For display only.
        # Must be initialised BEFORE _create_vocabulary_json(), which fills it in.
        self.vocab_dict = None

        # WER metric, loaded on first use by _compute_metrics
        self._wer_metric = None

        # Load and preprocess dataset
        self.dataset = load_dataset(self.dataset_name, "es_419", split={'train':'train', 'test':'test', 'validation':'validation'}, trust_remote_code=True)
        self.dataset = self.dataset.filter(lambda example: example['audio']['array'].max()>1e-4) #remove invalid examples

        self.tokenized_dataset =  self.dataset.map(self._remove_special_characters)
        self._create_vocabulary_json() # Create vocabulary tokens file

        # Load tokenizer, feature extractor, processor
        self.tokenizer = Wav2Vec2CTCTokenizer("./vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|",)
        self.feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)
        self.processor = Wav2Vec2Processor(feature_extractor = self.feature_extractor, tokenizer = self.tokenizer)

        # Tokenize dataset
        self.tokenized_dataset = self.tokenized_dataset.map(self._prepare_dataset, num_proc=4, remove_columns=self.dataset.column_names["train"])
        self.train_dataset = self.tokenized_dataset['train']
        self.test_dataset = self.tokenized_dataset['test']
        self.validation_dataset = self.tokenized_dataset['validation']

        # Instantiate data collator
        self.data_collator = DataCollatorCTCWithPadding(processor=self.processor, padding=True)

        # Load the model (the CTC head is sized to the vocabulary built above)
        self.model = Wav2Vec2ForCTC.from_pretrained(
            self.pretrained_model_tag,
            attention_dropout=0.1,
            hidden_dropout=0.1,
            feat_proj_dropout=0.0,
            mask_time_prob=0.05,
            layerdrop=0.1,
            ctc_loss_reduction="mean",
            pad_token_id = self.processor.tokenizer.pad_token_id,
            vocab_size = len(self.processor.tokenizer)
        ).to("cuda")

        # Wav2Vec2 layers are used to extract acoustic features from the raw speech signal,
        # thus the feature extraction part of the model has been sufficiently trained
        # and does not need additional fine-tuning
        self.model.freeze_feature_encoder()

        # Gradient checkpointing reduces memory footprint during training by storing
        # only a subset of intermediate activations and recomputing the rest during backward pass
        self.model.gradient_checkpointing_enable()

        # Training arguments
        self.training_args = TrainingArguments(
            output_dir = self.output_dir,
            group_by_length = True,
            per_device_train_batch_size = 4,
            per_device_eval_batch_size= 4,
            eval_strategy = "epoch",
            num_train_epochs=self.num_train_epochs,
            fp16=True, #enabled mixed precision
            save_steps = 100,
            eval_steps = 100,
            logging_steps = 10,
            learning_rate = self.learning_rate,
            warmup_steps = 50,
            save_total_limit = 2,
            push_to_hub = False
        )

        # Trainer
        self.trainer = Trainer(
            model = self.model,
            data_collator = self.data_collator,
            args = self.training_args,
            compute_metrics = self._compute_metrics,
            train_dataset = self.train_dataset,
            eval_dataset = self.validation_dataset,
            tokenizer = self.processor.feature_extractor,
        )

        # Inference results (filled in by predict_test_set)
        self.results = None

    # -- Helper methods --

    def _prepare_dataset(self, batch):
        """Add ``input_values`` and ``labels`` to a single dataset record."""
        audio = batch["audio"]

        # Model input computed from the waveform, and token ids of the transcription
        batch["input_values"] = self.processor(audio["array"], sampling_rate=16000).input_values[0]
        batch["labels"] = self.processor(text = batch['raw_transcription']).input_ids

        return batch

    def _remove_special_characters(self, batch):
        """Normalise ``raw_transcription``: ASCII-fold, strip ignored characters, lowercase.

        A single trailing space is appended to the cleaned text.
        """
        cleaned = re.sub(self._CHARS_TO_IGNORE_REGEX, "", unidecode(batch["raw_transcription"]))
        batch["raw_transcription"] = cleaned.lower() + " "

        return batch

    def _extract_all_chars(self, batch):
        """Return the set of characters used by all transcriptions in ``batch``.

        Returns:
            dict with ``"vocab"`` (a one-element list holding the list of distinct
            characters) and ``"all_text"`` (a one-element list holding the
            space-joined transcriptions).
        """
        all_text = " ".join(batch["raw_transcription"])
        vocab = list(set(all_text))

        return {"vocab": [vocab], "all_text": [all_text]}

    def _create_vocabulary_json(self):
        """Build the character vocabulary, save it to ``vocab.json`` and keep it in ``self.vocab_dict``."""
        # Aggregate all the transcription text; batch_size=-1 passes each split as a single batch
        vocabs = self.tokenized_dataset.map(
            self._extract_all_chars,
            batched=True,
            batch_size=-1,
            keep_in_memory=True,
            remove_columns=self.dataset.column_names["train"]
        )

        # Create a vocabulary (letters) dictionary from the three splits
        vocab_list = list(set(vocabs["train"]["vocab"][0]) | set(vocabs["test"]["vocab"][0]) | set(vocabs["validation"]["vocab"][0]))
        vocab_dict = {v: k for k, v in enumerate(vocab_list)}

        # The space is represented by the word delimiter token "|"
        vocab_dict["|"] = vocab_dict[" "]
        del vocab_dict[" "]

        # Unknown and padding tokens take the next free ids
        vocab_dict["[UNK]"] = len(vocab_dict)
        vocab_dict["[PAD]"] = len(vocab_dict)

        # Save the vocabulary as json for Wav2Vec2CTCTokenizer
        with open('vocab.json', 'w', encoding='utf-8') as vocab_file:
            json.dump(vocab_dict, vocab_file)

        self.vocab_dict = vocab_dict

    def _compute_metrics(self, pred):
        """Compute the word error rate of the predictions; returns ``{"wer": value}``.

        Note: ``pred.label_ids`` is modified in place (-100 is replaced by the pad token id).
        """
        pred_logits = pred.predictions
        pred_ids = np.argmax(pred_logits, axis=-1)

        # -100 marks ignored label positions; map them back to the pad token before decoding
        pred.label_ids[pred.label_ids == -100] = self.processor.tokenizer.pad_token_id

        pred_str = self.processor.batch_decode(pred_ids) #predicted string
        # Reference labels are decoded without grouping repeated tokens
        label_str = self.processor.batch_decode(pred.label_ids, group_tokens=False)

        # Load the Word Error Rate metric once and reuse it for every evaluation
        if getattr(self, "_wer_metric", None) is None:
            self._wer_metric = evaluate.load("wer", trust_remote_code=True)
        wer = self._wer_metric.compute(predictions=pred_str, references=label_str)

        return {"wer": wer}

    def _map_to_result(self, batch):
        """Transcribe one record; adds ``pred_str`` (prediction) and ``text`` (reference)."""
        with torch.no_grad():
            # Put the input on whatever device the model lives on
            input_values = torch.tensor(batch["input_values"], device=self.model.device).unsqueeze(0)
            logits = self.model(input_values).logits

        pred_ids = torch.argmax(logits, dim=-1)
        batch["pred_str"] = self.processor.batch_decode(pred_ids)[0]
        batch["text"] = self.processor.decode(batch["labels"], group_tokens=False)

        return batch

    # -- Class methods --
    def train(self):
        """Run the training loop of the underlying ``Trainer``."""
        self.trainer.train()

    def predict_test_set(self):
        """Transcribe the test split.

        Returns:
            The mapped test dataset containing ``pred_str`` and ``text``;
            it is also stored in ``self.results``.
        """
        # Make sure dropout / layerdrop / time masking are disabled during inference
        self.model.eval()

        results = self.test_dataset.map(self._map_to_result, remove_columns = self.test_dataset.column_names)
        self.results = results

        return results
    

In [None]:
# Settings for the Spanish ASR fine-tuning run
asr_settings = {
    "pretrained_model_tag": "facebook/wav2vec2-large-xlsr-53",
    "dataset_name": "google/fleurs",
    "output_dir": './spanish_asr_out',
    "num_train_epochs": 5,
}

# Building the fine-tuner prepares the data, the processor, the model and the trainer
spanish_ASR = ASRFineTuner(**asr_settings)

# Fine-tune the model
spanish_ASR.train()

In [None]:
# Perform inference: transcribe the test split with the fine-tuned model
results = spanish_ASR.predict_test_set()

In [None]:
# Display the dataset returned by predict_test_set() (shown as the cell output)
results

In [None]:
import random
import pandas as pd
from IPython.display import display, HTML


def show_random_elements(dataset, num_examples=50):
    """Display `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: object supporting ``len()`` and indexing with a list of row
            indices; the result of that indexing is passed to ``pd.DataFrame``.
        num_examples: number of distinct rows to show (50 by default).

    Raises:
        AssertionError: if `num_examples` is larger than the dataset.
        ValueError: if `num_examples` is negative (raised by ``random.sample``).
    """
    # Kept as an assert so that callers still see an AssertionError
    assert num_examples <= len(dataset), "Not enough elements in the dataset."

    # Draw distinct row indices without replacement in a single call
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))


# Show a random selection of rows from the test-set inference results
show_random_elements(results)

***

## Audio Spectrogram Transformer for audio classification

In [None]:
from transformers import ASTFeatureExtractor
from datasets import load_dataset, Audio, DatasetDict, Dataset
from transformers import AutoModelForAudioClassification
import torchaudio
import torch
import numpy as np
import random
import IPython

### Prepare and explore the dataset

In [None]:
# Prepare the dataset by selecting a few examples

audio_dataset = load_dataset("agkphysics/AudioSet",
                             trust_remote_code=True,
                             split = "test",
                             streaming = True
                            )

# Consume ONE iterator over the stream. Calling iter(audio_dataset) inside the
# comprehension would create a fresh iterator each time and return the first
# example over and over again.
data_iter = iter(audio_dataset)
audio_dataset_sample = [next(data_iter) for _ in range(50)] # select the first 50 examples of the stream
audio_dataset_sample = Dataset.from_list(random.sample(audio_dataset_sample,5)) # dataset with 5 random examples from the 50 before
audio_dataset_sample = DatasetDict({'test':audio_dataset_sample}) # transform to datasetdict object
audio_dataset_sample

In [None]:
# Explore the 'audio' entry of the first sampled example (shown as the cell output)
audio_dataset_sample['test']['audio'][0]

In [None]:
# Resample the waveforms to 16 kHz by casting the 'audio' column to an
# Audio feature with that sampling rate
sampling_rate = 16000
audio_dataset_sample = audio_dataset_sample.cast_column('audio', Audio(sampling_rate = sampling_rate))

In [None]:
# Listen to each sampled clip next to its human-annotated labels
num_examples = 5
test_clips = audio_dataset_sample['test']

for k in range(num_examples):
    example = test_clips[k]
    actual_label = example['human_labels']
    print(f'True labels: {actual_label}')

    clip_waveform = np.asarray(example['audio']['array'])
    clip_player = IPython.display.Audio(data = clip_waveform, rate = sampling_rate, autoplay=False)
    display(clip_player)

### Inference: Audio classification on examples

In [None]:
# Aggregate waveforms in a single list
waveforms  = [np.asarray(k['audio']['array']) for k in audio_dataset_sample['test']] 

# Apply feature extractor on waveforms
feature_extractor = ASTFeatureExtractor()
inputs = feature_extractor(waveforms, sampling_rate=sampling_rate, padding="max_length", return_tensors="pt")
input_values = inputs.input_values

# Instantiate the model for inference
model = AutoModelForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

# Run the forward pass without tracking gradients (inference only)
with torch.no_grad():
    outputs = model(input_values)

# Predicted labels: index of the highest logit for each example
predicted_class_ids = outputs.logits.argmax(-1)

# `class_id` rather than `id`, so the builtin id() is not shadowed in later cells
for class_id in predicted_class_ids:
    print("Predicted class:", model.config.id2label[class_id.item()])
    

***

## Pyannote audio diarization on telephone calls in Spanish language

In [None]:
from transformers import ASTFeatureExtractor
from datasets import load_dataset, Audio, DatasetDict, Dataset
from transformers import AutoModelForAudioClassification
from pyannote.audio import Pipeline
import torch
import torchaudio
import numpy as np
import random

import IPython

### Prepare and explore the dataset

In [None]:
hf_token = "Your_Hugging_Face_Token"

# Stream the Spanish ("spa") configuration of talkbank/callhome
audio_dataset = load_dataset(
    "talkbank/callhome",
    "spa",
    split = "data",
    streaming = True,
    token = hf_token,
    trust_remote_code=True,
)

# Take the first 30 examples off a single iterator over the stream ...
data_iter = iter(audio_dataset)
first_calls = []
for _ in range(30):
    first_calls.append(next(data_iter))

# ... and keep 3 of them, picked at random, as the 'test' split of a DatasetDict
sampled_calls = Dataset.from_list(random.sample(first_calls, 3))
audio_dataset_sample = DatasetDict({'test': sampled_calls})
audio_dataset_sample



In [None]:
# Explore the 'audio' entry of the first sampled conversation (shown as the cell output)
audio_dataset_sample['test']['audio'][0]

In [None]:
# Examples of telephone conversations, limited to 15 seconds of audio

secs = 15
sampling_rate = 16000
num_examples = audio_dataset_sample['test'].num_rows

# Number of waveform samples in the snippet
snippet_len = secs*sampling_rate

for call_number, example in enumerate(audio_dataset_sample['test'], start=1):

    print(f'Telephone conversations: {call_number} of {num_examples}')

    # Keep only the last 15 seconds of audio
    conversation_snippet = np.asarray(example['audio']['array'][-snippet_len:])
    snippet_player = IPython.display.Audio(data = conversation_snippet, rate = sampling_rate, autoplay=False)
    display(snippet_player)

### Inference: Audio diarization on first example

In [None]:
hf_token = "Your_Hugging_Face_Token"

# Load the pretrained diarization pipeline and move it to the GPU
cuda_device = torch.device("cuda")
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token = hf_token)
pipeline.to(cuda_device)

# Take the last `secs` seconds of the first telephone conversation
example = audio_dataset_sample['test'][0]
snippet_len = secs*sampling_rate
waveform_snippet = example['audio']['array'][-snippet_len:]

# Add a leading dimension and place the waveform on the GPU
waveform_snippet = torch.tensor(waveform_snippet, device = cuda_device).unsqueeze(0)

# Apply the pretrained pipeline to the in-memory waveform
pipeline_input = {"waveform":waveform_snippet, "sample_rate":sampling_rate}
diarization = pipeline(pipeline_input)

# Print one line per speaker turn
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")


***