In [1]:
import os
import glob
import librosa
import numpy as np
import soundfile as sf
import time
import scipy

def vtlp_filters(fbank_mx, alpha=1.0, f_high=None):
    """
    Apply vocal tract length perturbation (VTLP) to the filterbank matrix.

    Every frequency-bin column ``k`` of ``fbank_mx`` is moved to a warped bin
    index and accumulated there, so several source bins may add up in the same
    target bin. Frequencies are expressed directly in bin units.

    :param fbank_mx: filterbank matrix of shape (n_filters, n_bins)
    :param alpha: warping factor; 1.0 leaves the filterbank unchanged
    :param f_high: upper limit (in bins) of the warped region; bins at or above
        it are copied unchanged. Defaults to half the number of bins.
    :return: warped filterbank matrix (float64, same shape as ``fbank_mx``)
    """
    fbank_mx = np.asarray(fbank_mx)
    n_filters, n_bins = fbank_mx.shape
    warped_filters = np.zeros((n_filters, n_bins))

    if f_high is None:
        f_high = n_bins / 2

    # The warp depends only on the bin index, so it is evaluated once per bin
    # and applied to all filters at the same time.
    for k in range(n_bins):
        f = float(k)
        # NOTE(review): this piecewise map is discontinuous at f_high / 2 and
        # at f_high; it is kept as written -- confirm it is the intended warp.
        if f < f_high / 2:
            f_warped = alpha * f
        elif f < f_high:
            f_warped = alpha * f + (1 - alpha) * (f_high / 2)
        else:
            f_warped = f

        # f_warped is already a bin position. The previous extra rescale by
        # n_bins / (n_bins - 1) pushed the top bin out of range, so even
        # alpha == 1.0 altered the filterbank.
        k_warped = int(f_warped)
        # The lower bound stops a negative index from wrapping around.
        if 0 <= k_warped < n_bins:
            warped_filters[:, k_warped] += fbank_mx[:, k]

    return warped_filters



def vtlp(input_wav_file, output_wav_file, alpha=1.0, f_high=None):
    """
    Warp a recording with VTLP and write the resynthesised audio to disk.

    The signal is analysed with an STFT, projected through a VTLP-warped mel
    filterbank, and converted back to a waveform with
    ``librosa.feature.inverse.mel_to_audio``. The inversion time in seconds
    and a completion message are printed.

    :param input_wav_file: input .wav file
    :param output_wav_file: output .wav file
    :param alpha: warping factor
    :param f_high: maximum frequency for warping
    """
    # Analysis / synthesis settings
    stft_size = 2048
    stft_hop = 512
    stft_win = stft_size
    mel_bands = 256

    # No sr argument is given, so librosa's default resampling applies.
    samples, rate = librosa.load(input_wav_file)

    hann_window = scipy.signal.windows.hann(stft_win, sym=False)
    spectrum = librosa.stft(
        samples,
        n_fft=stft_size,
        hop_length=stft_hop,
        win_length=stft_win,
        window=hann_window,
    )

    # Mel filterbank, then its warped counterpart
    mel_basis = librosa.filters.mel(sr=rate, n_fft=stft_size, n_mels=mel_bands)
    warped_basis = vtlp_filters(mel_basis, alpha=alpha, f_high=f_high)

    # Magnitude spectrogram projected onto the warped filters
    warped_mel = np.dot(warped_basis, np.abs(spectrum))

    inversion_options = dict(
        sr=rate,
        n_fft=stft_size,
        hop_length=stft_hop,
        win_length=stft_win,
        window=hann_window,
        power=1,
        n_iter=128,
        length=len(samples),
    )

    # Time only the mel -> audio inversion
    started_at = time.time()
    resynthesised = librosa.feature.inverse.mel_to_audio(warped_mel, **inversion_options)
    finished_at = time.time()
    print(finished_at - started_at)

    sf.write(output_wav_file, resynthesised, rate)
    print("VTLP COMPLETED with alpha: " +str(alpha))

input_folder = 'lt400'
output_folder = 'output_folder'
alpha_values = [ 0.7, 0.85, 1, 1.2, 1.4]

