In [None]:
# Set up Git LFS, then clone the PriMock57 repository (audio + transcripts).
!git lfs install
!git clone https://github.com/babylonhealth/primock57.git

Please remove the README files from the "audio" and "transcripts" directories before running the step below.

In [None]:
import os
import shutil

def remove_checkpoints_and_readme(directory):
    """Delete notebook/README clutter from ``directory``.

    Removes the entries named ``.ipynb_checkpoints``, ``.md`` and
    ``README.md`` that sit directly inside ``directory``. Each entry is
    removed whatever its type: a real directory is deleted recursively, a
    file or symlink is unlinked. Entries that do not exist are ignored, so
    the function can be called repeatedly.

    Args:
        directory: Path of the directory to clean.
    """
    for name in (".ipynb_checkpoints", ".md", "README.md"):
        target = os.path.join(directory, name)
        # rmtree only works on real directories; calling it on a file (or
        # os.remove on a directory) raises, so dispatch on the entry type.
        if os.path.isdir(target) and not os.path.islink(target):
            shutil.rmtree(target)
        elif os.path.lexists(target):
            os.remove(target)

# Clean both dataset directories of the cloned repository so that only the
# audio and transcript files remain in them.
remove_checkpoints_and_readme("/content/primock57/audio/")
remove_checkpoints_and_readme("/content/primock57/transcripts/")


In [None]:
import os
import json
import re
!pip install textgrid
!pip install pydub
from textgrid import TextGrid
from pydub import AudioSegment

!pip install textgrid
!pip install pydub

def parse_textgrid(textgrid_path):
    """Read a TextGrid file and flatten its intervals into dictionaries.

    Every interval of every tier is returned, in file order, as
    ``{"xmin": start, "xmax": end, "text": transcript}``. Markup of the form
    ``<...>`` is removed from the transcript and the remaining whitespace is
    normalised (runs collapsed to one space, ends trimmed), so removing a tag
    never leaves a leading/trailing or doubled space in the label.

    Args:
        textgrid_path: Path of the ``.TextGrid`` file to parse.

    Returns:
        A list of dictionaries with the keys ``xmin``, ``xmax`` and ``text``.
        Intervals whose text is empty after cleaning are still included.
    """
    tg = TextGrid.fromFile(textgrid_path)
    data = []

    for tier in tg:
        for interval in tier:
            without_tags = re.sub(r"<.*?>", "", interval.mark)
            # Normalise whitespace *after* the tags are gone; stripping first
            # would keep the blanks that surrounded a removed tag.
            cleaned_text = " ".join(without_tags.split())
            data.append({
                "xmin": interval.minTime,
                "xmax": interval.maxTime,
                "text": cleaned_text
            })

    return data

def split_audio(audio_path, textgrid_data, output_dir):
    """
    Splits audio into segments based on intervals from the TextGrid file.

    One WAV file is written to ``output_dir`` for every entry whose text is
    not blank. The file is named ``<audio stem>_segment_<i>.wav`` where ``i``
    is the position of the entry in ``textgrid_data`` (skipped entries still
    consume an index).

    Args:
        audio_path: Path of the source recording.
        textgrid_data: List of dictionaries with ``xmin``/``xmax`` (seconds)
            and ``text``, as produced by ``parse_textgrid``.
        output_dir: Directory for the segment files; created if missing.

    Returns:
        A list of ``{"audio_filepath": ..., "text": ...}`` dictionaries, one
        per exported segment.
    """
    audio = AudioSegment.from_file(audio_path)
    os.makedirs(output_dir, exist_ok=True)

    # splitext keeps dots inside the name ("a.b.wav" -> "a.b"), which matches
    # how process_primock57_dataset compares file names and avoids two
    # recordings collapsing onto the same segment prefix.
    base_name = os.path.splitext(os.path.basename(audio_path))[0]

    processed_entries = []
    for i, entry in enumerate(textgrid_data):
        # Skip entries with empty or whitespace-only text
        if not entry["text"].strip():
            continue

        start_time = entry["xmin"] * 1000  # Convert to milliseconds
        end_time = entry["xmax"] * 1000  # Convert to milliseconds
        segment = audio[start_time:end_time]

        segment_path = os.path.join(output_dir, f"{base_name}_segment_{i}.wav")
        segment.export(segment_path, format="wav")
        processed_entries.append({
            "audio_filepath": segment_path,
            "text": entry["text"]
        })

    return processed_entries

def process_primock57_dataset(audio_dir, transcripts_dir, output_audio_dir, output_json_path):
    """
    Processes the entire Primock57 dataset:
    - Parses TextGrid files.
    - Splits audio files into segments.
    - Outputs a JSON dataset compatible with Wav2Vec2.

    Audio and transcript files are paired by file name without extension, so
    a missing or extra file only affects that one recording instead of
    shifting every following pair.

    Args:
        audio_dir: Directory containing the ``.wav`` recordings.
        transcripts_dir: Directory containing the ``.TextGrid`` transcripts.
        output_audio_dir: Directory that receives the audio segments.
        output_json_path: Path of the JSON manifest to write.

    Returns:
        The list of ``{"audio_filepath": ..., "text": ...}`` entries that was
        written to ``output_json_path``.
    """
    all_processed_data = []

    # Ensure only valid audio and transcript files are considered
    audio_files = sorted(f for f in os.listdir(audio_dir) if f.endswith('.wav'))
    textgrid_files = sorted(f for f in os.listdir(transcripts_dir) if f.endswith('.TextGrid'))

    print(len(audio_files))        # 114
    print(len(textgrid_files))     # 114 should be 114 = 57*2

    # Ensure that the number of audio files and textgrid files match
    if len(audio_files) != len(textgrid_files):
        print("Warning: Mismatch in the number of audio and transcript files.")

    # Index the transcripts by stem so each recording finds its own file
    textgrid_by_stem = {os.path.splitext(name)[0]: name for name in textgrid_files}

    # Process each file pair
    for audio_file in audio_files:
        textgrid_file = textgrid_by_stem.get(os.path.splitext(audio_file)[0])
        if textgrid_file is None:
            print(f"Skipping {audio_file}: no matching TextGrid file")
            continue

        audio_path = os.path.join(audio_dir, audio_file)
        textgrid_path = os.path.join(transcripts_dir, textgrid_file)

        # Parse the TextGrid file
        textgrid_data = parse_textgrid(textgrid_path)

        # Split the audio into segments, skipping empty ones
        processed_data = split_audio(audio_path, textgrid_data, output_audio_dir)
        all_processed_data.extend(processed_data)

    # Save the processed data as a JSON file
    with open(output_json_path, "w") as f:
        json.dump(all_processed_data, f, indent=4)

    return all_processed_data

# Input locations inside the cloned repository
audio_dir = "/content/primock57/audio/"
transcripts_dir = "/content/primock57/transcripts/"

# Output locations: one WAV per segment plus a JSON manifest listing them
output_audio_dir = "/content/processed_audio/"
output_json_path = "/content/primock57_dataset.json"

# Run the whole pipeline
all_processed_data = process_primock57_dataset(
    audio_dir,
    transcripts_dir,
    output_audio_dir,
    output_json_path,
)

# Count the source recordings for the summary line
audio_files = sorted(name for name in os.listdir(audio_dir) if name.endswith('.wav'))
print(f"Processed {len(all_processed_data)} segments from {len(audio_files)} audio files.")


Collecting textgrid
  Downloading TextGrid-1.6.1.tar.gz (9.4 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: textgrid
  Building wheel for textgrid (setup.py) ... [?25l[?25hdone
  Created wheel for textgrid: filename=TextGrid-1.6.1-py3-none-any.whl size=10147 sha256=cafc1ad71166333c5ecc62c3992a67b4ee3eb609a015bcb135ba13cc47bb8fef
  Stored in directory: /root/.cache/pip/wheels/23/41/f2/e2ef1817bd163de3c21dd078966bdd71bd5c4455841f4ec016
Successfully built textgrid
Installing collected packages: textgrid
Successfully installed textgrid-1.6.1
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
114
114
Processed 6727 segments from 114 audio files.


Installing necessary libraries

In [None]:
%%capture
!pip install datasets==1.4.1
!pip install transformers
!pip install torchaudio
!pip install librosa
!pip install jiwer
!pip install --upgrade transformers huggingface_hub


In [None]:
# !pip uninstall transformers huggingface_hub -y
# !pip install transformers huggingface_hub


import json

# Load the dataset
with open("primock57_dataset.json", "r") as f:
    data = json.load(f)

# Extract unique characters from the text field
unique_chars = set()
for item in data:
    text = item["text"]
    unique_chars.update(text)

# "|" is appended below as the word delimiter token. If it also occurred as a
# transcript character it would be re-assigned to id len(vocab), leaving a
# hole in the id range and one id beyond the vocabulary size.
unique_chars.discard("|")

# Create a vocabulary dictionary: characters first, then the special tokens
# (unknown, padding, word delimiter) in that order
vocab = {char: idx for idx, char in enumerate(sorted(unique_chars))}
for special_token in ("[UNK]", "[PAD]", "|"):
    vocab[special_token] = len(vocab)

# NOTE(review): " " stays in the vocabulary with its own id although the
# tokenizer output in this notebook encodes spaces with the "|" id - confirm
# whether the " " entry is wanted.
assert sorted(vocab.values()) == list(range(len(vocab))), "vocabulary ids must be contiguous"

print(len(vocab))

# Save to a JSON file
with open("vocab.json", "w") as f:
    json.dump(vocab, f, indent=4)

print("Vocabulary file 'vocab.json' created.")


63
Vocabulary file 'vocab.json' created.


In [None]:
from transformers import Wav2Vec2CTCTokenizer

# Build the CTC tokenizer from the vocabulary file written to ./vocab.json,
# using the same special tokens that were added to that vocabulary.
tokenizer = Wav2Vec2CTCTokenizer(
    "./vocab.json",
    unk_token="[UNK]",
    pad_token="[PAD]",
    word_delimiter_token="|"
)
print("Tokenizer initialized successfully.")


Tokenizer initialized successfully.


In [None]:
# Round-trip a sample sentence through the tokenizer as a sanity check.
test_text = "Hello, how are you?"
tokens = tokenizer(test_text)
print(tokens)

# NOTE(review): the decoded output reads "Helo, ..." - presumably because
# decode() merges repeated ids the way CTC decoding does; confirm against the
# tokenizer documentation.
decoded_text = tokenizer.decode(tokens["input_ids"])
print(decoded_text)


{'input_ids': [15, 38, 45, 45, 48, 3, 62, 41, 48, 56, 62, 34, 51, 38, 62, 58, 48, 54, 7], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}
Helo, how are you?


In [None]:
from transformers import Wav2Vec2FeatureExtractor

# Feature extractor for single-channel features at 16 kHz, with
# normalisation enabled and attention masks returned.
feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)

In [None]:
from transformers import Wav2Vec2Processor

# Bundle the feature extractor and the tokenizer into one processor object.
processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
# JAI SHREE RAM _/\_

# Mount Google Drive at /content/gdrive/ (Colab only); the processor and
# model are saved under this mount point.
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [None]:
 # Persist the processor to Google Drive, in the same directory that the
 # training run uses as its output directory.
 processor.save_pretrained("/content/gdrive/MyDrive/wav2vec2-large-xlsr-primock-Himanshu")

[]

In [None]:
import json

def load_json_dataset(json_path):
    """Load the segment manifest and rename its fields.

    Reads the JSON list stored at ``json_path`` and returns one
    ``{"path": ..., "sentence": ...}`` dictionary per entry, taken from the
    entry's ``audio_filepath`` and ``text``. Entries whose text is empty or
    whitespace-only are left out.
    """
    with open(json_path, 'r') as handle:
        records = json.load(handle)

    samples = []
    for record in records:
        if not record["text"].strip():
            continue
        samples.append({"path": record["audio_filepath"], "sentence": record["text"]})
    return samples

# All three lists are read from the same manifest, so at this point
# train_data, test_data and dev_data contain identical samples.
train_data = load_json_dataset("/content/primock57_dataset.json")
test_data = load_json_dataset("/content/primock57_dataset.json")
dev_data = load_json_dataset("/content/primock57_dataset.json")


In [None]:
from datasets import Dataset

# Convert list of dictionaries into a dictionary of lists
def list_to_dict(data):
    """Turn a list of row dictionaries into a column dictionary.

    The keys of the first row define the columns; every row must provide all
    of them.

    Args:
        data: List of dictionaries that share the same keys.

    Returns:
        ``{key: [row[key] for row in data]}``, or an empty dictionary when
        ``data`` is empty (instead of raising ``IndexError`` on ``data[0]``).
    """
    if not data:
        return {}
    return {key: [item[key] for item in data] for key in data[0]}

# Convert train_data, test_data and dev_data to column-oriented dictionaries
train_data_dict = list_to_dict(train_data)
test_data_dict = list_to_dict(test_data)
dev_data_dict = list_to_dict(dev_data)

# Create one Dataset object per split from the column dictionaries
train_dataset = Dataset.from_dict(train_data_dict)
test_dataset = Dataset.from_dict(test_data_dict)
dev_dataset = Dataset.from_dict(dev_data_dict)


In [None]:
import torchaudio

def speech_file_to_array_fn(batch):
    """Load the audio file of one sample and attach it to the sample.

    Reads the file at ``batch["path"]`` with torchaudio and stores the first
    channel as a NumPy array under ``speech``, the file's sampling rate under
    ``sampling_rate`` and a copy of ``sentence`` under ``target_text``.
    Returns the same ``batch`` object.
    """
    waveform, rate = torchaudio.load(batch["path"])
    first_channel = waveform[0]
    batch["speech"] = first_channel.numpy()
    batch["sampling_rate"] = rate
    batch["target_text"] = batch["sentence"]
    return batch

In [None]:
# Please JSR

# Split boundaries (row indices into the segment list). Rows are taken in
# manifest order: [0, TRAIN_END) -> train, [TRAIN_END, TEST_END) -> test,
# [TEST_END, end) -> dev, so the three subsets never share a sample.
TRAIN_END = 4000
TEST_END = 6000


def _load_audio_subset(dataset, indices):
    """Select ``indices`` from ``dataset`` and load the audio of each selected row.

    The original columns are replaced by the ones that
    ``speech_file_to_array_fn`` produces.
    """
    subset = dataset.select(indices)
    return subset.map(speech_file_to_array_fn, remove_columns=subset.column_names)


train_dataset_subset = _load_audio_subset(train_dataset, range(TRAIN_END))
test_dataset_subset = _load_audio_subset(test_dataset, range(TRAIN_END, TEST_END))
# Previously range(6000), which contained every train and test sample.
dev_dataset_subset = _load_audio_subset(dev_dataset, range(TEST_END, len(dev_dataset)))

# Print the number of samples in each subset to verify (train, test, dev)
print(f"Subset size: {len(train_dataset_subset)}")
print(f"Subset size: {len(test_dataset_subset)}")
print(f"Subset size: {len(dev_dataset_subset)}")

In [None]:
import librosa
import numpy as np

def resample(batch):
    """Convert the audio of one sample to 16 kHz.

    Args:
        batch: Sample dictionary with the audio under ``speech`` and,
            optionally, its rate under ``sampling_rate``. A missing rate is
            treated as 48000 Hz.

    Returns:
        The same dictionary with ``speech`` as a NumPy array at 16 kHz and
        ``sampling_rate`` set to 16000.
    """
    # Ensure "speech" is a NumPy array
    speech = np.asarray(batch["speech"])

    # Read the rate once so the default also reaches librosa; indexing
    # batch["sampling_rate"] again raised KeyError when the key was missing.
    orig_sr = batch.get("sampling_rate", 48000)

    # Resample if necessary
    if orig_sr != 16000:
        speech = librosa.resample(speech, orig_sr=orig_sr, target_sr=16000)

    batch["speech"] = speech
    batch["sampling_rate"] = 16000
    return batch


In [None]:
# Resample the train and test subsets with 4 worker processes.
# dev_dataset_subset is not resampled here.
train_dataset_subset = train_dataset_subset.map(resample, num_proc=4)
test_dataset_subset = test_dataset_subset.map(resample, num_proc=4)











In [None]:
import IPython.display as ipd
import numpy as np
import random

# randrange excludes the upper bound; randint(0, len(...)) could return
# len(...) itself and index one past the last sample.
rand_int = random.randrange(len(train_dataset_subset))

# Play a random training sample at 16 kHz
ipd.Audio(data=np.asarray(train_dataset_subset[rand_int]["speech"]), autoplay=True, rate=16000)

In [None]:
# randrange excludes the upper bound; randint(0, len(...)) could return
# len(...) itself and index one past the last sample.
rand_int = random.randrange(len(train_dataset_subset))

# Fetch the row once instead of once per printed field
sample = train_dataset_subset[rand_int]

print("Target text:", sample["target_text"])
print("Input array shape:", np.asarray(sample["speech"]).shape)
print("Sampling rate:", sample["sampling_rate"])

Target text: How's that sound?
Input array shape: (16000,)
Sampling rate: 16000


In [None]:
def prepare_dataset(batch):
    """Turn a batch of audio/text samples into model inputs and labels.

    Asserts that every sample in the batch has the same sampling rate, then
    stores the processor's ``input_values`` for ``batch["speech"]`` and the
    token ids of ``batch["target_text"]`` (encoded inside
    ``processor.as_target_processor()``) under ``labels``. Returns the same
    ``batch`` object.
    """
    distinct_rates = set(batch["sampling_rate"])
    # check that all files have the correct sampling rate
    assert (
        len(distinct_rates) == 1
    ), f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."

    shared_rate = batch["sampling_rate"][0]
    audio_features = processor(batch["speech"], sampling_rate=shared_rate)
    batch["input_values"] = audio_features.input_values

    with processor.as_target_processor():
        encoded_text = processor(batch["target_text"])
        batch["labels"] = encoded_text.input_ids
    return batch

In [None]:
# Encode the train and test subsets in batches of 8 with 4 worker processes;
# the existing columns are dropped, so only what prepare_dataset adds remains.
train_dataset_subset = train_dataset_subset.map(prepare_dataset, remove_columns=train_dataset_subset.column_names, batch_size=8, num_proc=4, batched=True)
test_dataset_subset = test_dataset_subset.map(prepare_dataset, remove_columns=test_dataset_subset.column_names, batch_size=8, num_proc=4, batched=True)













In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """Collate features into one padded batch for CTC training.

    Inputs and labels are padded separately through ``processor.pad`` because
    they have different lengths and need different padding. Padded label
    positions are set to -100 so they are ignored by the loss.

    Fields:
        processor: Processor whose ``pad`` method is used for both parts.
        padding: Padding argument forwarded to ``processor.pad``.
        max_length: ``max_length`` used when padding the inputs.
        max_length_labels: ``max_length`` used when padding the labels.
        pad_to_multiple_of: ``pad_to_multiple_of`` for the inputs.
        pad_to_multiple_of_labels: ``pad_to_multiple_of`` for the labels.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Separate the audio part from the label part of every feature
        audio_part = []
        label_part = []
        for feature in features:
            audio_part.append({"input_values": feature["input_values"]})
            label_part.append({"input_ids": feature["labels"]})

        # Pad the audio inputs
        batch = self.processor.pad(
            audio_part,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Pad the labels while the processor is in target mode
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_part,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # Positions whose attention mask is not 1 are padding: mark them
        # with -100 so the loss skips them
        is_padding = labels_batch.attention_mask.ne(1)
        batch["labels"] = labels_batch["input_ids"].masked_fill(is_padding, -100)

        return batch

In [None]:
# Collator instance used by the Trainer; padding is enabled.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
from datasets import load_metric

# Word error rate metric; compute_metrics reports its value as "wer".
wer_metric = load_metric("wer")

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=1764.0, style=ProgressStyle(description…




In [None]:
def compute_metrics(pred):
    """Compute the word error rate for one evaluation pass.

    Takes the arg-max over the last axis of ``pred.predictions``, replaces
    every -100 in ``pred.label_ids`` (in place) with the tokenizer's pad
    token id, decodes both and returns ``{"wer": value}``.
    """
    predicted_ids = np.argmax(pred.predictions, axis=-1)

    # Restore the pad id at the positions that were masked out for the loss
    label_ids = pred.label_ids
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    hypotheses = processor.batch_decode(predicted_ids)
    # we do not want to group tokens when computing the metrics
    targets = processor.batch_decode(label_ids, group_tokens=False)

    return {"wer": wer_metric.compute(predictions=hypotheses, references=targets)}

In [None]:
from transformers import Wav2Vec2ForCTC

# Load the pretrained XLSR-53 checkpoint with a CTC head. The pad token id
# and the vocabulary size are taken from the processor's tokenizer so the
# head matches the vocabulary built in this notebook.
model = Wav2Vec2ForCTC.from_pretrained(
    "facebook/wav2vec2-large-xlsr-53",
    attention_dropout=0.1,
    hidden_dropout=0.1,
    feat_proj_dropout=0.0,
    mask_time_prob=0.05,
    layerdrop=0.1,
    gradient_checkpointing=True,
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
    vocab_size=len(processor.tokenizer)
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


HBox(children=(FloatProgress(value=0.0, description='config.json', max=1768.0, style=ProgressStyle(description…




HBox(children=(FloatProgress(value=0.0, description='pytorch_model.bin', max=1269737156.0, style=ProgressStyle…




Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-large-xlsr-53 and are newly initialized: ['lm_head.bias', 'lm_head.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Freeze the model's feature extractor before training.
# NOTE(review): newer transformers releases may expose this under the name
# freeze_feature_encoder - confirm against the installed version.
model.freeze_feature_extractor()



In [None]:
# !pip install --upgrade wandb

from transformers import TrainingArguments

# Training configuration. Checkpoints go to Google Drive (the same directory
# the processor is saved to). With a per-device batch of 8 and 4 accumulation
# steps, one optimizer step covers 32 samples per device.
training_args = TrainingArguments(
    output_dir="/content/gdrive/MyDrive/wav2vec2-large-xlsr-primock-Himanshu",
    # output_dir="./wav2vec2-large-xlsr-turkish-demo",
    group_by_length=True,
    per_device_train_batch_size=8,
    gradient_accumulation_steps=4,
    evaluation_strategy="steps",
    num_train_epochs=30,
    fp16=True,
    save_steps=400, # a checkpoint is written every 400 steps; save_total_limit below caps how many are kept so Google Drive does not fill up
    eval_steps=400,
    logging_steps=400,
    learning_rate=3e-4,
    warmup_steps=500,
    save_total_limit=2,

)



HBox(children=(FloatProgress(value=0.0, description='model.safetensors', max=1269615400.0, style=ProgressStyle…

In [None]:
from transformers import Trainer

# Wire model, collator, metric function and the prepared train/test subsets
# into a Trainer. The test subset doubles as the evaluation set.
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset_subset,
    eval_dataset=test_dataset_subset,
    # The feature extractor (not the text tokenizer) is passed here.
    tokenizer=processor.feature_extractor,
)

  trainer = Trainer(


In [None]:
!pip install --upgrade wandb

import wandb

def disable_git(env=None):
    """Report whether the wandb "disable git" setting is switched on.

    Looks up the key ``wandb.env.DISABLE_GIT`` in ``env`` (``os.environ`` when
    ``env`` is None), defaulting to "False". A string value yields False only
    when it equals "false" ignoring case and True otherwise; a non-string
    value is returned unchanged.
    """
    source = os.environ if env is None else env
    # The default is passed positionally on purpose (see the monkey patch).
    val = source.get(wandb.env.DISABLE_GIT, "False")
    if not isinstance(val, str):
        return val
    return val.lower() != "false"

# Monkey patch wandb.env.disable_git with the fixed version defined in this
# cell, so later lookups of wandb.env.disable_git resolve to it.
wandb.env.disable_git = disable_git




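**wandb API key:** do not store the key in this notebook. Paste it at the wandb login prompt or provide it through the `WANDB_API_KEY` environment variable / Colab secrets, and revoke any key that was previously written here.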

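To keep the Colab session connected during training, paste this snippet into the browser console; it clicks the Connect button every 60 seconds:

```javascript
function ConnectButton(){
    console.log("Connect pushed");
    document.querySelector("#top-toolbar > colab-connect-button").shadowRoot.querySelector("#connect").click()
}
setInterval(ConnectButton,60000);
```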

In [None]:
# Start fine-tuning with the configured Trainer.
trainer.train()

In [None]:
# Save the fine-tuned model and the processor side by side on Google Drive;
# the evaluation cell loads both from this directory.
model.save_pretrained("/content/gdrive/MyDrive/wav2vec2-large-xlsr-primock-Himanshu")
processor.save_pretrained("/content/gdrive/MyDrive/wav2vec2-large-xlsr-primock-Himanshu")


In [None]:
from datasets import load_metric
from IPython.display import Audio, display
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import torch

# Paths to your dataset and model
json_file = "/content/primock57_dataset.json"
model_dir = "/content/gdrive/MyDrive/wav2vec2-large-xlsr-primock-Himanshu"  # Path to the directory with the trained model

# Load the trained model and processor (this rebinds the notebook-level
# `processor` and `model`); the model goes to the GPU when one is available
# and is switched to evaluation mode.
processor = Wav2Vec2Processor.from_pretrained(model_dir)
model = Wav2Vec2ForCTC.from_pretrained(model_dir).to("cuda" if torch.cuda.is_available() else "cpu")
model.eval()

# Load JSON dataset. This is the complete segment manifest, i.e. the same
# file the training subset was taken from.
import json
with open(json_file, "r") as f:
    test_data = json.load(f)

# Ensure valid entries: keep only those with non-blank text and an audio path
test_data = [entry for entry in test_data if entry.get("text", "").strip() and entry.get("audio_filepath")]

# Load Word Error Rate metric
wer_metric = load_metric("wer")

# Function for model evaluation
def evaluate_model(data, max_samples=12):
    """Transcribe the first samples of ``data`` and print the word error rate.

    For each sample the audio is loaded, its first channel is resampled to
    16 kHz when needed, and the model's greedy (arg-max) transcription is
    printed next to the reference together with an audio player. Samples
    whose audio cannot be loaded are reported and skipped. Uses the
    notebook-level ``model``, ``processor``, ``wer_metric``, ``torchaudio``
    and ``librosa``.

    Args:
        data: List of dictionaries with ``audio_filepath`` and ``text``.
        max_samples: Number of leading samples to evaluate (default 12).

    Returns:
        None. The word error rate is printed; nothing is computed when no
        sample could be evaluated.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    predictions = []
    references = []

    for i, sample in enumerate(data[:max_samples]):
        audio_path = sample["audio_filepath"]
        reference = sample["text"]

        # Load audio file
        try:
            speech_array, sampling_rate = torchaudio.load(audio_path)
        except Exception as e:
            print(f"Error loading audio: {audio_path} - {e}")
            continue

        # Use the first channel; resample if needed
        waveform = speech_array[0].numpy()
        if sampling_rate != 16000:
            print(f"Resampling audio: {audio_path}")
            waveform = librosa.resample(waveform, orig_sr=sampling_rate, target_sr=16000)

        # Prepare input
        inputs = processor(waveform, sampling_rate=16000, return_tensors="pt", padding=True)
        input_values = inputs.input_values.to(device)

        # Predict
        with torch.no_grad():
            logits = model(input_values).logits
        predicted_ids = torch.argmax(logits, dim=-1)

        # Decode prediction
        transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
        predictions.append(transcription)
        references.append(reference)

        # Debugging outputs
        print(f"Sample {i + 1}")
        print(f"Audio File: {audio_path}")
        display(Audio(audio_path))  # Play the audio file
        print(f"Prediction: {transcription}")
        print(f"Reference: {reference}\n")

    # Nothing to score if the input was empty or every file failed to load
    if not predictions:
        print("No samples could be evaluated; skipping WER computation.")
        return

    # Calculate Word Error Rate
    wer = wer_metric.compute(predictions=predictions, references=references)
    print(f"Word Error Rate: {wer:.4f}")

# Run evaluation.
# NOTE(review): test_data is the full manifest and evaluate_model only looks
# at its first entries, which fall inside the range used for training
# (rows 0-3999) - the reported WER is therefore presumably not a held-out
# score; confirm which samples should be evaluated.
evaluate_model(test_data)
