# VOSK

In [78]:
import os
import wave
import json
from vosk import Model, KaldiRecognizer
from pydub import AudioSegment
import librosa
import soundfile as sf
import noisereduce as nr
import unicodedata
import re

# Normalize text
def normalize_text(text):
    text = text.lower()
    text = re.sub(r"-", " ", text)
    text = re.sub(r"[^\w\s]", "", text)
    text = unicodedata.normalize('NFD', text)
    text = "".join([c for c in text if unicodedata.category(c) != 'Mn'])
    return text

# Adaptive noise reduction
def adaptive_noise_reduction(input_file, output_file):
    print(f"Loading {input_file} for adaptive noise reduction...")
    try:
        y, sr = librosa.load(input_file, sr=None)
        if len(y) == 0:
            print("Warning: Input audio is empty. Copying as is.")
            sf.write(output_file, y, sr)
            return input_file

        noise_profile = y[:int(sr * 0.8)]  # Use first 0.5 seconds for noise profile
        y_denoised = nr.reduce_noise(y=y, y_noise=noise_profile, sr=sr, prop_decrease=0.7)
        sf.write(output_file, y_denoised, sr)
        print(f"Noise-reduced audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in noise reduction: {e}")
        return input_file

# Equalization with language-specific parameters
def apply_equalization(input_file, output_file, lang):
    print(f"Applying equalization to {input_file} for language {lang}...")
    try:
        audio = AudioSegment.from_wav(input_file)

        # Language-specific high-pass and low-pass filter values
        if lang == "en":
            high_pass = 200
            low_pass = 2000
        elif lang == "es":
            high_pass = 250
            low_pass = 3500
        elif lang == "it":
            high_pass = 100
            low_pass = 3000
        else:
            high_pass = 200
            low_pass = 5000  # Default values

        boosted_audio = audio.high_pass_filter(high_pass).low_pass_filter(low_pass)
        boosted_audio.export(output_file, format="wav")
        print(f"Equalized audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in equalization: {e}")
        return input_file

# Preprocessing pipeline
def preprocess_audio(input_file, output_file, lang):
    if not os.path.exists(input_file):
        print(f"Error: File {input_file} does not exist.")
        return None

    print(f"Preprocessing {input_file} for language {lang}...")
    try:
        noise_reduced_file = "temp_noise_reduced.wav"
        noise_reduced = adaptive_noise_reduction(input_file, noise_reduced_file)

        equalized_file = "temp_equalized.wav"
        equalized = apply_equalization(noise_reduced, equalized_file, lang)

        final_audio = AudioSegment.from_wav(equalized).set_channels(1).set_frame_rate(16000)
        final_audio.export(output_file, format="wav")
        print(f"Final preprocessed audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in preprocessing audio: {e}")
        return None

# Transcribe using Vosk
def transcribe_vosk(audio_file, model_path):
    print(f"Processing audio file: {audio_file}")
    try:
        model = Model(model_path)
        rec = KaldiRecognizer(model, 16000)
        rec.SetWords(True)
    except Exception as e:
        print(f"Error loading Vosk model: {e}")
        return "Error: Failed to load Vosk model."

    try:
        with wave.open(audio_file, "rb") as wf:
            while True:
                data = wf.readframes(4000)
                if len(data) == 0:
                    break
                rec.AcceptWaveform(data)
        result = json.loads(rec.FinalResult())
        return result.get("text", "")
    except Exception as e:
        print(f"Error processing audio with Vosk: {e}")
        return "Error: Failed to process audio."

# Calculate WER
def calculate_wer_custom(reference, hypothesis):
    """
    Calculates Word Error Rate (WER) with normalization applied,
    ensuring minor impact for single-letter words and better handling of deletions and insertions.
    """
    # Normalize both reference and hypothesis
    reference = normalize_text(reference)
    hypothesis = normalize_text(hypothesis)

    # Tokenize into words
    reference_words = reference.split()
    hypothesis_words = hypothesis.split()

    # Initialize substitution, deletion, and insertion counters
    substitutions, deletions, insertions = 0, 0, 0

    # Align reference and hypothesis words using dynamic programming (Levenshtein distance)
    ref_len, hyp_len = len(reference_words), len(hypothesis_words)
    dp = [[0] * (hyp_len + 1) for _ in range(ref_len + 1)]

    # Fill the DP table
    for i in range(ref_len + 1):
        for j in range(hyp_len + 1):
            if i == 0:
                dp[i][j] = j  # All insertions
            elif j == 0:
                dp[i][j] = i  # All deletions
            elif reference_words[i - 1] == hypothesis_words[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]  # No penalty for a match
            else:
                # Assign weights: 0.3 for single-letter mismatches, 1 for others
                subst_cost = (
                    0.3
                    if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1
                    else 1
                )
                dp[i][j] = min(
                    dp[i - 1][j - 1] + subst_cost,  # Substitution
                    dp[i - 1][j] + 1,  # Deletion
                    dp[i][j - 1] + 1,  # Insertion
                )

    # Backtrack to count substitutions, deletions, and insertions
    i, j = ref_len, hyp_len
    while i > 0 and j > 0:
        if reference_words[i - 1] == hypothesis_words[j - 1]:
            i -= 1
            j -= 1
        elif dp[i][j] == dp[i - 1][j - 1] + (
            0.3 if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1 else 1
        ):
            substitutions += 0.3 if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1 else 1
            i -= 1
            j -= 1
        elif dp[i][j] == dp[i - 1][j] + 1:
            deletions += 0.3 if len(reference_words[i - 1]) == 1 else 1
            i -= 1
        else:
            insertions += 0.3 if len(hypothesis_words[j - 1]) == 1 else 1
            j -= 1

    # Handle remaining deletions and insertions
    while i > 0:
        deletions += 0.3 if len(reference_words[i - 1]) == 1 else 1
        i -= 1
    while j > 0:
        insertions += 0.3 if len(hypothesis_words[j - 1]) == 1 else 1
        j -= 1

    # Calculate WER
    reference_length = len(reference_words)

    # Handle edge case for empty reference
    if reference_length == 0:
        return 100.0 if substitutions + deletions + insertions > 0 else 0.0

    wer = ((substitutions + deletions + insertions) / reference_length) * 100
    return round(wer, 2)

