<a href="https://colab.research.google.com/github/Himakar098/AnalysisandObservation/blob/main/STT_Model_Team_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Step 1: Data Preprocessing**

Convert the audio files to WAV format and resample them to 16 kHz. Convert them to mono if they are in stereo format. librosa is used to load and resample the audio.

Necessary imports

In [None]:
import librosa
import soundfile as sf
import os
import numpy as np
from IPython.display import Audio, display

## Load files

### Load audio files

In [None]:
def load_audio(file_paths):
    """
    Load audio files at their native sample rate, keeping every channel.

    :param file_paths: str or iterable of str, path(s) to audio file(s).
        A single path is treated as a one-element list.
    :return: list of tuples (file_path, audio_array, sample_rate)
    """
    # A bare string is itself iterable: without this guard the loop below
    # would try to load each character of the path as a separate file.
    if isinstance(file_paths, (str, os.PathLike)):
        file_paths = [file_paths]

    loaded_files = []

    for file_path in file_paths:
        # sr=None keeps the file's own sample rate and mono=False keeps all
        # channels; resampling and down-mixing are left to the caller.
        audio, sr = librosa.load(file_path, sr=None, mono=False)
        loaded_files.append((file_path, audio, sr))

        print(f"Loaded {file_path}")
        print(f"Shape: {audio.shape}, Sample rate: {sr}")
        print("---")

    return loaded_files

**usage**

In [None]:
# Load audio files
# load_audio takes ONE argument (a list of paths), so several files must be
# wrapped in a list rather than passed as separate positional arguments.
loaded_files = load_audio(["downloads/telugu30min.wav", "downloads/english30min.wav"])
# Change to your file system paths
# Can use single or multiple files at a time

You can also upload the files directly using this cell:

In [None]:
from google.colab import files

# Colab only: prompts for files in the browser and returns once they are uploaded.
uploaded = files.upload()

# Keep just the keys of the returned mapping as the list of paths to load.
file_paths = [name for name in uploaded.keys()]

Saving common_voice_te_40251053.mp3 to common_voice_te_40251053.mp3
Saving common_voice_te_40432862.mp3 to common_voice_te_40432862.mp3


In [None]:
# Read every uploaded file into memory
loaded_files = load_audio(file_paths=file_paths)

Loaded common_voice_te_40251053.mp3
Shape: (195840,), Sample rate: 32000
---
Loaded common_voice_te_40432862.mp3
Shape: (161280,), Sample rate: 32000
---


Data **Preprocessing**

In [None]:
def preprocess_audio(loaded_files, target_sr=16000, mono=True):
    """
    Resample (and optionally down-mix) loaded clips, rendering a player
    for each clip before and after processing.

    :param loaded_files: list of tuples (file_path, audio_array, sample_rate)
    :param target_sr: int, sample rate every clip is brought to (default: 16000)
    :param mono: bool, down-mix multi-channel audio when True (default: True)
    :return: list of tuples (original_path, processed_audio_array, target_sr)
    """
    results = []

    for path, samples, native_sr in loaded_files:
        stem, _ext = os.path.splitext(os.path.basename(path))

        # Player for the untouched input
        print(f"Original audio: {stem}")
        display(Audio(samples, rate=native_sr))

        # Resampling is skipped when the clip is already at the target rate
        processed = samples
        if native_sr != target_sr:
            processed = librosa.resample(y=samples, orig_sr=native_sr, target_sr=target_sr)

        # Down-mix only when requested and the array has more than one dimension
        needs_downmix = mono and processed.ndim > 1
        if needs_downmix:
            processed = librosa.to_mono(processed)

        results.append((path, processed, target_sr))

        # Player for the processed result
        print(f"Processed audio: {stem}")
        display(Audio(processed, rate=target_sr))

        print(f"Processed {path}")
        print(f"Shape: {processed.shape}, Sample rate: {target_sr}")
        print("---")

    return results

In [None]:
# Bring every loaded clip to 16 kHz mono
processed_files = preprocess_audio(loaded_files, 16000, True)

Original audio: common_voice_te_40251053


Processed audio: common_voice_te_40251053


Processed common_voice_te_40251053.mp3
Shape: (97920,), Sample rate: 16000
---
Original audio: common_voice_te_40432862


Processed audio: common_voice_te_40432862


Processed common_voice_te_40432862.mp3
Shape: (80640,), Sample rate: 16000
---


# **Data Augmentation Techniques**

In [None]:
import torch
import torchaudio
from transformers import Wav2Vec2Processor
import soundfile as sf