# Root directory for all augmented sets; exist_ok makes the cell re-runnable.
os.makedirs(output_folder, exist_ok=True)

for desired_alpha in alpha_values:
    # Use a separate name for the per-alpha directory. Reassigning
    # output_folder here made every later alpha nest inside the previous one
    # (output_folder/alpha0.7/alpha0.85/...).
    # NOTE(review): the evaluation cell reads folders named alpha07, alpha085,
    # ... -- confirm how these output directories are mapped to those names.
    alpha_folder = os.path.join(output_folder, f'alpha{desired_alpha}')
    os.makedirs(alpha_folder, exist_ok=True)

    # Sorted so the processing order is the same on every run.
    for file in sorted(glob.glob(os.path.join(input_folder, '*.wav'))):
        output_filename = os.path.join(alpha_folder, os.path.basename(file))
        vtlp(file, output_filename, desired_alpha)

        

3.1470987796783447
VTLP COMPLETED with alpha: 0.7
2.62402081489563
VTLP COMPLETED with alpha: 0.7
4.554767608642578
VTLP COMPLETED with alpha: 0.7
5.672191858291626
VTLP COMPLETED with alpha: 0.7
3.07045316696167
VTLP COMPLETED with alpha: 0.7
2.4246418476104736
VTLP COMPLETED with alpha: 0.7
2.277528762817383
VTLP COMPLETED with alpha: 0.7
1.5671801567077637
VTLP COMPLETED with alpha: 0.7
4.372097969055176
VTLP COMPLETED with alpha: 0.7
2.4639339447021484
VTLP COMPLETED with alpha: 0.7
6.084608316421509
VTLP COMPLETED with alpha: 0.7
3.634899854660034
VTLP COMPLETED with alpha: 0.7
4.996829986572266
VTLP COMPLETED with alpha: 0.7
2.3478825092315674
VTLP COMPLETED with alpha: 0.7
4.7110230922698975
VTLP COMPLETED with alpha: 0.7
3.182436227798462
VTLP COMPLETED with alpha: 0.7
1.8966562747955322
VTLP COMPLETED with alpha: 0.7
3.2827091217041016
VTLP COMPLETED with alpha: 0.7
2.707087993621826
VTLP COMPLETED with alpha: 0.7
2.0991640090942383
VTLP COMPLETED with alpha: 0.7
4.75092315673

In [None]:
import os
import json
import wave
import sys
import json
from vosk import Model, KaldiRecognizer, SetLogLevel
import pandas as pd

def transcribe_files_to_tsv(input_folder, output_filename):
    """
    Transcribe every .wav file in a folder with Vosk and save the text as TSV.

    Files that are not mono 16-bit uncompressed PCM are reported and skipped.
    The text of every segment the recognizer finalises, plus the final flush
    at end of stream, is joined with single spaces into one transcription.

    :param input_folder: directory that is scanned (non-recursively) for .wav files
    :param output_filename: path of the TSV file to write, with the columns
        ``filename`` and ``transcription``
    """
    # Vosk log verbosity. NOTE(review): Vosk's examples use -1 to silence
    # its messages; 0 is kept here so the logging behaviour does not change.
    SetLogLevel(0)

    model = Model(lang="ca")
    transcriptions = []

    # Sorted so the row order of the TSV is reproducible.
    for file in sorted(os.listdir(input_folder)):
        if not file.endswith(".wav"):
            continue

        # The context manager closes the file on every path, including the
        # format-mismatch `continue` below.
        with wave.open(os.path.join(input_folder, file), "rb") as wf:
            if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
                print(f"Audio file {file} must be WAV format mono PCM.")
                continue

            rec = KaldiRecognizer(model, wf.getframerate())
            rec.SetWords(True)
            rec.SetPartialWords(True)

            segments = []
            while True:
                data = wf.readframes(4000)
                if len(data) == 0:
                    break
                # AcceptWaveform returns True when a segment is complete.
                # Keep every segment rather than only the last one.
                if rec.AcceptWaveform(data):
                    segments.append(json.loads(rec.Result()).get("text", ""))

            # FinalResult flushes the audio still buffered in the recognizer.
            segments.append(json.loads(rec.FinalResult()).get("text", ""))

        text = " ".join(segment for segment in segments if segment)
        transcriptions.append((os.path.basename(file), text))

    # Convert the list of transcriptions to a DataFrame
    transcriptions_df = pd.DataFrame(transcriptions, columns=['filename', 'transcription'])

    # Save the output to a TSV file
    transcriptions_df.to_csv(output_filename, sep="\t", index=False)


