In [1]:
!pip install -q evaluate datasets transformers jiwer

In [2]:
import pandas as pd
import torch
import torchaudio
from datasets import Dataset, DatasetDict
from transformers import (
    Wav2Vec2Processor,
    Wav2Vec2ForCTC,
    Trainer,
    TrainingArguments,
    Wav2Vec2CTCTokenizer,
    Wav2Vec2FeatureExtractor,
)
import evaluate
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import numpy as np 

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print(DEVICE)

cuda


In [3]:
import os
from getpass import getpass

from huggingface_hub import login

# SECURITY: never hardcode an access token in a notebook. Any token that has been
# committed or shared inside a notebook must be treated as leaked and revoked at
# https://huggingface.co/settings/tokens.
# The token is read from the HF_TOKEN environment variable (e.g. populated from the
# platform's secret store); if it is not set, the user is prompted without echo.
hf_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")

login(new_session=False,
      write_permission=True,
      token=hf_token,
      add_to_git_credential=True)

Token is valid (permission: write).
[1m[31mCannot authenticate through git-credential as no helper is defined on your machine.
You might have to re-authenticate when pushing to the Hugging Face Hub.
Run the following command in your terminal in case you want to set the 'store' credential helper as default.

git config --global credential.helper store

Read https://git-scm.com/book/en/v2/Git-Tools-Credential-Storage for more details.[0m
Token has not been saved to git credential helper.
Your token has been saved to /root/.cache/huggingface/token
Login successful


In [4]:
# Root of the Kaggle dataset; the clips live under recordings/<split>/ and the
# overview CSV is the metadata table loaded into `df`.
base_path = "/kaggle/input/medical-speech-transcription-and-intent/Medical Speech, Transcription, and Intent"
recordings_path = os.path.join(base_path, "recordings")
csv_file_path = os.path.join(base_path, "overview-of-recordings.csv")

df = pd.read_csv(csv_file_path)

def find_subdirectory_and_path(file_name):
    """Locate a recording inside the split folders under ``recordings_path``.

    The folders are probed in the order 'test', 'train', 'validate' and the
    first one that contains ``file_name`` wins.

    Returns:
        A ``(subdirectory, file_path)`` tuple, or ``(None, None)`` when the
        file exists in none of the folders.
    """
    candidates = (
        (split, os.path.join(recordings_path, split, file_name))
        for split in ['test', 'train', 'validate']
    )
    hits = ((split, path) for split, path in candidates if os.path.exists(path))
    return next(hits, (None, None))

# Resolve every clip once and build both new columns from a single DataFrame,
# instead of constructing a pd.Series per row inside apply (a slow path in pandas).
located = [find_subdirectory_and_path(file_name) for file_name in df['file_name']]
df[['subdirectory', 'file_path']] = pd.DataFrame(
    located, index=df.index, columns=['subdirectory', 'file_path']
)
# Drop identifier/download columns that are not used further in this notebook.
df = df.drop(['writer_id','speaker_id','file_download','file_name'], axis=1)

In [5]:
from datasets import Dataset, DatasetDict, Audio
import pandas as pd

# Wrap the metadata frame in a Dataset, then carve out one view per split using
# the 'subdirectory' column filled in from the recordings folders.
dataset = Dataset.from_pandas(df)

train_dataset = dataset.filter(lambda row: row['subdirectory'] == 'train')
test_dataset = dataset.filter(lambda row: row['subdirectory'] == 'test')
validate_dataset = dataset.filter(lambda row: row['subdirectory'] == 'validate')

dataset_dict = DatasetDict({
    "train": train_dataset,
    "test": test_dataset,
    "validate": validate_dataset
})

# Per split: treat the file path as audio and switch to the 'audio' / 'text' column names.
for split, split_dataset in dataset_dict.items():
    dataset_dict[split] = (
        split_dataset
        .cast_column("file_path", Audio())
        .rename_column("file_path", "audio")
        .rename_column("phrase", "text")
    )

# `data` keeps only the columns that survive this list ('text' and 'audio').
unused_columns = [
    "subdirectory", "prompt", 'audio_clipping', 'audio_clipping:confidence',
    'background_noise_audible', 'background_noise_audible:confidence',
    'overall_quality_of_the_audio', 'quiet_speaker', 'quiet_speaker:confidence',
]
data = dataset_dict.remove_columns(unused_columns)

# Note: this prints the un-pruned dataset_dict, not `data`.
print(dataset_dict)

Filter:   0%|          | 0/6661 [00:00<?, ? examples/s]

Filter:   0%|          | 0/6661 [00:00<?, ? examples/s]

Filter:   0%|          | 0/6661 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['audio_clipping', 'audio_clipping:confidence', 'background_noise_audible', 'background_noise_audible:confidence', 'overall_quality_of_the_audio', 'quiet_speaker', 'quiet_speaker:confidence', 'text', 'prompt', 'subdirectory', 'audio'],
        num_rows: 381
    })
    test: Dataset({
        features: ['audio_clipping', 'audio_clipping:confidence', 'background_noise_audible', 'background_noise_audible:confidence', 'overall_quality_of_the_audio', 'quiet_speaker', 'quiet_speaker:confidence', 'text', 'prompt', 'subdirectory', 'audio'],
        num_rows: 5895
    })
    validate: Dataset({
        features: ['audio_clipping', 'audio_clipping:confidence', 'background_noise_audible', 'background_noise_audible:confidence', 'overall_quality_of_the_audio', 'quiet_speaker', 'quiet_speaker:confidence', 'text', 'prompt', 'subdirectory', 'audio'],
        num_rows: 385
    })
})