# Evaluation function
def evaluate_asr():
    audio_files = [
        {"file": "Ex4_audio_files/EN/checkin.wav", "lang": "en", "truth": "Where is the check-in desk?"},
        {"file": "Ex4_audio_files/EN/parents.wav", "lang": "en", "truth": "I have lost my parents."},
        {"file": "Ex4_audio_files/EN/suitcase.wav", "lang": "en", "truth": "Please, I have lost my suitcase."},
        {"file": "Ex4_audio_files/EN/what_time.wav", "lang": "en", "truth": "What time is my plane?"},
        {"file": "Ex4_audio_files/EN/where.wav", "lang": "en", "truth": "Where are the restaurants and shops?"},
        {"file": "Ex4_audio_files/IT/checkin_it.wav", "lang": "it", "truth": "Dove e' il bancone?"},
        {"file": "Ex4_audio_files/IT/parents_it.wav", "lang": "it", "truth": "Ho perso i miei genitori."},
        {"file": "Ex4_audio_files/IT/suitcase_it.wav", "lang": "it", "truth": "Per favore, ho perso la mia valigia."},
        {"file": "Ex4_audio_files/IT/what_time_it.wav", "lang": "it", "truth": "A che ora e' il mio aereo?"},
        {"file": "Ex4_audio_files/IT/where_it.wav", "lang": "it", "truth": "Dove sono i ristoranti e i negozi?"},
        {"file": "Ex4_audio_files/ES/checkin_es.wav", "lang": "es", "truth": "¿Dónde están los mostradores?"},
        {"file": "Ex4_audio_files/ES/parents_es.wav", "lang": "es", "truth": "He perdido a mis padres."},
        {"file": "Ex4_audio_files/ES/suitcase_es.wav", "lang": "es", "truth": "Por favor, he perdido mi maleta."},
        {"file": "Ex4_audio_files/ES/what_time_es.wav", "lang": "es", "truth": "¿A qué hora es mi avión?"},
        {"file": "Ex4_audio_files/ES/where_es.wav", "lang": "es", "truth": "¿Dónde están los restaurantes y las tiendas?"},
        {"file": "Ex4_audio_files/EN/where-is.mp3", "lang": "en", "truth": "Where is my keyboard?"},
        {"file": "Ex4_audio_files/EN/blanket.mp3", "lang": "en", "truth": "Can I have a blanket please"},
    ]

    model_paths = {
        "en": "vosk/vosk-model-small-en-us-0.15",
        "it": "vosk/vosk-model-small-it-0.22",
        "es": "vosk/vosk-model-small-es-0.42"
    }

    results = []
    language_wer = {"EN": [], "IT": [], "ES": []}

    for item in audio_files:
        file = item["file"]
        lang = item["lang"]
        truth = item["truth"]

        print(f"\nProcessing {file} ({lang.upper()})...")
        preprocessed_file = f"preprocessed_{os.path.basename(file)}"
        recognized_text = ""

        try:
            preprocess_audio(file, preprocessed_file, lang)
            recognized_text = transcribe_vosk(preprocessed_file, model_paths[lang])
        except Exception as e:
            recognized_text = f"Error: {e}"

        wer = calculate_wer_custom(truth, recognized_text)
        print(f"Reference: {truth}")
        print(f"Recognized: {recognized_text}")
        print(f"WER: {wer:.2f}%")

        language_wer[lang.upper()].append(wer)
        results.append({"File": file, "Language": lang.upper(), "WER": wer})

    print("\n--- Evaluation Results ---")
    for res in results:
        print(f"{res['File']:<30} {res['Language']:<10} {res['WER']:<10.2f}")

    print("\n--- Average WER by Language ---")
    for lang, wer_list in language_wer.items():
        avg_wer = sum(wer_list) / len(wer_list) if wer_list else 0
        print(f"{lang}: {avg_wer:.2f}%")

if __name__ == "__main__":
    evaluate_asr()



Processing Ex4_audio_files/EN/checkin.wav (EN)...
Preprocessing Ex4_audio_files/EN/checkin.wav for language en...
Loading Ex4_audio_files/EN/checkin.wav for adaptive noise reduction...
Noise-reduced audio saved to temp_noise_reduced.wav
Applying equalization to temp_noise_reduced.wav for language en...
Equalized audio saved to temp_equalized.wav
Final preprocessed audio saved to preprocessed_checkin.wav
Processing audio file: preprocessed_checkin.wav
Reference: Where is the check-in desk?
Recognized: where is the check in desk
WER: 0.00%

Processing Ex4_audio_files/EN/parents.wav (EN)...
Preprocessing Ex4_audio_files/EN/parents.wav for language en...
Loading Ex4_audio_files/EN/parents.wav for adaptive noise reduction...
Noise-reduced audio saved to temp_noise_reduced.wav
Applying equalization to temp_noise_reduced.wav for language en...
Equalized audio saved to temp_equalized.wav
Final preprocessed audio saved to preprocessed_parents.wav
Processing audio file: preprocessed_parents.wav

# DEEPSPEECH(very bad performance)

In [71]:
import os
import deepspeech
import wave
from vosk import Model, KaldiRecognizer
import json
import jiwer
import nltk
from nltk.corpus import stopwords
from pydub import AudioSegment, effects
import numpy as np
import re
import noisereduce as nr
import librosa
import soundfile as sf

import librosa.effects
import scipy.signal as signal
from scipy.signal import wiener
import unicodedata
from langdetect import detect
import language_tool_python


# Time-stretching to slow down audio without affecting pitch
def slow_down_preserving_pitch(input_file, output_file, stretch_factor=0.7):
    """
    Slows down audio while preserving pitch using librosa's time_stretch.
    - stretch_factor: Less than 1 slows down; greater than 1 speeds up.
    """
    try:
        y, sr = librosa.load(input_file, sr=None)
        y_slowed = librosa.effects.time_stretch(y, rate=stretch_factor)
        sf.write(output_file, y_slowed, sr)
        print(f"Processed audio saved to {output_file}")
    except Exception as e:
        print(f"Error in slowing down audio: {e}")
        sf.write(output_file, y, sr)