import csv
from pathlib import Path
from jiwer import wer
from torchmetrics.functional.audio.pesq import perceptual_evaluation_speech_quality
from scipy.io import wavfile
import warnings
from pesq import cypesq
import torch
import librosa

# Suppress torch's "To copy construct from a tensor, ..." warning.
# `message` is a regular expression matched against the START of the warning
# text. The full sentence contains unescaped "(" / ")" / ".", which made the
# previous pattern fail to match the real warning. A metacharacter-free prefix
# matches it reliably.
TENSOR_COPY_WARNING_PATTERN = "To copy construct from a tensor"
warnings.filterwarnings("ignore", message=TENSOR_COPY_WARNING_PATTERN)

def read_audio_file(file_path):
    """
    Load an audio file at 16 kHz and return it as a float32 torch tensor.

    :param file_path: path of the audio file to load
    :return: tuple ``(sample_rate, samples)``; note that the rate comes first
    """
    samples, sample_rate = librosa.load(file_path, sr=16000)
    return sample_rate, torch.tensor(samples, dtype=torch.float32)

def calculate_pesq(fs, ref_audio, deg_audio, mode):
    """
    Compute the PESQ score of a degraded signal against its reference.

    :param fs: sampling frequency of both signals
    :param ref_audio: reference (clean) signal as a torch tensor
    :param deg_audio: degraded signal as a torch tensor
    :param mode: PESQ mode string passed through to torchmetrics ('wb' or 'nb')
    :return: the value returned by torchmetrics, or 1 when PESQ reports that
        no utterances were detected
    """
    # Detach the tensors from the computation graph and move them to the CPU
    ref_audio_detached = ref_audio.detach().cpu()
    deg_audio_detached = deg_audio.detach().cpu()

    try:
        # torchmetrics expects (preds, target, ...): `preds` is the degraded
        # signal and `target` the reference. Passing them positionally as
        # (ref, deg) swapped the two roles, and PESQ is not symmetric.
        return perceptual_evaluation_speech_quality(
            preds=deg_audio_detached,
            target=ref_audio_detached,
            fs=fs,
            mode=mode,
        )
    except cypesq.NoUtterancesError:
        print("No utterances detected in the input audio files.")
        # Fallback score used when PESQ cannot find any speech.
        return 1

def pesq_from_paths(ref_file_path, deg_file_path):
    """
    Load a reference and a degraded recording and return their PESQ score.

    The degraded signal is zero-padded or truncated to the length of the
    reference before scoring.

    :param ref_file_path: path of the reference audio file
    :param deg_file_path: path of the degraded audio file
    :return: the result of ``calculate_pesq`` for the two signals
    """
    fs_ref, reference = read_audio_file(ref_file_path)
    fs_deg, degraded = read_audio_file(deg_file_path)

    # Both files must share one sampling frequency
    assert fs_ref == fs_deg, "Sampling frequencies must be the same" + str(fs_ref) + str(fs_deg)

    # Positive: degraded is too short (pad); negative: too long (truncate)
    missing_samples = len(reference) - len(degraded)
    if missing_samples > 0:
        silence = torch.zeros(missing_samples, dtype=torch.float32)
        degraded = torch.cat([degraded, silence])
    elif missing_samples < 0:
        degraded = degraded[:len(reference)]

    # Wide-band mode at 16 kHz, narrow-band otherwise
    if fs_ref == 16000:
        mode = 'wb'
    else:
        mode = 'nb'

    return calculate_pesq(fs_ref, reference, degraded, mode)