In [6]:
import re
# Raw-string character class. The earlier non-raw literal relied on invalid string
# escapes such as '\,' and '\d', which newer Python versions warn about. The class
# matches the same characters as before: , ? . ! - ; : " ï ` √, any digit, and newline.
chars_to_ignore_regex = r'[,?.!\-;:"ï`√\d\n]'
# Compiled once so the pattern is not re-parsed for every example.
_CHARS_TO_IGNORE = re.compile(chars_to_ignore_regex)

def remove_special_characters(batch):
    """Normalise the transcript of a single example.

    Removes every character matched by ``chars_to_ignore_regex`` from
    ``batch["text"]``, lowercases the result and appends one trailing space.

    Args:
        batch: Mapping with a string under the ``"text"`` key; modified in place.

    Returns:
        The same mapping, with ``"text"`` replaced by the normalised string.
    """
    batch["text"] = _CHARS_TO_IGNORE.sub('', batch["text"]).lower() + " "
    return batch
    
# Normalise the transcript of every example in every split (see remove_special_characters).
data = data.map(remove_special_characters)

def extract_all_chars(batch):
    """Collect the distinct characters used by a batch of transcripts.

    The transcripts in ``batch["text"]`` are joined with single spaces; the result
    holds the joined text and the (unordered) list of its unique characters, each
    wrapped in a one-element list so a batched map yields exactly one row.
    """
    joined = " ".join(batch["text"])
    unique_chars = [*set(joined)]
    return {"vocab": [unique_chars], "all_text": [joined]}

# batch_size=-1 hands each split to extract_all_chars as one single batch, so every
# split ends up with exactly one row holding its character set.
vocabs = data.map(
    extract_all_chars,
    batched=True,
    batch_size=-1,
    keep_in_memory=True,
    remove_columns=data.column_names["train"],
)

# Union the characters of *every* split. Merging only train and test would leave any
# character that occurs solely in the validate split (the split that is scored
# below) out of the vocabulary, so it would be tokenised as [UNK].
vocab_list = list(set().union(*(vocabs[split]["vocab"][0] for split in vocabs)))
vocab_dict = {v: k for k, v in enumerate(sorted(vocab_list))}

# The tokenizer is configured with "|" as word delimiter, so the space's id is
# re-keyed to "|"; the unknown and padding tokens get the next two free ids.
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)

import json

with open('vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)


Map:   0%|          | 0/381 [00:00<?, ? examples/s]

Map:   0%|          | 0/5895 [00:00<?, ? examples/s]

Map:   0%|          | 0/385 [00:00<?, ? examples/s]

Map:   0%|          | 0/381 [00:00<?, ? examples/s]

Map:   0%|          | 0/5895 [00:00<?, ? examples/s]

Map:   0%|          | 0/385 [00:00<?, ? examples/s]

In [13]:
from transformers import Wav2Vec2CTCTokenizer

# Character-level CTC tokenizer backed by the vocab.json file written by the
# vocabulary cell; the special tokens match the entries added to vocab_dict there.
tokenizer = Wav2Vec2CTCTokenizer( # built directly from the vocab file (from_pretrained is not used)
    "./vocab.json",
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token="|"
)

In [14]:
from transformers import SeamlessM4TFeatureExtractor

# Feature extractor configured for 80-dimensional features (80 mel bins) from 16 kHz audio, padded with 0.0.
feature_extractor = SeamlessM4TFeatureExtractor(feature_size=80, num_mel_bins=80, sampling_rate=16000, padding_value=0.0)

In [15]:
from transformers import Wav2Vec2BertProcessor

# Bundle the feature extractor (audio side) and the tokenizer (text side) into one processor object.
processor = Wav2Vec2BertProcessor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [23]:
from transformers import Wav2Vec2BertForCTC, TrainingArguments, Trainer

# Load the pretrained encoder with a CTC head sized to the tokenizer's vocabulary and
# move it to DEVICE.
# NOTE(review): the load warning reports the adapter weights as newly initialised, and
# no fine-tuning step is visible in this notebook before the evaluation cells, so the
# decoded output is not expected to be meaningful until the model is trained — confirm.
# The commented-out keyword arguments below are left as optional knobs.
model = Wav2Vec2BertForCTC.from_pretrained(
    "facebook/w2v-bert-2.0",
    # attention_dropout=0.0,
    # hidden_dropout=0.0,
    # feat_proj_dropout=0.0,
    # mask_time_prob=0.0,
    # layerdrop=0.0,
    # ctc_loss_reduction="mean",
    add_adapter=True,
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer),
).to(DEVICE)