### Add background noise

In [None]:
def add_noise(audio, noise_factor=0.005):
    """
    Add Gaussian noise to a signal.

    :param audio: array-like of samples, any shape (e.g. (n,) or (channels, n))
    :param noise_factor: float, scale applied to the standard-normal noise (default: 0.005)
    :return: numpy array with the same shape as `audio`
    """
    # One noise value per sample. Sizing the noise by len(audio) only covers
    # the first axis, which cannot broadcast against multi-channel input.
    noise = np.random.randn(*np.shape(audio))
    return audio + noise_factor * noise

def apply_noise_to_processed_files(processed_files, noise_factor=0.005):
    """
    Add Gaussian noise to every processed clip and render before/after players.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param noise_factor: float, forwarded to add_noise (default: 0.005)
    :return: list of tuples (file_path, noisy_audio_array, sample_rate)
    """
    augmented_files = []

    for file_path, audio, sr in processed_files:
        augmented_audio = add_noise(audio, noise_factor)
        augmented_files.append((file_path, augmented_audio, sr))

        original_filename = os.path.splitext(os.path.basename(file_path))[0]

        # Display original audio
        print(f"Original audio: {original_filename}")
        display(Audio(audio, rate=sr))

        # Display augmented audio. Adding noise does not change the sample
        # rate, so play it at this clip's own rate rather than depending on a
        # global variable that may not exist yet.
        print(f"Augmented audio: {original_filename}")
        display(Audio(augmented_audio, rate=sr))

        print(f"Applied noise to {file_path}")
        print(f"Original shape: {audio.shape}, Augmented shape: {augmented_audio.shape}")
        print("---")

    return augmented_files

In [None]:
# Usage
target_sr = 16000  # module-level name; the pitch-shift usage cell reads it too
augmented_audio = apply_noise_to_processed_files(processed_files=processed_files)

Original audio: common_voice_te_40251053


Augmented audio: common_voice_te_40251053


Applied noise to common_voice_te_40251053.mp3
Original shape: (97920,), Augmented shape: (97920,)
---
Original audio: common_voice_te_40432862


Augmented audio: common_voice_te_40432862


Applied noise to common_voice_te_40432862.mp3
Original shape: (80640,), Augmented shape: (80640,)
---


Speed perturbation

In [None]:
def change_speed(audio, speed_factor):
    """
    Time-stretch a signal with librosa.effects.time_stretch.

    :param audio: audio samples
    :param speed_factor: float, passed to librosa as the stretch `rate`
    :return: the stretched signal as returned by librosa
    """
    stretched = librosa.effects.time_stretch(audio, rate=speed_factor)
    return stretched

def apply_speed_to_processed_files(processed_files, speed_factor=1.0):
    """
    Apply the same speed change to every processed clip, rendering a player
    for each clip before and after.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param speed_factor: float, forwarded to change_speed (default: 1.0)
    :return: list of tuples (file_path, speed_changed_audio_array, sample_rate)
    """
    results = []

    for path, samples, rate in processed_files:
        stem, _ext = os.path.splitext(os.path.basename(path))

        # Player for the input clip
        print(f"Original audio: {stem}")
        display(Audio(samples, rate=rate))

        # The sample rate is unchanged; only the number of samples differs
        stretched = change_speed(samples, speed_factor)
        results.append((path, stretched, rate))

        # Player for the speed-changed clip
        print(f"Augmented audio: {stem}")
        display(Audio(stretched, rate=rate))

        print(f"Applied augmentations to {path}")
        print(f"Original shape: {samples.shape}, Augmented shape: {stretched.shape}")
        print("---")

    return results

In [None]:
# Usage: a factor of 1.2 makes every clip 20% faster
speed_changed_audio = apply_speed_to_processed_files(processed_files, 1.2)

Original audio: common_voice_te_40251053


Augmented audio: common_voice_te_40251053


Applied augmentations to common_voice_te_40251053.mp3
Original shape: (97920,), Augmented shape: (81600,)
---
Original audio: common_voice_te_40432862


Augmented audio: common_voice_te_40432862


Applied augmentations to common_voice_te_40432862.mp3
Original shape: (80640,), Augmented shape: (67200,)
---


Pitch shifting

In [None]:
def pitch_shift(audio, sr, n_steps):
    """
    Shift the pitch of a signal with librosa.effects.pitch_shift.

    :param audio: audio samples
    :param sr: int, sample rate of `audio`
    :param n_steps: number of steps to shift by, forwarded to librosa
    :return: the pitch-shifted signal as returned by librosa
    """
    shifted = librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)
    return shifted