# Noise reduction (adaptive approach)
def adaptive_noise_reduction(input_file, output_file, prop_decrease):
    print(f"Loading {input_file} for adaptive noise reduction...")
    try:
        y, sr = librosa.load(input_file, sr=16000)
        if len(y) == 0:
            print("Warning: Input audio is empty. Copying as is.")
            sf.write(output_file, y, sr)
            return output_file

        noise_profile = y[:int(sr * 0.5)]  # Use first 0.5 seconds for noise profile
        y_denoised = nr.reduce_noise(y=y, y_noise=noise_profile, sr=sr, prop_decrease=prop_decrease)
        sf.write(output_file, y_denoised, sr)
        print(f"Noise-reduced audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in noise reduction: {e}")
        return input_file

# Compression and amplification
def compress_and_amplify(input_file, output_file, threshold, ratio):
    print(f"Compressing and amplifying speech in {input_file}...")
    try:
        if not os.path.exists(input_file):
            print("Error: Input file does not exist for compression.")
            return None

        audio = AudioSegment.from_wav(input_file)
        if len(audio) == 0:
            print("Warning: Audio is empty during compression. Copying as is.")
            audio.export(output_file, format="wav")
            return output_file

        compressed_audio = effects.compress_dynamic_range(
            audio,
            threshold=threshold,
            ratio=ratio,
            attack=20.0,
            release=100.0
        )

        normalized_audio = effects.normalize(compressed_audio)
        normalized_audio.export(output_file, format="wav")
        print(f"Compressed and amplified audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in compression and amplification: {e}")
        return input_file

# Equalization
def apply_equalization(input_file, output_file, low_pass_cutoff, high_pass_cutoff):
    print(f"Applying equalization to {input_file}...")
    try:
        if not os.path.exists(input_file):
            print("Error: Input file does not exist for EQ.")
            return None

        audio = AudioSegment.from_wav(input_file)
        if len(audio) == 0:
            print("Warning: Audio is empty during EQ. Copying as is.")
            audio.export(output_file, format="wav")
            return output_file

        boosted_audio = audio.high_pass_filter(high_pass_cutoff).low_pass_filter(low_pass_cutoff)
        boosted_audio.export(output_file, format="wav")
        print(f"Equalized audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in equalization: {e}")
        return input_file

# Analyze audio characteristics
def analyze_audio(input_file):
    print("Analyzing audio for parameter selection...")
    try:
        y, sr = librosa.load(input_file, sr=16000, mono=True)
        duration = librosa.get_duration(y=y, sr=sr)
        print(f"Audio duration (for analysis): {duration:.2f}s")

        noise_level = detect_noise_level(input_file)

        if len(y) > 0:
            rms = np.sqrt(np.mean(y**2))
            peak = np.max(np.abs(y))
        else:
            rms = 0
            peak = 0

        rms_db = 20 * np.log10(rms) if rms > 0 else -100
        peak_db = 20 * np.log10(peak) if peak > 0 else -100
        dynamic_range = peak_db - rms_db
        print(f"RMS dB: {rms_db:.2f}, Peak dB: {peak_db:.2f}, Dynamic Range: {dynamic_range:.2f} dB")

        # Frequency analysis for EQ decisions
        if len(y) > 0:
            S = np.abs(librosa.stft(y, n_fft=1024, hop_length=512))
            freqs = librosa.fft_frequencies(sr=sr, n_fft=1024)
            low_freq_mask = freqs < 300
            high_freq_mask = freqs > 3000

            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0
            mean_energy = np.mean(S)
        else:
            low_energy = 0
            high_energy = 0
            mean_energy = 0

        # Adjust noise reduction settings for short audio
        if noise_level > -25:
            noise_prop_decrease = 0.3 # Stronger noise reduction for very noisy audio
        elif noise_level > -35:
            noise_prop_decrease = 0.2 # Moderate noise reduction
        else:
            noise_prop_decrease = 0.1  # Light noise reduction for clean audio

        # Adjust compression logic for short but valid audio
        if dynamic_range > 18 and duration > 3.0:
            apply_compression = True
            comp_threshold = -25.0
            comp_ratio = 1.5
        elif dynamic_range > 18 and duration <= 3.0:
            apply_compression = True
            comp_threshold = -22.0  # Lower threshold for shorter audio
            comp_ratio = 1.2  # Slightly weaker compression
        else:
            apply_compression = False
            comp_threshold = None
            comp_ratio = None

         # Refined EQ logic for short audio
        if mean_energy > 0:
            if low_energy > (mean_energy * 1.5):
                high_pass_cutoff = 200
            else:
                high_pass_cutoff = 100

            if high_energy > (mean_energy * 1.5):
                low_pass_cutoff = 3500
            else:
                low_pass_cutoff = 3000
        else:
            high_pass_cutoff = 100
            low_pass_cutoff = 2000

        # Adjust for short audio (2–3 seconds)
        if duration < 3.0:
            skip_eq = False  # Apply EQ since audio is valid for full processing
            noise_prop_decrease = 0.2  # Moderate noise reduction
        else:
            skip_eq = False  # Always process EQ for longer audio

        # Skip EQ for clean audio with very low noise
        if noise_level < -30:
            skip_eq = True

        print("Analysis complete. Parameters chosen:")
        print(f"Noise Prop Decrease: {noise_prop_decrease}")
        print(f"Apply Compression: {apply_compression}, Threshold: {comp_threshold}, Ratio: {comp_ratio}")
        print(f"EQ: Low-pass {low_pass_cutoff} Hz, High-pass {high_pass_cutoff} Hz, Skip EQ: {skip_eq}")

        return {
            'noise_prop_decrease': noise_prop_decrease,
            'apply_compression': apply_compression,
            'comp_threshold': comp_threshold,
            'comp_ratio': comp_ratio,
            'high_pass_cutoff': high_pass_cutoff,
            'low_pass_cutoff': low_pass_cutoff,
            'skip_eq': skip_eq
        }
    except Exception as e:
        print(f"Error in analyzing audio: {e}")
        return {
            'noise_prop_decrease': 0.2,
            'apply_compression': False,
            'comp_threshold': None,
            'comp_ratio': None,
            'high_pass_cutoff': 200,
            'low_pass_cutoff': 3000,
            'skip_eq': True
        }