Some weights of Wav2Vec2BertForCTC were not initialized from the model checkpoint at facebook/w2v-bert-2.0 and are newly initialized: ['adapter.layers.0.ffn.intermediate_dense.bias', 'adapter.layers.0.ffn.intermediate_dense.weight', 'adapter.layers.0.ffn.output_dense.bias', 'adapter.layers.0.ffn.output_dense.weight', 'adapter.layers.0.ffn_layer_norm.bias', 'adapter.layers.0.ffn_layer_norm.weight', 'adapter.layers.0.residual_conv.bias', 'adapter.layers.0.residual_conv.weight', 'adapter.layers.0.residual_layer_norm.bias', 'adapter.layers.0.residual_layer_norm.weight', 'adapter.layers.0.self_attn.linear_k.bias', 'adapter.layers.0.self_attn.linear_k.weight', 'adapter.layers.0.self_attn.linear_out.bias', 'adapter.layers.0.self_attn.linear_out.weight', 'adapter.layers.0.self_attn.linear_q.bias', 'adapter.layers.0.self_attn.linear_q.weight', 'adapter.layers.0.self_attn.linear_v.bias', 'adapter.layers.0.self_attn.linear_v.weight', 'adapter.layers.0.self_attn_conv.bias', 'adapter.layers.0.self_

In [18]:
def prepare_dataset(batch):
    """Turn one raw example into model inputs and CTC labels.

    Reads ``batch["audio"]`` (its ``array`` and ``sampling_rate``) and
    ``batch["text"]``, runs both through the global ``processor`` and stores:

    * ``input_features`` - first (only) entry of the processor's feature output,
    * ``input_length``   - ``len()`` of those features,
    * ``labels``         - token ids of the transcript.

    The example is updated in place and returned.
    """
    clip = batch["audio"]
    features = processor(clip["array"], sampling_rate=clip["sampling_rate"]).input_features[0]
    batch["input_features"] = features
    batch["input_length"] = len(features)

    token_ids = processor(text=batch["text"]).input_ids
    batch["labels"] = token_ids
    return batch

In [19]:
# Request 16 kHz audio, matching the sampling_rate=16000 the feature extractor was configured with.
data = data.cast_column("audio", Audio(sampling_rate=16_000))

In [20]:
# Featurise every split with 4 worker processes; the pre-existing columns are removed so
# only the fields added by prepare_dataset remain.
data = data.map(prepare_dataset, remove_columns=data.column_names["validate"], num_proc=4)

  self.pid = os.fork()


Map (num_proc=4):   0%|          | 0/381 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/5895 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/385 [00:00<?, ? examples/s]

In [21]:
def map_to_result(batch):
    """Transcribe one prepared example with the global ``model`` and ``processor``.

    Args:
        batch: A single example holding ``input_features`` (as stored by
            ``prepare_dataset``) and ``labels`` (reference token ids).

    Returns:
        The same mapping with two string fields added, both stripped of
        surrounding whitespace: ``pred_str`` (arg-max decoding of the model's
        logits) and ``text`` (the reference decoded from ``labels``).
    """
    with torch.no_grad():
        # Place the features on the device the model actually lives on rather than
        # hardcoding "cuda", so this also works when DEVICE fell back to the CPU.
        input_features = torch.tensor(batch["input_features"], device=model.device).unsqueeze(0)
        logits = model(input_features).logits

        # Highest-scoring token id per frame
        pred_ids = torch.argmax(logits, dim=-1)

        # Decode the prediction; the reference is decoded without grouping repeated tokens
        batch["pred_str"] = processor.batch_decode(pred_ids, skip_special_tokens=True)[0].strip()
        batch["text"] = processor.decode(batch["labels"], group_tokens=False).strip()

    return batch


In [24]:
# Transcribe every example of the validate split; the input columns are dropped so only
# the fields written by map_to_result remain.
results = data["validate"].map(map_to_result, remove_columns=data["validate"].column_names)

Map:   0%|          | 0/385 [00:00<?, ? examples/s]

In [25]:
# Trim surrounding whitespace from every hypothesis and every reference before scoring.
predictions = [hypothesis.strip() for hypothesis in results["pred_str"]]
references = [reference.strip() for reference in results["text"]]

In [26]:
import evaluate
import numpy as np

# Word-error-rate metric, loaded through the `evaluate` library.
wer_metric = evaluate.load("wer")

def compute_metrics(pred):
    """Compute the word error rate for a prediction object.

    ``pred.predictions`` holds logits (arg-maxed over the last axis) and
    ``pred.label_ids`` holds reference ids. Entries equal to -100 in
    ``pred.label_ids`` are overwritten in place with the tokenizer's pad token id
    so they can be decoded.

    Returns:
        ``{"wer": value}``. If the metric raises, the error is printed and the
        value is ``float('inf')``.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Same array object as pred.label_ids, so the replacement is visible to the caller.
    label_ids = pred.label_ids
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.batch_decode(pred_ids)
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    try:
        return {"wer": wer_metric.compute(predictions=pred_str, references=label_str)}
    except Exception as e:
        # Best effort: report the failure and fall back to an infinite WER.
        print(f"Error computing WER. Predictions: {pred_str}, References: {label_str}, Error: {e}")
        return {"wer": float('inf')}


Downloading builder script:   0%|          | 0.00/4.49k [00:00<?, ?B/s]

In [27]:
# Word error rate of the validate-split predictions against their references.
wer = wer_metric.compute(predictions=predictions, references=references)

In [28]:
# NOTE(review): the "w2v2-base_kabir" label does not match the facebook/w2v-bert-2.0
# checkpoint loaded in this notebook — confirm the intended model name.
print("w2v2-base_kabir Val WER: {:.3f}".format(wer))

w2v2-base_kabir Val WER: 1.000


In [29]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Render ``num_examples`` distinct, randomly chosen rows of ``dataset`` as an HTML table.

    Indices are drawn with ``random.randint`` and re-drawn on collision, so the
    outcome depends on the state of the global ``random`` generator.

    Raises:
        AssertionError: if more rows are requested than the dataset contains.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    last_index = len(dataset) - 1
    picks = []
    while len(picks) < num_examples:
        candidate = random.randint(0, last_index)
        if candidate not in picks:
            picks.append(candidate)

    table = pd.DataFrame(dataset[picks])
    display(HTML(table.to_html()))

In [30]:
# Spot-check ten random prediction/reference pairs from the validate results.
show_random_elements(results)

Unnamed: 0,pred_str,text
0,d gcdehdodwdcycucdbdcbdknpvncnhdvucudgdgdwbcdqbabpbaldbabgc,i feel discomfort throughout the body in general
1,cdcdcdcdabuvavbdljewbcdcdvbcecvndcdrcwdcvcbcndcbdgcdwimcdubnb,the warming system of my house is broken and feel so cold
2,cdkcdvdwcavbjdjdcdcbcacgzyqedadgzrzwb,i was travelling by ship and i feel dizzy
3,cdckcbdbdgwdgrjowadcudbdvibv gcdegnvacdbdw,i feel pain in my heart when i wake up
4,cdcdbcgudungdhidwdwdcngwygapb,i have blurred vision
5,cnhrdnbulgqnvibilwdweucbwednbnipdpdcixlxyecejucbgnvabgcabzwapvbnhvdrpuycg,i feel like i went to an acupuncture's practice and had needles in my shoulder
6,dcnbcdcdcdvbicdcdvuavugdwpcdcdbcdw,i feel weak all over
7,dgndcecdc cnjxcgdjpivdwcvdbdqypzcugubvjblbjvavagdg,i have eruptions on my face that come and go
8,cdcwcdwdcdunbdnbpgvcpdcudujdjbdbvlucdubdcdcdc,i feel aching on my insides
9,cbnwbnbucbyzpcbcnda cegwgzypdwgadymibicdcdvcbdpdwbdcdcwygwidbdbgnpvjucvykndcbyducawcndcncn,i have a foot ache in winter or when it feels cold why