def shift_pitch_to_processed_files(processed_files, sr, n_steps):
    """
    Pitch-shift every processed clip and render before/after players.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param sr: int, not used: each clip is shifted at the sample rate stored
        in its own tuple. The parameter is kept so existing calls keep working.
    :param n_steps: number of steps to shift by, forwarded to pitch_shift
    :return: list of tuples (file_path, pitch_shifted_audio_array, sample_rate)
    """
    augmented_files = []

    # The loop variable gets its own name so it no longer silently overwrites
    # the `sr` argument.
    for file_path, audio, file_sr in processed_files:
        original_filename = os.path.splitext(os.path.basename(file_path))[0]

        # Display original audio
        print(f"Original audio: {original_filename}")
        display(Audio(audio, rate=file_sr))

        # Apply pitch shift
        augmented_audio = pitch_shift(audio, file_sr, n_steps)
        augmented_files.append((file_path, augmented_audio, file_sr))

        # Display augmented audio
        print(f"Augmented audio: {original_filename}")
        display(Audio(augmented_audio, rate=file_sr))

        print(f"Applied augmentations to {file_path}")
        print(f"Original shape: {audio.shape}, Augmented shape: {augmented_audio.shape}")
        print("---")

    # Without this return the caller always received None.
    return augmented_files

In [None]:
# Usage: shift the pitch of every clip up by 2 semitones
pitch_shifted_audio = shift_pitch_to_processed_files(processed_files, target_sr, 2)

Original audio: common_voice_te_40251053


Augmented audio: common_voice_te_40251053


Applied augmentations to common_voice_te_40251053.mp3
Original shape: (97920,), Augmented shape: (97920,)
---
Original audio: common_voice_te_40432862


Augmented audio: common_voice_te_40432862


Applied augmentations to common_voice_te_40432862.mp3
Original shape: (80640,), Augmented shape: (80640,)
---


SpecAugment (time and frequency masking)

In [None]:
import random
def spec_augment(spec, num_mask=2, freq_masking_max_percentage=0.15, time_masking_max_percentage=0.3):
    """
    Zero out random bands of a 2-D spectrogram.

    Axis 0 is treated as frames (time) and axis 1 as frequency bins. Each of
    the `num_mask` rounds blanks one contiguous run of frequency bins and one
    contiguous run of frames.

    :param spec: 2-D numpy array; it is copied, not modified
    :param num_mask: int, number of masking rounds (default: 2)
    :param freq_masking_max_percentage: float, upper bound on the fraction of
        axis 1 blanked per round (default: 0.15)
    :param time_masking_max_percentage: float, upper bound on the fraction of
        axis 0 blanked per round (default: 0.3)
    :return: masked copy of `spec`
    """
    masked = spec.copy()
    for _ in range(num_mask):
        n_frames, n_freqs = masked.shape

        # Frequency mask: random width first, then a start that keeps it in range
        freq_width = int(random.uniform(0.0, freq_masking_max_percentage) * n_freqs)
        freq_start = int(np.random.uniform(low=0.0, high=n_freqs - freq_width))
        masked[:, freq_start:freq_start + freq_width] = 0

        # Time mask: same procedure along the frame axis
        time_width = int(random.uniform(0.0, time_masking_max_percentage) * n_frames)
        time_start = int(np.random.uniform(low=0.0, high=n_frames - time_width))
        masked[time_start:time_start + time_width, :] = 0
    return masked

def apply_spec_augment_to_processed_files(processed_files, num_mask=2, freq_masking_max_percentage=0.15, time_masking_max_percentage=0.3):
    """
    Mask random time/frequency bands of each clip's mel spectrogram and
    reconstruct audio from the result, rendering before/after players.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param num_mask: int, forwarded to spec_augment (default: 2)
    :param freq_masking_max_percentage: float, forwarded to spec_augment (default: 0.15)
    :param time_masking_max_percentage: float, forwarded to spec_augment (default: 0.3)
    :return: list of tuples (file_path, reconstructed_audio_array, sample_rate)
    """
    augmented_files = []

    for file_path, audio, sr in processed_files:
        original_filename = os.path.splitext(os.path.basename(file_path))[0]

        # Display original audio
        print(f"Original audio: {original_filename}")
        display(Audio(audio, rate=sr))

        # Convert to mel spectrogram
        mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr)

        # librosa lays the spectrogram out as (n_mels, frames) while
        # spec_augment reads its input as (frames, freqs). Transpose on the
        # way in and back out so the frequency/time limits apply to the
        # intended axes.
        augmented_spec = spec_augment(
            mel_spec.T, num_mask, freq_masking_max_percentage, time_masking_max_percentage
        ).T

        # Convert back to audio
        augmented_audio = librosa.feature.inverse.mel_to_audio(augmented_spec, sr=sr)

        augmented_files.append((file_path, augmented_audio, sr))

        # Display augmented audio
        print(f"Augmented audio: {original_filename}")
        display(Audio(augmented_audio, rate=sr))

        print(f"Applied spectral augmentation to {file_path}")
        print(f"Original shape: {audio.shape}, Augmented shape: {augmented_audio.shape}")
        print("---")

    return augmented_files

