In [9]:
import os
import numpy as np
from numpy.fft import fft, ifft
import librosa
import IPython.display as ipd
import speech_recognition
import soundfile as sf
from io import BytesIO
import jiwer
import soundfile as sf
import sounddevice as sd
import soundfile
import wave
from IPython.display import Audio, Markdown
from collections import defaultdict

# Implement Psola

In [10]:
# implemetation from https://github.com/sannawag/TD-PSOLA/blob/master/td_psola.py
def psola(signal, peaks, f_ratio):
    """
    Time-Domain Pitch Synchronous Overlap and Add
    :param signal: original time-domain signal
    :param peaks: time-domain signal peak indices
    :param f_ratio: pitch shift ratio
    :return: pitch-shifted signal
    """
    N = len(signal)
    # Interpolate
    new_signal = np.zeros(N)
    new_peaks_ref = np.linspace(0, len(peaks) - 1, int(len(peaks) * f_ratio))
    new_peaks = np.zeros(len(new_peaks_ref)).astype(int)
    
    for i in range(len(new_peaks)):
        weight = new_peaks_ref[i] % 1
        left = np.floor(new_peaks_ref[i]).astype(int)
        right = np.ceil(new_peaks_ref[i]).astype(int)
        new_peaks[i] = int(peaks[left] * (1 - weight) + peaks[right] * weight)

    # PSOLA
    for j in range(len(new_peaks)):
        # find the corresponding old peak index
        i = np.argmin(np.abs(peaks - new_peaks[j]))
        # get the distances to adjacent peaks
        P1 = [new_peaks[j] if j == 0 else new_peaks[j] - new_peaks[j-1],
              N - 1 - new_peaks[j] if j == len(new_peaks) - 1 else new_peaks[j+1] - new_peaks[j]]
        # edge case truncation
        if peaks[i] - P1[0] < 0:
            P1[0] = peaks[i]
        if peaks[i] + P1[1] > N - 1:
            P1[1] = N - 1 - peaks[i]
        # linear OLA window
        window = list(np.linspace(0, 1, P1[0] + 1)[1:]) + list(np.linspace(1, 0, P1[1] + 1)[1:])
        # center window from original signal at the new peak
        new_signal[new_peaks[j] - P1[0]: new_peaks[j] + P1[1]] += window * signal[peaks[i] - P1[0]: peaks[i] + P1[1]]
    return new_signal

def compute_periods_per_sequence(signal, sequence, min_period, max_period):
    """
    Computes periodicity of a time-domain signal using autocorrelation
    :param sequence: analysis window length in samples. Computes one periodicity value per window
    :param min_period: smallest allowed periodicity
    :param max_period: largest allowed periodicity
    :return: list of measured periods in windows across the signal
    """
    offset = 0  # current sample offset
    periods = []  # period length of each analysis sequence
    N = len(signal)
    while offset < N:
        fourier = fft(signal[offset: offset + sequence])
        fourier[0] = 0  # remove DC component
        autoc = ifft(fourier * np.conj(fourier)).real
        if len(autoc) <= min_period:
            autoc_peak = min_period + autoc[-1]
        else:    
            autoc_peak = min_period + np.argmax(autoc[min_period: max_period])
        periods.append(autoc_peak)
        offset += sequence
    return periods