# Noise level detection
def detect_noise_level(input_file):
    try:
        audio = AudioSegment.from_wav(input_file)
        noise_level = audio.dBFS
        print(f"Detected noise level: {noise_level:.2f} dBFS")
        return noise_level
    except Exception as e:
        print(f"Error in detecting noise level: {e}")
        return -100

# Preprocess audio pipeline
def preprocess_audio(input_file, output_file):
    if not os.path.exists(input_file):
        print(f"Error: File {input_file} does not exist.")
        return None

    print(f"Preprocessing {input_file}...")

    try:
        # Load audio and check duration
        audio = AudioSegment.from_wav(input_file)
        audio_duration = len(audio) / 1000.0  # Convert from ms to seconds

        if audio_duration < 1.4:
            print("Audio is too short and may be too fast. Slowing down...")
            temp_slowed_file = "temp_slowed.wav"
            slow_down_preserving_pitch(input_file, temp_slowed_file, stretch_factor=0.85)
            input_file = temp_slowed_file  # Update input file to the slowed version

        # Reanalyze audio after slowing down (if applicable)
        params = analyze_audio(input_file)

        # Perform noise reduction
        noise_reduced_file = "temp_noise_reduced.wav"
        adaptive_noise_reduction(input_file, noise_reduced_file, params['noise_prop_decrease'])

        processed_file = noise_reduced_file

        # Apply compression if needed
        if params['apply_compression']:
            compressed_file = "temp_compressed.wav"
            compress_and_amplify(processed_file, compressed_file, params['comp_threshold'], params['comp_ratio'])
            processed_file = compressed_file

        # Apply EQ if not skipped
        if not params['skip_eq']:
            equalized_file = "temp_equalized.wav"
            apply_equalization(processed_file, equalized_file, params['low_pass_cutoff'], params['high_pass_cutoff'])
            processed_file = equalized_file

        # Convert to mono, 16kHz, and normalize at the end
        print("Converting to mono, adjusting sample rate, and normalizing final audio...")
        if not os.path.exists(processed_file):
            print("Error: Processed file does not exist, aborting.")
            return None

        final_audio = AudioSegment.from_wav(processed_file).set_channels(1).set_frame_rate(16000)
        final_audio = effects.normalize(final_audio)

        final_length = len(final_audio) / 1000.0
        print(f"Final audio duration: {final_length:.2f}s")

        final_audio.export(output_file, format="wav")
        print(f"Final preprocessed audio saved to {output_file}, length: {final_length:.2f}s")

        return output_file
    except Exception as e:
        print(f"Error in preprocessing audio: {e}")
        return None

# Function to transcribe using DeepSpeech
def transcribe_deepspeech(audio_file, model_path, scorer_path):
    print(f"Processing audio file: {audio_file}")
    ds = deepspeech.Model(model_path)
    ds.enableExternalScorer(scorer_path)
    print("Model and scorer loaded successfully.")

    with wave.open(audio_file, 'rb') as wf:
        # Verify WAV file properties
        if wf.getsampwidth() != 2:
            raise ValueError("Audio file must be 16-bit PCM.")
        if wf.getnchannels() != 1:
            raise ValueError("Audio file must be mono.")
        if wf.getframerate() != 16000:
            raise ValueError("Audio file must have a sample rate of 16 kHz.")

        frames = wf.readframes(wf.getnframes())
        audio_data = np.frombuffer(frames, dtype=np.int16)

    text = ds.stt(audio_data)
    return text

def normalize_text(text):
    """
    Normalizes text by:
    - Converting to lowercase
    - Replacing hyphens with spaces (to separate compound words)
    - Removing other punctuation
    - Removing accents from characters
    """
    # Convert all text to lowercase
    text = text.lower()
    
    # Replace hyphens with spaces to separate words
    text = re.sub(r"-", " ", text)
    
    # Remove remaining punctuation, except for word and space characters
    text = re.sub(r"[^\w\s]", "", text)
    
    # Remove accents from characters
    # Normalize to NFD to separate accents, then remove them
    text = unicodedata.normalize('NFD', text)
    text = "".join([c for c in text if unicodedata.category(c) != 'Mn'])
    
    return text


def calculate_wer_custom(reference, hypothesis):
    """
    Calculates Word Error Rate (WER) with normalization applied,
    ensuring minor impact for single-letter words and better handling of deletions and insertions.
    """
    # Normalize both reference and hypothesis
    reference = normalize_text(reference)
    hypothesis = normalize_text(hypothesis)

    # Tokenize into words
    reference_words = reference.split()
    hypothesis_words = hypothesis.split()

    # Initialize substitution, deletion, and insertion counters
    substitutions, deletions, insertions = 0, 0, 0

    # Align reference and hypothesis words using dynamic programming (Levenshtein distance)
    ref_len, hyp_len = len(reference_words), len(hypothesis_words)
    dp = [[0] * (hyp_len + 1) for _ in range(ref_len + 1)]

    # Fill the DP table
    for i in range(ref_len + 1):
        for j in range(hyp_len + 1):
            if i == 0:
                dp[i][j] = j  # All insertions
            elif j == 0:
                dp[i][j] = i  # All deletions
            elif reference_words[i - 1] == hypothesis_words[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]  # No penalty for a match
            else:
                # Assign weights: 0.3 for single-letter mismatches, 1 for others
                subst_cost = (
                    0.3
                    if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1
                    else 1
                )
                dp[i][j] = min(
                    dp[i - 1][j - 1] + subst_cost,  # Substitution
                    dp[i - 1][j] + 1,  # Deletion
                    dp[i][j - 1] + 1,  # Insertion
                )

    # Backtrack to count substitutions, deletions, and insertions
    i, j = ref_len, hyp_len
    while i > 0 and j > 0:
        if reference_words[i - 1] == hypothesis_words[j - 1]:
            i -= 1
            j -= 1
        elif dp[i][j] == dp[i - 1][j - 1] + (
            0.3 if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1 else 1
        ):
            substitutions += 0.3 if len(reference_words[i - 1]) == 1 or len(hypothesis_words[j - 1]) == 1 else 1
            i -= 1
            j -= 1
        elif dp[i][j] == dp[i - 1][j] + 1:
            deletions += 0.3 if len(reference_words[i - 1]) == 1 else 1
            i -= 1
        else:
            insertions += 0.3 if len(hypothesis_words[j - 1]) == 1 else 1
            j -= 1

    # Handle remaining deletions and insertions
    while i > 0:
        deletions += 0.3 if len(reference_words[i - 1]) == 1 else 1
        i -= 1
    while j > 0:
        insertions += 0.3 if len(hypothesis_words[j - 1]) == 1 else 1
        j -= 1

    # Calculate WER
    reference_length = len(reference_words)

    # Handle edge case for empty reference
    if reference_length == 0:
        return 100.0 if substitutions + deletions + insertions > 0 else 0.0

    wer = ((substitutions + deletions + insertions) / reference_length) * 100
    return round(wer, 2)