In [None]:
# Apply spectral augmentation to all files
augmented_files = apply_spec_augment_to_processed_files(
    processed_files, num_mask=2, freq_masking_max_percentage=0.15, time_masking_max_percentage=0.3
)

Original audio: common_voice_te_40251053


Augmented audio: common_voice_te_40251053


Applied spectral augmentation to common_voice_te_40251053.mp3
Original shape: (97920,), Augmented shape: (97792,)
---
Original audio: common_voice_te_40432862


Augmented audio: common_voice_te_40432862


Applied spectral augmentation to common_voice_te_40432862.mp3
Original shape: (80640,), Augmented shape: (80384,)
---


Combining augmentations — do not run this part yet; it still needs to be updated.

In [None]:
def augment_audio(audio, sr, impulse_response=None):
    """
    Apply a random combination of waveform augmentations, then spectrogram masking.

    Two or three of the available augmentations (noise, speed change, pitch
    shift and, when `impulse_response` is given, reverb) are drawn without
    replacement and applied in the drawn order. Spectrogram masking is always
    applied last through a mel-spectrogram round trip.

    :param audio: array of audio samples (the masking step unpacks a 2-D
        spectrogram, so this is presumably mono; confirm against callers)
    :param sr: int, sample rate of `audio`
    :param impulse_response: optional array; when given, reverb joins the
        pool of candidate augmentations
    :return: augmented audio as reconstructed by librosa's mel_to_audio
    """
    augmentations = [
        add_noise,
        lambda x: change_speed(x, np.random.uniform(0.9, 1.1)),
        lambda x: pitch_shift(x, sr, np.random.randint(-2, 3)),
    ]

    if impulse_response is not None:
        augmentations.append(lambda x: apply_impulse_response(x, impulse_response))

    # Randomly choose 2-3 augmentations (randint's upper bound is exclusive)
    num_augs = np.random.randint(2, 4)
    chosen_augmentations = np.random.choice(augmentations, num_augs, replace=False)

    for aug_func in chosen_augmentations:
        audio = aug_func(audio)

    # Always apply spec augment as the last step.
    # librosa returns (n_mels, frames) while spec_augment reads its input as
    # (frames, freqs), so transpose on the way in and back out.
    mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr)
    augmented_spec = spec_augment(mel_spec.T).T
    audio = librosa.feature.inverse.mel_to_audio(augmented_spec, sr=sr)

    return audio

Room impulse response simulation

In [None]:
from scipy import signal
def apply_impulse_response(audio, impulse_response):
    """
    Convolve `audio` with `impulse_response` and trim to the input length.

    :param audio: sequence of audio samples
    :param impulse_response: sequence of impulse-response samples
    :return: the first len(audio) samples of the full convolution
    """
    n_samples = len(audio)
    convolved = signal.convolve(audio, impulse_response, mode='full')
    return convolved[:n_samples]
def apply_impulse_response_to_processed_files(processed_files, impulse_response_path):
    """
    Add reverb to every processed clip by convolving it with a room impulse
    response, rendering before/after players.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param impulse_response_path: str, path to the impulse-response audio file
    :return: list of tuples (file_path, reverb_audio_array, sample_rate)
    """
    augmented_files = []

    # Load impulse response at its native sample rate
    impulse_response, ir_sr = librosa.load(impulse_response_path, sr=None)

    # Convolution works sample by sample, so the impulse response must be at
    # the same rate as the clip. Cache one resampled copy per rate.
    ir_by_rate = {ir_sr: impulse_response}

    for file_path, audio, sr in processed_files:
        original_filename = os.path.splitext(os.path.basename(file_path))[0]

        # Display original audio
        print(f"Original audio: {original_filename}")
        display(Audio(audio, rate=sr))

        if sr not in ir_by_rate:
            ir_by_rate[sr] = librosa.resample(y=impulse_response, orig_sr=ir_sr, target_sr=sr)

        # Apply room impulse response
        reverb_audio = apply_impulse_response(audio, ir_by_rate[sr])

        # Store and display the reverberated clip. `reverb_audio` is the only
        # augmented signal this function produces, so it is shown once.
        augmented_files.append((file_path, reverb_audio, sr))
        print(f"Reverb audio: {original_filename}")
        display(Audio(reverb_audio, rate=sr))

        print(f"Applied augmentations to {file_path}")
        print(f"Original shape: {audio.shape}, Augmented shape: {reverb_audio.shape}")
        print("---")

    return augmented_files