def find_peaks(signal, fs, max_hz=950, min_hz=75, analysis_win_ms=40, max_change=1.005, min_change=0.995):
    """
    Find sample indices of peaks in time-domain signal
    :param max_hz: maximum measured fundamental frequency
    :param min_hz: minimum measured fundamental frequency
    :param analysis_win_ms: window size used for autocorrelation analysis
    :param max_change: restrict periodicity to not increase by more than this ratio from the mean
    :param min_change: restrict periodicity to not decrease by more than this ratio from the mean
    :return: peak indices
    """
    N = len(signal)
    min_period = fs // max_hz
    max_period = fs // min_hz

    # compute pitch periodicity
    sequence = int(analysis_win_ms / 1000 * fs)  # analysis sequence length in samples

    periods = compute_periods_per_sequence(signal, sequence, min_period, max_period)

    # simple hack to avoid octave error: assume that the pitch should not vary much, restrict range
    mean_period = np.mean(periods)
    max_period = int(mean_period * 1.1)
    min_period = int(mean_period * 0.9)
    periods = compute_periods_per_sequence(signal, sequence, min_period, max_period)

    # find the peaks
    peaks = [np.argmax(signal[:int(periods[0]*1.1)])]
    while True:
        prev = peaks[-1]
        idx = prev // sequence  # current autocorrelation analysis window
        if prev + int(periods[idx] * max_change) >= N:
            break
        # find maximum near expected location
        peaks.append(prev + int(periods[idx] * min_change) +
                np.argmax(signal[prev + int(periods[idx] * min_change): prev + int(periods[idx] * max_change)]))
    return np.array(peaks)

def shift_pitch(signal, fs, f_ratio):
    """
    Calls psola pitch shifting algorithm
    :param signal: original signal in the time-domain
    :param fs: sample rate
    :param f_ratio: ratio by which the frequency will be shifted
    :return: pitch-shifted signal
    """
    peaks = find_peaks(signal, fs)
    new_signal = psola(signal, peaks, f_ratio)
    return new_signal

In [11]:
def add_noise(input_signal, noise_level):
    white_noise = np.random.randn(len(input_signal))
    mixed_audio = input_signal + noise_level * white_noise[:len(input_signal)]
    return mixed_audio


# Code for calculating metrics

In [12]:
def transcribe_audio(file, recognizer, language):
    # Language Code for Dutch = "nl-NL", for english use "en-US"
    
    # try:
    wav_io = BytesIO()
    sf.write(wav_io, file, 16000, format='WAV')
    wav_io.seek(0)
    
    audio_file = speech_recognition.AudioFile(wav_io)
    with audio_file as source:
            audio_data = recognizer.record(source)
    
    text = recognizer.recognize_whisper(audio_data, language)
    wav_io.close()
    if text == "":
        text = "empty"
    return text  
    # except:
    #     return "empty"
    # finally:
    #    wav_io.close()
      
       

In [13]:
def extract_mfcc(y, n_mfcc=13, rate=16000):
    """
    Extract MFCC features from an audio file.
    
    Parameters:
    file_path (str): Path to the audio file.
    n_mfcc (int): Number of MFCC features to extract.
    
    Returns:
    np.ndarray: Extracted MFCC features.
    """

    mfccs = librosa.feature.mfcc(y=y, sr=rate, n_mfcc=n_mfcc)
    return np.mean(mfccs.T, axis=0)  # Average across time frames

def aggregate_speaker_features(files):
    """
    Aggregate MFCC features for all recordings of a speaker.
    
    Parameters:
    file_paths (list): List of file paths for a single speaker's recordings.
    
    Returns:
    np.ndarray: Aggregated feature vector for the speaker.
    """
    features = [extract_mfcc(file) for file in files]
    return np.mean(features, axis=0)

def get_speaker_files(base_path):
    """
    Get a dictionary of speaker IDs and their corresponding file paths.
    
    Parameters:
    base_path (str): Base directory containing the audio files.
    
    Returns:
    dict: Dictionary with speaker IDs as keys and lists of file paths as values.
    """
    speaker_files = defaultdict(list)
    for file in os.listdir(base_path):
        if file.endswith('.wav'):
            speaker_id = file.split('-')[0]
            speaker_files[speaker_id].append(os.path.join(base_path, file))
    return speaker_files