def read_tsv_file(tsv_file_path):
    """
    Read a two-column (filename, transcription) TSV file into a dictionary.

    The first row is treated as a header and skipped. Rows that do not have
    exactly two fields are reported on stdout and ignored. An empty file
    yields an empty dictionary.

    :param tsv_file_path: path of the TSV file
    :return: dict mapping filename -> {"transcription": text}
    """
    tsv_data = {}

    # newline="" is what the csv module expects for file objects. UTF-8 is
    # pandas' default encoding for to_csv, which writes these files, so
    # accented text is read back correctly whatever the platform locale.
    with open(tsv_file_path, "r", newline="", encoding="utf-8") as file:
        tsv_file = csv.reader(file, delimiter="\t")
        # Skip header row; the default avoids StopIteration on an empty file.
        next(tsv_file, None)
        for line in tsv_file:
            if len(line) == 2:
                filename, transcription = line
                tsv_data[filename] = {
                    "transcription": transcription
                }
            else:
                print(f"Skipping line with unexpected number of values: {line}")

    return tsv_data

def calculate_wer_pesq(original_data, augmented_data, output_file_path, alpha):
    """
    Compute WER and PESQ for every original file that has an augmented
    counterpart and write the results to a TSV file.

    Audio paths are built from the module-level names ``ORIGINAL_FOLDER`` and
    ``augmented_folder``, which must be set before this function is called.

    :param original_data: dict filename -> {"transcription": text} for the originals
    :param augmented_data: dict with the same layout for the augmented files
    :param output_file_path: path of the TSV file to write
        (columns: Filename, WER, PESQ, alpha)
    :param alpha: warping factor recorded in every output row
    """
    output = []
    for filename, data in original_data.items():
        if filename not in augmented_data:
            print(f"No augmented transcription found for file {filename}")
            continue

        original_transcription = data["transcription"]
        augmented_transcription = augmented_data[filename]["transcription"]
        ref_file_path = f"{ORIGINAL_FOLDER}/{filename}"
        deg_file_path = f"{augmented_folder}/{filename}"
        print(ref_file_path + "  " + deg_file_path)

        try:
            wer_score = wer(original_transcription, augmented_transcription)
        except Exception:
            # Best-effort fallback when jiwer rejects the pair.
            # `Exception` instead of a bare `except` lets KeyboardInterrupt
            # and SystemExit propagate.
            wer_score = 1

        pesq_score = pesq_from_paths(ref_file_path, deg_file_path)
        # float() handles both a scalar tensor and the plain fallback number,
        # replacing the fragile parsing of the "tensor(...)" repr.
        output.append([filename, wer_score, float(pesq_score), alpha])

    # Save the output to a TSV file
    with open(output_file_path, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file, delimiter="\t")
        writer.writerow(["Filename", "WER", "PESQ", "alpha"])
        writer.writerows(output)



#transcribe_files_to_tsv("lt400/", "lt400.tsv")

ORIGINAL_FOLDER = "lt400"

# (folder suffix, alpha) for every augmented set. The folder is
# "alpha<suffix>" and the result file is "OUTPUT<suffix>alpha.tsv".
# The last entry previously passed alpha=14 instead of 1.4.
AUGMENTATION_RUNS = [
    ("07", 0.7),
    ("085", 0.85),
    ("1", 1),
    ("12", 1.2),
    ("14", 1.4),
]

# Transcribe every augmented folder into <folder>/<folder>.tsv
for run_tag, _ in AUGMENTATION_RUNS:
    run_folder = f"alpha{run_tag}"
    transcribe_files_to_tsv(f"{run_folder}/", os.path.join(run_folder, f"{run_folder}.tsv"))

# Reference transcriptions; lt400/lt400.tsv must already exist because the
# transcription call for the originals is commented out above.
tsvoriginal_file_path = Path("lt400/lt400.tsv")
tsv_data = read_tsv_file(tsvoriginal_file_path)

# Score each augmented set against the originals
for run_tag, run_alpha in AUGMENTATION_RUNS:
    # calculate_wer_pesq reads this module-level name to locate the audio.
    augmented_folder = f"alpha{run_tag}"
    augmented_tsv_path = Path(augmented_folder) / f"{augmented_folder}.tsv"
    augmented_data = read_tsv_file(augmented_tsv_path)
    calculate_wer_pesq(tsv_data, augmented_data, Path(f"OUTPUT{run_tag}alpha.tsv"), run_alpha)