In [None]:
# Path to the room impulse response recording; change to your own file
impulse_response_path = "Downloads/impulse_response.wav"
augmented_files = apply_impulse_response_to_processed_files(processed_files, impulse_response_path)

# Testing Models

In [None]:
from huggingface_hub import notebook_login

# Renders the Hugging Face login widget in this cell's output.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

### 1. **Whisper**:

Data preprocessing:

a. Audio files should be in WAV format, 16-bit PCM.

b. Use librosa or pydub to load and resample audio to 16kHz if necessary.

Data augmentation:

a. Add background noise

b. Apply speed perturbation

c. Simulate room acoustics


In [None]:
from transformers import WhisperFeatureExtractor, WhisperProcessor, WhisperTokenizer

# "detect" is not a language name or code the Whisper tokenizer recognises;
# it is rejected as an unsupported language once prefix tokens are built.
# Leaving `language` unset keeps the tokenizer language-agnostic.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", task="transcribe")

processor = WhisperProcessor.from_pretrained("openai/whisper-small", task="transcribe")

In [None]:
import torch
import numpy as np
from transformers import WhisperFeatureExtractor, WhisperTokenizer, WhisperForConditionalGeneration
import warnings

# Suppress specific warnings
warnings.filterwarnings("ignore", message="Special tokens have been added in the vocabulary")

def process_audio_files(processed_files, model_name="openai/whisper-small"):
    """
    Transcribe preprocessed clips with a Whisper checkpoint.

    :param processed_files: list of tuples (file_path, audio_array, sample_rate)
    :param model_name: str, Hugging Face model id (default: "openai/whisper-small")
    :return: list of dicts with keys "file_path", "detected_language" and
        "transcription", one per input clip
    """
    # Load the model components
    feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name)
    # No `language` argument: "detect" is not a language the Whisper tokenizer
    # recognises. This function only decodes, which needs no language setting.
    tokenizer = WhisperTokenizer.from_pretrained(model_name, task="transcribe")
    model = WhisperForConditionalGeneration.from_pretrained(model_name)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)

    results = []

    for file_path, audio, sr in processed_files:
        # Ensure audio is float32 and scaled into [-1, 1]
        audio = audio.astype(np.float32)
        # np.max raises on an empty array, so treat an empty clip as silent
        peak = np.max(np.abs(audio)) if audio.size else 0.0
        if peak > 1.0:
            audio = audio / peak

        # Extract features
        input_features = feature_extractor(audio, sampling_rate=sr, return_tensors="pt").input_features.to(device)

        # Generate token ids
        with torch.no_grad():
            predicted_ids = model.generate(input_features)

        token_ids = predicted_ids[0]

        # NOTE(review): assumes the generated sequence starts with a start
        # token followed by the language token; confirm for the installed
        # transformers version.
        detected_language = tokenizer.decode(token_ids[1:2])  # Language token
        transcribed_text = tokenizer.decode(token_ids[2:], skip_special_tokens=True)  # Rest of the transcription

        results.append({
            "file_path": file_path,
            "detected_language": detected_language.strip(),
            "transcription": transcribed_text.strip()
        })

    return results

In [None]:
# Process the files
results = process_audio_files(processed_files)

# Print results, one block per file
for result in results:
    print(
        f"File: {result['file_path']}",
        f"Detected Language: {result['detected_language']}",
        f"Transcription: {result['transcription']}",
        "---",
        sep="\n",
    )

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


### **2. Wav2vec2**

In [None]:
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import torch

# Path of the clip to transcribe; change to your own file
audio_path = "downloads/audio.wav"

# load_audio / preprocess_audio take lists and return lists of
# (path, samples, sample_rate) tuples, so wrap the path and take the first result.
_, audio, sr = preprocess_audio(load_audio([audio_path]), target_sr=16000, mono=True)[0]
augmented_audio = augment_audio(audio, sr)
# soundfile (imported as sf) writes the augmented clip to disk
sf.write("augmented_audio.wav", augmented_audio, sr)

processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")

# The preprocessed clip is already 16 kHz mono, so it is used directly
# instead of re-reading a second copy from disk.
input_audio = audio
inputs = processor(input_audio, sampling_rate=sr, return_tensors="pt")

with torch.no_grad():
    logits = model(inputs.input_values).logits
# Greedy CTC decoding: most likely token per frame, collapsed by the processor
predicted_ids = torch.argmax(logits, dim=-1)
transcription = processor.batch_decode(predicted_ids)[0]
print(transcription)

### 3. Wav2vec2-BERT

In [None]:
from transformers import Wav2Vec2BertForCTC, Wav2Vec2BertProcessor
import torch

# Path of the clip to transcribe; change to your own file
audio_path = "downloads/audio.wav"

# load_audio / preprocess_audio take lists and return lists of
# (path, samples, sample_rate) tuples, so wrap the path and take the first result.
_, audio, sr = preprocess_audio(load_audio([audio_path]), target_sr=16000, mono=True)[0]
augmented_audio = augment_audio(audio, sr)

# NOTE(review): "facebook/wav2vec2-bert-960h" does not appear to be a published
# checkpoint. This id is the CTC fine-tuned Wav2Vec2-BERT model used in the
# Transformers documentation; confirm it is the one you want to evaluate.
processor = Wav2Vec2BertProcessor.from_pretrained("hf-audio/wav2vec2-bert-CV16-en")
model = Wav2Vec2BertForCTC.from_pretrained("hf-audio/wav2vec2-bert-CV16-en")

# The preprocessed clip is already 16 kHz mono, so it is used directly.
input_audio = audio
inputs = processor(input_audio, sampling_rate=sr, return_tensors="pt")

with torch.no_grad():
    # The Wav2Vec2-BERT processor emits `input_features` (there is no
    # `input_values` entry), so pass the whole batch through by keyword.
    logits = model(**inputs).logits
predicted_ids = torch.argmax(logits, dim=-1)
transcription = processor.batch_decode(predicted_ids)[0]
print(transcription)

### 4. SeamlessM4T v2

In [None]:
import torch
# NOTE(review): the import path and the Translator/predict arguments below
# follow the seamless_communication README for the v2 models; confirm against
# the installed package version.
from seamless_communication.inference import Translator

# Path of the clip to translate; change to your own file
audio_path = "path/to/audio.wav"

# load_audio / preprocess_audio take lists and return lists of
# (path, samples, sample_rate) tuples, so wrap the path and take the first result.
_, audio, sr = preprocess_audio(load_audio([audio_path]), target_sr=16000, mono=True)[0]
augmented_audio = augment_audio(audio, sr)

# Save augmented audio to a temporary file. librosa.output.write_wav is no
# longer available in librosa, so soundfile (imported as sf) is used.
sf.write("temp_augmented.wav", augmented_audio, sr)

# Arguments: model name, vocoder name, device, dtype
translator = Translator("seamlessM4T_v2_large", "vocoder_v2", torch.device("cpu"), torch.float32)
# Speech-to-text translation, English speech -> French text
text_output, _ = translator.predict(input=audio_path, task_str="S2TT", tgt_lang="fra", src_lang="eng")
translated_text = str(text_output[0])
print(translated_text)

# Evaluating the performance of models using:

### Accuracy Metrics:

a. Word Error Rate (WER):
WER is commonly used for speech recognition tasks. It measures the edit distance between the predicted and reference transcriptions.

b. BLEU Score:
BLEU is typically used for translation tasks, but can also be applied to speech recognition when comparing against reference transcriptions.

c. Character Error Rate (CER):
Similar to WER, but operates at the character level instead of word level.

d. Phoneme Error Rate (PER):
Useful for evaluating phoneme-level accuracy, especially for low-resource languages.

In [None]:
from jiwer import wer
from nltk.translate.bleu_score import sentence_bleu
import Levenshtein

def calculate_wer(reference, hypothesis):
    """
    Word Error Rate between a reference and a hypothesis transcription.

    :param reference: str, ground-truth transcription
    :param hypothesis: str, model output
    :return: the value returned by jiwer's `wer`
    """
    score = wer(reference, hypothesis)
    return score