def calculate_similarity_matrix(utterances):
    """
    Compute the voice similarity matrix for given utterances.
    
    Parameters:
    utterances (list): List of feature vectors representing each speaker.
    
    Returns:
    np.ndarray: Similarity matrix.
    """
    num_speakers = len(utterances)
    similarity_matrix = np.zeros((num_speakers, num_speakers))
    
    for i in range(num_speakers):
        for j in range(num_speakers):
            similarity_matrix[i, j] = np.dot(utterances[i], utterances[j]) / (np.linalg.norm(utterances[i]) * np.linalg.norm(utterances[j]))
    
    return similarity_matrix

def calculate_diagonal_dominance(matrix):
    """
    Calculate the diagonal dominance of a given similarity matrix.
    
    Parameters:
    matrix (np.ndarray): Similarity matrix.
    
    Returns:
    float: Diagonal dominance value.
    """
    N = matrix.shape[0]
    diag_avg = np.mean(np.diag(matrix))
    off_diag_mask = np.ones(matrix.shape, dtype=bool)
    np.fill_diagonal(off_diag_mask, 0)
    off_diag_avg = np.mean(matrix[off_diag_mask])
    
    return abs(diag_avg - off_diag_avg)

def calculate_deid(matrix_oo, matrix_op):
    """
    Calculate the DeID metric given the original and original-protected similarity matrices.
    
    Parameters:
    matrix_oo (np.ndarray): Original similarity matrix.
    matrix_op (np.ndarray): Original-protected similarity matrix.
    
    Returns:
    float: DeID metric.
    """
    D_diag_OO = calculate_diagonal_dominance(matrix_oo)
    D_diag_OP = calculate_diagonal_dominance(matrix_op)
    
    DeID = 1 - (D_diag_OP / D_diag_OO)
    return DeID

def calculate_gvd(matrix_oo, matrix_pp):
    """
    Calculate the GVD metric given the original and pseudonymised similarity matrices.
    
    Parameters:
    matrix_oo (np.ndarray): Original similarity matrix.
    matrix_pp (np.ndarray): Pseudonymised similarity matrix.
    
    Returns:
    float: GVD metric.
    """
    D_diag_OO = calculate_diagonal_dominance(matrix_oo)
    D_diag_PP = calculate_diagonal_dominance(matrix_pp)
    
    GVD = 10 * np.log10(D_diag_PP / D_diag_OO)
    return GVD

# Gender mapping
spk2gender = {
    '1272': 'm', '1462': 'f', '1673': 'f', '174': 'm', '1919': 'f', '1988': 'f', 
    '1993': 'f', '2035': 'f', '2078': 'm', '2086': 'm', '2277': 'f', '2412': 'f', 
    '2428': 'm', '251': 'm', '2803': 'm', '2902': 'm', '3000': 'm', '3081': 'f', 
    '3170': 'm', '3536': 'f', '3576': 'f', '3752': 'm', '3853': 'f', '422': 'm', 
    '5338': 'f', '5536': 'm', '5694': 'm', '5895': 'f', '6241': 'm', '6295': 'm', 
    '6313': 'f', '6319': 'f', '6345': 'f', '652': 'm', '777': 'm', '7850': 'f', 
    '7976': 'm', '8297': 'm', '84': 'f', '8842': 'f'
}


# On dev-clean dataset

In [14]:
data_folder = 'data/dev-clean-audio'
audio_dict = {}
for root, dirs, files in os.walk(data_folder):
    for file in files:
        file_path = os.path.join(root, file)
        file_path = file_path.replace('\\', '/')
        if file.endswith('.wav'):
            temp_dict = {}
            original, rate = librosa.load(file_path, sr=16000)
            temp_dict['original'] = original
            f_ratio = 1.3
            psola_audio = shift_pitch(original, rate, f_ratio)
            temp_dict['psola'] = psola_audio
            noise_audio = add_noise(psola_audio, 0.001)
            temp_dict['noise'] = noise_audio
            key = file.split('.')[0]
            audio_dict[key] = temp_dict
        # elif file.endswith('.txt'):
        #     with open(file_path, 'r') as f:
        #         for line in f:
        #             key, sentence = line.split(' ', 1)  # Split on the first space
        #             key = key.strip()
        #             sentence = sentence.strip()
        #             audio_dict[key]['transcript'] = sentence