# Evaluation function
def evaluate_asr():
    audio_files = [
        {"file": "Ex4_audio_files/EN/checkin.wav", "lang": "en", "truth": "Where is the check-in desk?"},
        {"file": "Ex4_audio_files/EN/parents.wav", "lang": "en", "truth": "I have lost my parents."},
        {"file": "Ex4_audio_files/EN/suitcase.wav", "lang": "en", "truth": "Please, I have lost my suitcase."},
        {"file": "Ex4_audio_files/EN/what_time.wav", "lang": "en", "truth": "What time is my plane?"},
        {"file": "Ex4_audio_files/EN/where.wav", "lang": "en", "truth": "Where are the restaurants and shops?"},
        
        {"file": "Ex4_audio_files/IT/checkin_it.wav", "lang": "it", "truth": "Dove e' il bancone?"},
        {"file": "Ex4_audio_files/IT/parents_it.wav", "lang": "it", "truth": "Ho perso i miei genitori."},
        {"file": "Ex4_audio_files/IT/suitcase_it.wav", "lang": "it", "truth": "Per favore, ho perso la mia valigia."},
        {"file": "Ex4_audio_files/IT/what_time_it.wav", "lang": "it", "truth": "A che ora e' il mio aereo?"},
        {"file": "Ex4_audio_files/IT/where_it.wav", "lang": "it", "truth": "Dove sono i ristoranti e i negozi?"},

        {"file": "Ex4_audio_files/ES/checkin_es.wav", "lang": "es", "truth": "¿Dónde están los mostradores?"},
        {"file": "Ex4_audio_files/ES/parents_es.wav", "lang": "es", "truth": "He perdido a mis padres."},
        {"file": "Ex4_audio_files/ES/suitcase_es.wav", "lang": "es", "truth": "Por favor, he perdido mi maleta."},
        {"file": "Ex4_audio_files/ES/what_time_es.wav", "lang": "es", "truth": "¿A qué hora es mi avión?"},
        {"file": "Ex4_audio_files/ES/where_es.wav", "lang": "es", "truth": "¿Dónde están los restaurantes y las tiendas?"}
    ]

    EN_MODEL = "models/en/deepspeech-0.9.3-models.pbmm"
    EN_SCORER = "models/en/deepspeech-0.9.3-models.scorer"
    ES_MODEL = "models/es/output_graph_es.pbmm"
    ES_SCORER = "models/es/kenlm_es.scorer"
    IT_MODEL = "models/it/output_graph_it.pbmm"
    IT_SCORER = "models/it/kenlm_it.scorer"

    results = []
    language_wer = {"EN": [], "IT": [], "ES": []}

    for item in audio_files:
        file = item["file"]
        lang = item["lang"]
        truth = item["truth"]

        print(f"\nProcessing {file} ({lang.upper()})...")
        preprocessed_file = f"preprocessed_{os.path.basename(file)}"
        recognized_text = ""

        try:
            # Preprocess audio
            preprocess_audio(file, preprocessed_file)

            # Transcription
            if lang == "en":
                recognized_text = transcribe_deepspeech(preprocessed_file, EN_MODEL, EN_SCORER)
            elif lang == "es":
                recognized_text = transcribe_deepspeech(preprocessed_file, ES_MODEL, ES_SCORER)
            elif lang == "it":
                recognized_text = transcribe_deepspeech(preprocessed_file, IT_MODEL, IT_SCORER)
            else:
                recognized_text = "Unsupported Language"
        except Exception as e:
            recognized_text = f"Error: {e}"

        # Calculate WER
        wer = calculate_wer_custom(truth, recognized_text)
        print(f"Reference: {truth}")
        print(f"Recognized: {recognized_text}")
        print(f"WER: {wer}%")

        # Append WER to language-specific list
        language_wer[lang.upper()].append(wer)

        results.append({
            "File": file,
            "Language": lang.upper(),
            "WER": wer
        })

    # Print results
    print("\n--- Evaluation Results ---")
    print(f"{'File':<30} {'Language':<10} {'WER (%)':<10}")
    
    for res in results:
        print(f"{res['File']:<30} {res['Language']:<10} {res['WER']:<10}")

    # Calculate and display average WER per language
    print("\n--- Average WER by Language ---")
    for lang, wer_list in language_wer.items():
        if wer_list:
            avg_wer = sum(wer_list) / len(wer_list)
            print(f"{lang}: {avg_wer:.2f}%")
        else:
            print(f"{lang}: No data available.")

if __name__ == "__main__":
    evaluate_asr()


Exception ignored in: <function Model.__del__ at 0x000001B9B24B70D0>
Traceback (most recent call last):
  File "c:\Users\pan shengxin\AppData\Local\Programs\Python\Python38\lib\site-packages\vosk\__init__.py", line 60, in __del__
    _c.vosk_model_free(self._handle)
AttributeError: 'Model' object has no attribute '_handle'