def calculate_cer(reference, hypothesis):
    """
    Character Error Rate: Levenshtein distance divided by the reference length.

    :param reference: str, ground-truth transcription
    :param hypothesis: str, model output
    :return: float. For an empty reference there is nothing to normalise by;
        the result is then 0.0 if the hypothesis is also empty, else 1.0.
    """
    # Guard the division: an empty reference used to raise ZeroDivisionError.
    if len(reference) == 0:
        return 0.0 if len(hypothesis) == 0 else 1.0
    return Levenshtein.distance(reference, hypothesis) / len(reference)

def calculate_bleu(reference, hypothesis):
    """
    Sentence-level BLEU of a hypothesis against a single reference.

    Both strings are split on whitespace before scoring.

    :param reference: str, ground-truth transcription
    :param hypothesis: str, model output
    :return: the value returned by NLTK's `sentence_bleu`
    """
    reference_tokens = reference.split()
    hypothesis_tokens = hypothesis.split()
    return sentence_bleu([reference_tokens], hypothesis_tokens)

# Phoneme Error Rate calculation (assuming you have a phoneme transcription)
def calculate_per(reference_phonemes, hypothesis_phonemes):
    """
    Phoneme Error Rate: Levenshtein distance divided by the reference length.

    :param reference_phonemes: ground-truth phoneme sequence
    :param hypothesis_phonemes: predicted phoneme sequence
    :return: float. For an empty reference there is nothing to normalise by;
        the result is then 0.0 if the hypothesis is also empty, else 1.0.
    """
    # Guard the division: an empty reference used to raise ZeroDivisionError.
    if len(reference_phonemes) == 0:
        return 0.0 if len(hypothesis_phonemes) == 0 else 1.0
    return Levenshtein.distance(reference_phonemes, hypothesis_phonemes) / len(reference_phonemes)

### Inference Speed and Resource Usage

To measure inference speed and resource usage, we'll use the time module and psutil library.

In [None]:
import time
import psutil
import torch

def measure_performance(model, input_data, inference_function):
    """
    Run one inference call and report its wall time and memory growth.

    :param model: passed through to `inference_function`
    :param input_data: passed through to `inference_function`
    :param inference_function: callable(model, input_data) -> output
    :return: tuple (output, inference_time_seconds, cpu_memory_mb, gpu_memory_mb).
        CPU memory is the change in this process's resident set size; GPU
        memory is the peak allocation above the starting level, or 0 when
        CUDA is unavailable.
    """
    bytes_per_mb = 1024 * 1024
    use_cuda = torch.cuda.is_available()

    # Track this process only: system-wide counters also move when unrelated
    # programs allocate or free memory.
    process = psutil.Process()

    if use_cuda:
        # Drain pending GPU work and reset the peak counter so the peak read
        # afterwards belongs to this call alone.
        torch.cuda.synchronize()
        torch.cuda.reset_peak_memory_stats()
        start_gpu_memory = torch.cuda.memory_allocated()

    start_memory = process.memory_info().rss
    # perf_counter is monotonic, unlike time.time()
    start_time = time.perf_counter()

    # Run inference
    output = inference_function(model, input_data)

    if use_cuda:
        # CUDA kernels run asynchronously; wait for them before stopping the clock
        torch.cuda.synchronize()
    end_time = time.perf_counter()
    end_memory = process.memory_info().rss

    if use_cuda:
        # Temporary tensors are freed by the time inference returns, so the
        # peak (not the final allocation) reflects what the call needed.
        gpu_memory_used = (torch.cuda.max_memory_allocated() - start_gpu_memory) / bytes_per_mb
    else:
        gpu_memory_used = 0

    inference_time = end_time - start_time
    cpu_memory_used = (end_memory - start_memory) / bytes_per_mb

    return output, inference_time, cpu_memory_used, gpu_memory_used

Measure the metrics for the above models

In [None]:
import numpy as np
import torch
# NOTE(review): the import path and the Translator/predict arguments used
# below follow the seamless_communication README for the v2 models; confirm
# against the installed package version.
from seamless_communication.inference import Translator