trancript_filepath = "data/dev-clean-audio/transcript.txt"
with open(trancript_filepath, 'r') as f:
    for line in f:
        key, sentence = line.split(' ', 1)  # Split on the first space
        key = key.strip()
        sentence = sentence.strip()
        audio_dict[key]['transcript'] = sentence

            

# DeID & GVD

In [15]:
speaker_files = defaultdict(list)
speaker_files_psola = defaultdict(list)
speaker_files_noise = defaultdict(list)
for key, items in audio_dict.items():
   speaker_id = key.split('-')[0] 
   speaker_files[speaker_id].append(items['original'])
   speaker_files_psola[speaker_id].append(items['psola'])
   speaker_files_noise[speaker_id].append(items['noise'])

# Separating male and female speakers
male_speakers = {spk: paths for spk, paths in speaker_files.items() if spk2gender[spk] == 'm'}
female_speakers = {spk: paths for spk, paths in speaker_files.items() if spk2gender[spk] == 'f'}
psola_male_speakers = {spk: paths for spk, paths in speaker_files_psola.items() if spk2gender[spk] == 'm'}
psola_female_speakers = {spk: paths for spk, paths in speaker_files_psola.items() if spk2gender[spk] == 'f'}
noise_male_speakers = {spk: paths for spk, paths in speaker_files_noise.items() if spk2gender[spk] == 'm'}
noise_female_speakers = {spk: paths for spk, paths in speaker_files_noise.items() if spk2gender[spk] == 'f'}

utterances_male = [aggregate_speaker_features(paths) for spk, paths in male_speakers.items()]
utterances_female = [aggregate_speaker_features(paths) for spk, paths in female_speakers.items()]
utterances_psola_male = [aggregate_speaker_features(paths) for spk, paths in psola_male_speakers.items()]
utterances_psola_female = [aggregate_speaker_features(paths) for spk, paths in psola_female_speakers.items()]
utterances_noise_male = [aggregate_speaker_features(paths) for spk, paths in noise_male_speakers.items()]
utterances_noise_female = [aggregate_speaker_features(paths) for spk, paths in noise_female_speakers.items()]

# Calculating similarity matrices for male and female
matrix_oo_male = calculate_similarity_matrix(utterances_male)
matrix_oo_female = calculate_similarity_matrix(utterances_female)

# Psola
# Male DeID and GVD
matrix_op_male_psola = calculate_similarity_matrix(utterances_male + utterances_psola_male)[len(utterances_male):, :len(utterances_male)]
matrix_pp_male_psola = calculate_similarity_matrix(utterances_psola_male)

deid_male_psola = calculate_deid(matrix_oo_male, matrix_op_male_psola)
gvd_male_psola = calculate_gvd(matrix_oo_male, matrix_pp_male_psola)

# Female DeID and GVD
matrix_op_female_psola = calculate_similarity_matrix(utterances_female + utterances_psola_female)[len(utterances_female):, :len(utterances_female)]
matrix_pp_female_psola = calculate_similarity_matrix(utterances_psola_female)

deid_female_psola = calculate_deid(matrix_oo_female, matrix_op_female_psola)
gvd_female_psola = calculate_gvd(matrix_oo_female, matrix_pp_female_psola)

# Displaying results
print(f"Male DeID: {deid_male_psola:.2f}, Male GVD: {gvd_male_psola:.2f}")
print(f"Female DeID: {deid_female_psola:.2f}, Female GVD: {gvd_female_psola:.2f}")

# Psola + Noise
# Male DeID and GVD
matrix_op_male_noise = calculate_similarity_matrix(utterances_male + utterances_noise_male)[len(utterances_male):, :len(utterances_male)]
matrix_pp_male_noise = calculate_similarity_matrix(utterances_noise_male)