Processing Ex4_audio_files/EN/checkin.wav (EN)...
Preprocessing Ex4_audio_files/EN/checkin.wav...
Analyzing audio for parameter selection...
Audio duration (for analysis): 2.76s
Detected noise level: -26.64 dBFS
RMS dB: -26.64, Peak dB: -12.02, Dynamic Range: 14.62 dB
Analysis complete. Parameters chosen:
Noise Prop Decrease: 0.2
Apply Compression: False, Threshold: None, Ratio: None
EQ: Low-pass 3000 Hz, High-pass 200 Hz, Skip EQ: False
Loading Ex4_audio_files/EN/checkin.wav for adaptive noise reduction...
Noise-reduced audio saved to temp_noise_reduced.wav
Applying equalization to temp_noise_reduced.wav...
Equalized audio saved to temp_equalized.wav
Converting to mono, adjusting sample rate, and normalizing final audio...
Final audio duration: 2.76s
Final preprocessed audio saved to preprocessed_checkin.wav, length: 2.76s
Processing audio file: preprocessed_checkin.wav
Model and scorer loaded successfully.
Reference: Where is the check-in desk?
Recognized: where is the check in desk

# open-ai whisper


In [35]:
!pip install openai-whisper
!pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu


Collecting openai-whisper
  Downloading openai-whisper-20240930.tar.gz (800 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
    Preparing wheel metadata: started
    Preparing wheel metadata: finished with status 'done'
Collecting tiktoken
  Downloading tiktoken-0.7.0-cp38-cp38-win_amd64.whl (798 kB)
Collecting more-itertools
  Downloading more_itertools-10.5.0-py3-none-any.whl (60 kB)
Building wheels for collected packages: openai-whisper
  Building wheel for openai-whisper (PEP 517): started
  Building wheel for openai-whisper (PEP 517): finished with status 'done'
  Created wheel for openai-whisper: filename=openai_whisper-20240930-py3-none-any.whl size=803358 sha256=3ca509301f3832fb377f6e4595b5a085f9c9383e5cdfa800f3db862ef0f35da1
  Stored in directory: c:\users\pan shengxin\appdata\local\pip\cache\wheels\58\9f\3f

You should consider upgrading via the 'c:\users\pan shengxin\appdata\local\programs\python\python38\python.exe -m pip install --upgrade pip' command.
You should consider upgrading via the 'c:\users\pan shengxin\appdata\local\programs\python\python38\python.exe -m pip install --upgrade pip' command.


Looking in indexes: https://download.pytorch.org/whl/cpu


In [49]:
import os
import librosa
import soundfile as sf
import unicodedata
import re
import whisper
import jiwer
from pydub import AudioSegment, effects
import noisereduce as nr

# Normalize text
def normalize_text(text):
    """
    Normalizes text by:
    - Converting to lowercase
    - Replacing hyphens with spaces (to separate compound words)
    - Removing other punctuation
    - Removing accents from characters
    """
    # Convert all text to lowercase
    text = text.lower()
    # Replace hyphens with spaces to separate words
    text = re.sub(r"-", " ", text)
    # Remove remaining punctuation, except for word and space characters
    text = re.sub(r"[^\w\s]", "", text)
    # Remove accents from characters
    text = unicodedata.normalize('NFD', text)
    text = "".join([c for c in text if unicodedata.category(c) != 'Mn'])
    return text
# Time-stretching to slow down audio without affecting pitch
def slow_down_preserving_pitch(input_file, output_file, stretch_factor=0.7):
    """
    Slows down audio while preserving pitch using librosa's time_stretch.
    - stretch_factor: Less than 1 slows down; greater than 1 speeds up.
    """
    try:
        y, sr = librosa.load(input_file, sr=None)
        y_slowed = librosa.effects.time_stretch(y, rate=stretch_factor)
        sf.write(output_file, y_slowed, sr)
        print(f"Processed audio saved to {output_file}")
    except Exception as e:
        print(f"Error in slowing down audio: {e}")
        sf.write(output_file, y, sr)



# Noise reduction (adaptive approach)
def adaptive_noise_reduction(input_file, output_file, prop_decrease):
    print(f"Loading {input_file} for adaptive noise reduction...")
    try:
        y, sr = librosa.load(input_file, sr=16000)
        if len(y) == 0:
            print("Warning: Input audio is empty. Copying as is.")
            sf.write(output_file, y, sr)
            return output_file

        noise_profile = y[:int(sr * 0.5)]  # Use first 0.5 seconds for noise profile
        y_denoised = nr.reduce_noise(y=y, y_noise=noise_profile, sr=sr, prop_decrease=prop_decrease)
        sf.write(output_file, y_denoised, sr)
        print(f"Noise-reduced audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in noise reduction: {e}")
        return input_file

# Compression and amplification
def compress_and_amplify(input_file, output_file, threshold, ratio):
    print(f"Compressing and amplifying speech in {input_file}...")
    try:
        if not os.path.exists(input_file):
            print("Error: Input file does not exist for compression.")
            return None

        audio = AudioSegment.from_wav(input_file)
        if len(audio) == 0:
            print("Warning: Audio is empty during compression. Copying as is.")
            audio.export(output_file, format="wav")
            return output_file

        compressed_audio = effects.compress_dynamic_range(
            audio,
            threshold=threshold,
            ratio=ratio,
            attack=20.0,
            release=100.0
        )

        normalized_audio = effects.normalize(compressed_audio)
        normalized_audio.export(output_file, format="wav")
        print(f"Compressed and amplified audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in compression and amplification: {e}")
        return input_file

# Equalization
def apply_equalization(input_file, output_file, low_pass_cutoff, high_pass_cutoff):
    print(f"Applying equalization to {input_file}...")
    try:
        if not os.path.exists(input_file):
            print("Error: Input file does not exist for EQ.")
            return None

        audio = AudioSegment.from_wav(input_file)
        if len(audio) == 0:
            print("Warning: Audio is empty during EQ. Copying as is.")
            audio.export(output_file, format="wav")
            return output_file

        boosted_audio = audio.high_pass_filter(high_pass_cutoff).low_pass_filter(low_pass_cutoff)
        boosted_audio.export(output_file, format="wav")
        print(f"Equalized audio saved to {output_file}")
        return output_file
    except Exception as e:
        print(f"Error in equalization: {e}")
        return input_file

# Analyze audio characteristics
def analyze_audio(input_file):
    print("Analyzing audio for parameter selection...")
    try:
        y, sr = librosa.load(input_file, sr=16000, mono=True)
        duration = librosa.get_duration(y=y, sr=sr)
        print(f"Audio duration (for analysis): {duration:.2f}s")

        noise_level = detect_noise_level(input_file)

        if len(y) > 0:
            rms = np.sqrt(np.mean(y**2))
            peak = np.max(np.abs(y))
        else:
            rms = 0
            peak = 0

        rms_db = 20 * np.log10(rms) if rms > 0 else -100
        peak_db = 20 * np.log10(peak) if peak > 0 else -100
        dynamic_range = peak_db - rms_db
        print(f"RMS dB: {rms_db:.2f}, Peak dB: {peak_db:.2f}, Dynamic Range: {dynamic_range:.2f} dB")

        # Frequency analysis for EQ decisions
        if len(y) > 0:
            S = np.abs(librosa.stft(y, n_fft=1024, hop_length=512))
            freqs = librosa.fft_frequencies(sr=sr, n_fft=1024)
            low_freq_mask = freqs < 300
            high_freq_mask = freqs > 3000

            low_energy = np.mean(S[low_freq_mask, :]) if np.any(low_freq_mask) else 0
            high_energy = np.mean(S[high_freq_mask, :]) if np.any(high_freq_mask) else 0
            mean_energy = np.mean(S)
        else:
            low_energy = 0
            high_energy = 0
            mean_energy = 0

        # Adjust noise reduction settings for short audio
        if noise_level > -25:
            noise_prop_decrease = 0.3 # Stronger noise reduction for very noisy audio
        elif noise_level > -35:
            noise_prop_decrease = 0.2 # Moderate noise reduction
        else:
            noise_prop_decrease = 0.1  # Light noise reduction for clean audio

        # Adjust compression logic for short but valid audio
        if dynamic_range > 18 and duration > 3.0:
            apply_compression = True
            comp_threshold = -25.0
            comp_ratio = 1.5
        elif dynamic_range > 18 and duration <= 3.0:
            apply_compression = True
            comp_threshold = -22.0  # Lower threshold for shorter audio
            comp_ratio = 1.2  # Slightly weaker compression
        else:
            apply_compression = False
            comp_threshold = None
            comp_ratio = None

        # Refined EQ logic for short audio
        if mean_energy > 0:
            if low_energy > (mean_energy * 1.5):
                high_pass_cutoff = 400
            else:
                high_pass_cutoff = 250

            if high_energy > (mean_energy * 1.5):
                low_pass_cutoff = 3500
            else:
                low_pass_cutoff = 4500
        else:
            high_pass_cutoff = 250
            low_pass_cutoff = 4500

        # Adjust for short audio (2–3 seconds)
        if duration < 3.0:
            skip_eq = False  # Apply EQ since audio is valid for full processing
            noise_prop_decrease = 0.2  # Moderate noise reduction
        else:
            skip_eq = False  # Always process EQ for longer audio

        # Skip EQ for clean audio with very low noise
        if noise_level < -30:
            skip_eq = True

        print("Analysis complete. Parameters chosen:")
        print(f"Noise Prop Decrease: {noise_prop_decrease}")
        print(f"Apply Compression: {apply_compression}, Threshold: {comp_threshold}, Ratio: {comp_ratio}")
        print(f"EQ: Low-pass {low_pass_cutoff} Hz, High-pass {high_pass_cutoff} Hz, Skip EQ: {skip_eq}")

        return {
            'noise_prop_decrease': noise_prop_decrease,
            'apply_compression': apply_compression,
            'comp_threshold': comp_threshold,
            'comp_ratio': comp_ratio,
            'high_pass_cutoff': high_pass_cutoff,
            'low_pass_cutoff': low_pass_cutoff,
            'skip_eq': skip_eq
        }
    except Exception as e:
        print(f"Error in analyzing audio: {e}")
        return {
            'noise_prop_decrease': 0.2,
            'apply_compression': False,
            'comp_threshold': None,
            'comp_ratio': None,
            'high_pass_cutoff': 200,
            'low_pass_cutoff': 3000,
            'skip_eq': True
        }

# Noise level detection
def detect_noise_level(input_file):
    try:
        audio = AudioSegment.from_wav(input_file)
        noise_level = audio.dBFS
        print(f"Detected noise level: {noise_level:.2f} dBFS")
        return noise_level
    except Exception as e:
        print(f"Error in detecting noise level: {e}")
        return -100

# Preprocess audio pipeline
def preprocess_audio(input_file, output_file):
    if not os.path.exists(input_file):
        print(f"Error: File {input_file} does not exist.")
        return None

    print(f"Preprocessing {input_file}...")

    try:
        # Load audio and check duration
        audio = AudioSegment.from_wav(input_file)
        audio_duration = len(audio) / 1000.0  # Convert from ms to seconds

        if audio_duration < 1.4:
            print("Audio is too short and may be too fast. Slowing down...")
            temp_slowed_file = "temp_slowed.wav"
            slow_down_preserving_pitch(input_file, temp_slowed_file, stretch_factor=0.85)
            input_file = temp_slowed_file  # Update input file to the slowed version

        # Reanalyze audio after slowing down (if applicable)
        params = analyze_audio(input_file)

        # Perform noise reduction
        noise_reduced_file = "temp_noise_reduced.wav"
        adaptive_noise_reduction(input_file, noise_reduced_file, params['noise_prop_decrease'])

        processed_file = noise_reduced_file

        # Apply compression if needed
        if params['apply_compression']:
            compressed_file = "temp_compressed.wav"
            compress_and_amplify(processed_file, compressed_file, params['comp_threshold'], params['comp_ratio'])
            processed_file = compressed_file

        # Apply EQ if not skipped
        if not params['skip_eq']:
            equalized_file = "temp_equalized.wav"
            apply_equalization(processed_file, equalized_file, params['low_pass_cutoff'], params['high_pass_cutoff'])
            processed_file = equalized_file

        # Convert to mono, 16kHz, and normalize at the end
        print("Converting to mono, adjusting sample rate, and normalizing final audio...")
        if not os.path.exists(processed_file):
            print("Error: Processed file does not exist, aborting.")
            return None

        final_audio = AudioSegment.from_wav(processed_file).set_channels(1).set_frame_rate(16000)
        final_audio = effects.normalize(final_audio)

        final_length = len(final_audio) / 1000.0
        print(f"Final audio duration: {final_length:.2f}s")

        final_audio.export(output_file, format="wav")
        print(f"Final preprocessed audio saved to {output_file}, length: {final_length:.2f}s")

        return output_file
    except Exception as e:
        print(f"Error in preprocessing audio: {e}")
        return None
# Transcribe using Whisper
def transcribe_whisper(audio_file, model_type="base"):
    print(f"Processing audio file: {audio_file} with Whisper")
    try:
        model = whisper.load_model(model_type)
        result = model.transcribe(audio_file)
        return result["text"]
    except Exception as e:
        print(f"Error processing audio with Whisper: {e}")
        return "Error: Failed to process audio with Whisper."

# Calculate WER
def calculate_wer_custom(reference, hypothesis):
    """
    Calculates Word Error Rate (WER) with normalization applied.
    """
    reference = normalize_text(reference)
    hypothesis = normalize_text(hypothesis)
    return jiwer.wer(reference, hypothesis) * 100

# Evaluation function
def evaluate_asr():
    audio_files = [
        {"file": "Ex4_audio_files/EN/checkin.wav", "lang": "en", "truth": "Where is the check-in desk?"},
        {"file": "Ex4_audio_files/EN/parents.wav", "lang": "en", "truth": "I have lost my parents."},
        {"file": "Ex4_audio_files/EN/suitcase.wav", "lang": "en", "truth": "Please, I have lost my suitcase."},
        {"file": "Ex4_audio_files/EN/what_time.wav", "lang": "en", "truth": "What time is my plane?"},
        {"file": "Ex4_audio_files/EN/where.wav", "lang": "en", "truth": "Where are the restaurants and shops?"},
        {"file": "Ex4_audio_files/IT/checkin_it.wav", "lang": "it", "truth": "Dove e' il bancone?"},
        {"file": "Ex4_audio_files/IT/parents_it.wav", "lang": "it", "truth": "Ho perso i miei genitori."},
        {"file": "Ex4_audio_files/IT/suitcase_it.wav", "lang": "it", "truth": "Per favore, ho perso la mia valigia."},
        {"file": "Ex4_audio_files/IT/what_time_it.wav", "lang": "it", "truth": "A che ora e' il mio aereo?"},
        {"file": "Ex4_audio_files/IT/where_it.wav", "lang": "it", "truth": "Dove sono i ristoranti e i negozi?"},
        {"file": "Ex4_audio_files/ES/checkin_es.wav", "lang": "es", "truth": "¿Dónde están los mostradores?"},
        {"file": "Ex4_audio_files/ES/parents_es.wav", "lang": "es", "truth": "He perdido a mis padres."},
        {"file": "Ex4_audio_files/ES/suitcase_es.wav", "lang": "es", "truth": "Por favor, he perdido mi maleta."},
        {"file": "Ex4_audio_files/ES/what_time_es.wav", "lang": "es", "truth": "¿A qué hora es mi avión?"},
        {"file": "Ex4_audio_files/ES/where_es.wav", "lang": "es", "truth": "¿Dónde están los restaurantes y las tiendas?"}
    ]

    results = []
    language_wer = {"EN": [], "IT": [], "ES": []}

    for item in audio_files:
        file = item["file"]
        lang = item["lang"]
        truth = item["truth"]

        print(f"\nProcessing {file} ({lang.upper()})...")
        preprocessed_file = f"preprocessed_{os.path.basename(file)}"
        recognized_text = ""

        try:
            preprocess_audio(file, preprocessed_file)
            recognized_text = transcribe_whisper(preprocessed_file)
        except Exception as e:
            recognized_text = f"Error: {e}"

        wer = calculate_wer_custom(truth, recognized_text)
        print(f"Reference: {truth}")
        print(f"Recognized: {recognized_text}")
        print(f"WER: {wer}%")

        language_wer[lang.upper()].append(wer)

        results.append({"File": file, "Language": lang.upper(), "WER": wer})

    print("\n--- Evaluation Results ---")
    for res in results:
        print(f"{res['File']:<30} {res['Language']:<10} {res['WER']:<10}")

    print("\n--- Average WER by Language ---")
    for lang, wer_list in language_wer.items():
        avg_wer = sum(wer_list) / len(wer_list) if wer_list else 0
        print(f"{lang}: {avg_wer:.2f}%")

if __name__ == "__main__":
    evaluate_asr()



Processing Ex4_audio_files/EN/checkin.wav (EN)...
Preprocessing Ex4_audio_files/EN/checkin.wav...
Analyzing audio for parameter selection...
Audio duration (for analysis): 2.76s
Detected noise level: -26.64 dBFS
RMS dB: -26.64, Peak dB: -12.02, Dynamic Range: 14.62 dB
Analysis complete. Parameters chosen:
Noise Prop Decrease: 0.2
Apply Compression: False, Threshold: None, Ratio: None
EQ: Low-pass 4500 Hz, High-pass 400 Hz, Skip EQ: False
Loading Ex4_audio_files/EN/checkin.wav for adaptive noise reduction...
Noise-reduced audio saved to temp_noise_reduced.wav
Applying equalization to temp_noise_reduced.wav...
Equalized audio saved to temp_equalized.wav
Converting to mono, adjusting sample rate, and normalizing final audio...
Final audio duration: 2.76s
Final preprocessed audio saved to preprocessed_checkin.wav, length: 2.76s
Processing audio file: preprocessed_checkin.wav with Whisper
Reference: Where is the check-in desk?
Recognized:  Where is the check-in desk?
WER: 0.0%

Processing 

KeyboardInterrupt: 