def evaluate_models(audio_file, reference_text):
    """
    Transcribe one clip with each model and collect accuracy and cost metrics.

    :param audio_file: str, path to the audio file to evaluate on
    :param reference_text: str, ground-truth transcription of that file
    :return: list of dicts, one per model, with keys "Model", "WER", "CER",
        "BLEU", "Inference Time (s)", "CPU Memory (MB)" and "GPU Memory (MB)"
    """
    # load_audio / preprocess_audio take lists and return lists of
    # (path, samples, sample_rate) tuples, so wrap the path and take the first result.
    _, audio, sr = preprocess_audio(load_audio([audio_file]), target_sr=16000, mono=True)[0]

    # Whisper, through the Hugging Face classes this notebook already imports
    # (the standalone `whisper` package is never imported here).
    whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    whisper_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")

    def whisper_inference(model, samples):
        features = whisper_processor(samples, sampling_rate=sr, return_tensors="pt").input_features
        with torch.no_grad():
            predicted_ids = model.generate(features)
        return whisper_processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

    whisper_output, whisper_time, whisper_cpu, whisper_gpu = measure_performance(whisper_model, audio, whisper_inference)

    def make_ctc_inference(ctc_processor):
        """Build a greedy CTC inference function bound to its own processor."""
        def ctc_inference(model, samples):
            inputs = ctc_processor(samples, sampling_rate=sr, return_tensors="pt")
            with torch.no_grad():
                # Keyword-unpacking covers both processors: one emits
                # `input_values`, the other `input_features`.
                logits = model(**inputs).logits
            predicted_ids = torch.argmax(logits, dim=-1)
            return ctc_processor.batch_decode(predicted_ids)[0]
        return ctc_inference

    # wav2vec2
    processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h")
    wav2vec2_model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h")
    wav2vec2_output, wav2vec2_time, wav2vec2_cpu, wav2vec2_gpu = measure_performance(
        wav2vec2_model, audio, make_ctc_inference(processor))

    # wav2vec2-BERT. It needs its own processor; reusing the wav2vec2 one
    # would feed it the wrong input format.
    # NOTE(review): "facebook/wav2vec2-bert-960h" does not appear to be a
    # published checkpoint; this id is the CTC fine-tuned model used in the
    # Transformers documentation. Confirm it is the one you want to evaluate.
    processor_bert = Wav2Vec2BertProcessor.from_pretrained("hf-audio/wav2vec2-bert-CV16-en")
    wav2vec2_bert_model = Wav2Vec2BertForCTC.from_pretrained("hf-audio/wav2vec2-bert-CV16-en")
    wav2vec2_bert_output, wav2vec2_bert_time, wav2vec2_bert_cpu, wav2vec2_bert_gpu = measure_performance(
        wav2vec2_bert_model, audio, make_ctc_inference(processor_bert))

    # SeamlessM4T v2 (reads the file itself, so it is given the path)
    def seamless_inference(model, path):
        text_output, _ = model.predict(input=path, task_str="S2TT", tgt_lang="eng", src_lang="eng")
        return str(text_output[0])

    seamless_model = Translator("seamlessM4T_v2_large", "vocoder_v2", torch.device("cpu"), torch.float32)
    seamless_output, seamless_time, seamless_cpu, seamless_gpu = measure_performance(seamless_model, audio_file, seamless_inference)

    # Calculate accuracy metrics
    models = ["Whisper", "wav2vec2", "wav2vec2-BERT", "SeamlessM4T v2"]
    outputs = [whisper_output, wav2vec2_output, wav2vec2_bert_output, seamless_output]
    times = [whisper_time, wav2vec2_time, wav2vec2_bert_time, seamless_time]
    cpu_memories = [whisper_cpu, wav2vec2_cpu, wav2vec2_bert_cpu, seamless_cpu]
    gpu_memories = [whisper_gpu, wav2vec2_gpu, wav2vec2_bert_gpu, seamless_gpu]

    results = []
    # Loop names are chosen so they do not shadow the `time` module or the
    # imported `wer` function.
    for model_name, output, elapsed, cpu_mem, gpu_mem in zip(models, outputs, times, cpu_memories, gpu_memories):
        results.append({
            "Model": model_name,
            "WER": calculate_wer(reference_text, output),
            "CER": calculate_cer(reference_text, output),
            "BLEU": calculate_bleu(reference_text, output),
            "Inference Time (s)": elapsed,
            "CPU Memory (MB)": cpu_mem,
            "GPU Memory (MB)": gpu_mem
        })

    return results

# Example usage
audio_file = "path/to/audio.wav"
reference_text = "This is the reference transcription."
results = evaluate_models(audio_file, reference_text)

# Print results, one block per model
for result in results:
    print(
        f"\nModel: {result['Model']}",
        f"WER: {result['WER']:.4f}",
        f"CER: {result['CER']:.4f}",
        f"BLEU: {result['BLEU']:.4f}",
        f"Inference Time: {result['Inference Time (s)']:.4f} seconds",
        f"CPU Memory: {result['CPU Memory (MB)']:.2f} MB",
        f"GPU Memory: {result['GPU Memory (MB)']:.2f} MB",
        sep="\n",
    )