deid_male_noise = calculate_deid(matrix_oo_male, matrix_op_male_noise)
gvd_male_noise = calculate_gvd(matrix_oo_male, matrix_pp_male_noise)

# Female DeID and GVD
matrix_op_female_noise = calculate_similarity_matrix(utterances_female + utterances_noise_female)[len(utterances_female):, :len(utterances_female)]
matrix_pp_female_noise = calculate_similarity_matrix(utterances_noise_female)

deid_female_noise = calculate_deid(matrix_oo_female, matrix_op_female_noise)
gvd_female_noise = calculate_gvd(matrix_oo_female, matrix_pp_female_noise)

# Displaying results
print(f"Male DeID: {deid_male_noise:.2f}, Male GVD: {gvd_male_noise:.2f}")
print(f"Female DeID: {deid_female_noise:.2f}, Female GVD: {gvd_female_noise:.2f}")


Male DeID: 0.11, Male GVD: -0.88
Female DeID: 0.12, Female GVD: -0.86
Male DeID: 0.20, Male GVD: -1.43
Female DeID: 0.16, Female GVD: -0.95


# WER

In [16]:
# Initialize lists to store WER values for male and female speakers
clean_wer_male = []
clean_wer_female = []
psola_wer_male = []
psola_wer_female = []
noise_wer_male = []
noise_wer_female = []

recognizer = speech_recognition.Recognizer()

for key in audio_dict.keys():
    transcript = audio_dict[key]['transcript'].lower()
    speaker_id = key.split('-')[0]
    gender = spk2gender[speaker_id]
    
    # clean
    text = transcribe_audio(audio_dict[key]['original'], recognizer, 'tiny')
    if text != "empty":
        if gender == 'm':
            clean_wer_male.append(jiwer.wer(text.lower(), transcript))
        else:
            clean_wer_female.append(jiwer.wer(text.lower(), transcript))
    
    # psola
    text = transcribe_audio(audio_dict[key]['psola'], recognizer, 'tiny')
    if text != "empty":
        if gender == 'm':
            psola_wer_male.append(jiwer.wer(text.lower(), transcript))
        else:
            psola_wer_female.append(jiwer.wer(text.lower(), transcript))
    
    # noise
    text = transcribe_audio(audio_dict[key]['noise'], recognizer, 'tiny')
    if text != "empty":
        if gender == 'm':
            noise_wer_male.append(jiwer.wer(text.lower(), transcript))
        else:
            noise_wer_female.append(jiwer.wer(text.lower(), transcript))

# Calculate average WER for male and female speakers
clean_wer_male_avg = sum(clean_wer_male) / len(clean_wer_male) if clean_wer_male else float('nan')
clean_wer_female_avg = sum(clean_wer_female) / len(clean_wer_female) if clean_wer_female else float('nan')

psola_wer_male_avg = sum(psola_wer_male) / len(psola_wer_male) if psola_wer_male else float('nan')
psola_wer_female_avg = sum(psola_wer_female) / len(psola_wer_female) if psola_wer_female else float('nan')

noise_wer_male_avg = sum(noise_wer_male) / len(noise_wer_male) if noise_wer_male else float('nan')
noise_wer_female_avg = sum(noise_wer_female) / len(noise_wer_female) if noise_wer_female else float('nan')

In [17]:
# Display results
print(f"Before WER Male: {clean_wer_male_avg}")
print(f"Before WER Female: {clean_wer_female_avg}")
print(f"PSOLA WER Male: {psola_wer_male_avg}")
print(f"PSOLA WER Female: {psola_wer_female_avg}")
print(f"PSOLA + noise WER Male: {noise_wer_male_avg}")
print(f"PSOLA + noise WER Female: {noise_wer_female_avg}")

Before WER Male: 0.24064532517504428
Before WER Female: 0.2269092437386394
PSOLA WER Male: 0.2540091962794962
PSOLA WER Female: 0.245975442593483
PSOLA + noise WER Male: 0.26078805544122147
PSOLA + noise WER Female: 0.2507533470189283
