### How we calculate the similarity

The DTW_Distance and similarity calculation formula defined as below:

$$dist(i, j) = \sqrt{\sum_{m} w_{m} |v_{m,i} - v_{m,j}|^{\tau}}$$

$$\text{similarity}_{w_x, w_y} = \exp\left( \frac{ -D(\mathbf{w}_x, \mathbf{w}_y)^{k} }{ |\pi_{\min}| } \right)$$

We defined new DTW calculation function with two parameter (tau and k) show as below:

In [3]:
from numba import njit
import numpy as np

#We use njit to boost up matrix computation efficiency. You may enable by remove '#'.


#@njit
def weighted_minkowski(vec1, vec2,  tau):

    total = 0.0
    for m in range(len(vec1)):
        diff = abs(vec1[m] - vec2[m])
        total += (diff ** tau)
    return np.sqrt(total)

#@njit
def minkowski_dtw_distance(seq1, seq2,  tau, k):
    n, m = len(seq1), len(seq2)
    dtw_matrix = np.full((n+1, m+1), np.inf)
    dtw_matrix[0, 0] = 0.0

    for i in range(1, n+1):
        for j in range(1, m+1):
            cost = weighted_minkowski(seq1[i-1], seq2[j-1], tau)
            dtw_matrix[i, j] = cost + min(dtw_matrix[i-1, j],    # insertion
                                         dtw_matrix[i, j-1],    # deletion
                                         dtw_matrix[i-1, j-1])  # match
    return np.exp(-dtw_matrix[n, m]**k)




In [None]:
# Below are two audio examples from two different speaker.
# We are going to calculate the similarity value between them.

import IPython
import os
import textgrid
import librosa

def get_pathset(paths,file_format=".wav"):
    return [os.path.join(dir, each_file) for dir, mid, files in os.walk(paths) for each_file in files if each_file.endswith(file_format)]
audio_dir =r".\data\speech_files"
audio_path=get_pathset(audio_dir,".wav")
textgrid_path=get_pathset(audio_dir,".TextGrid")
tg1 = textgrid.TextGrid.fromFile(textgrid_path[0])
tg2= textgrid.TextGrid.fromFile(textgrid_path[1])
def get_a_sentence(audio_path, tg):
    audio, sr = librosa.load(audio_path)
    target_sr=16000
    wave_res = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
    tg_sentence1 = tg[0][1]
    start_sentence1 = int(tg_sentence1.minTime*target_sr) 
    end_sentence1 = int(tg_sentence1.maxTime*target_sr)
    waveform_input1=wave_res[start_sentence1:end_sentence1]
    return waveform_input1

waveform_input1=get_a_sentence(audio_path[0], tg1)
waveform_input2=get_a_sentence(audio_path[1], tg2)


In [20]:
IPython.display.Audio(waveform_input1,rate=16000)

In [21]:
IPython.display.Audio(waveform_input2,rate=16000)

In [27]:
# Get the latent space representation of above waveforms
import torch
from transformers import AutoProcessor, AutoModelForCTC
processor_H = AutoProcessor.from_pretrained("facebook/hubert-large-ls960-ft")
model_H = AutoModelForCTC.from_pretrained("facebook/hubert-large-ls960-ft")


input1=processor_H(waveform_input1, sampling_rate=16000, return_tensors="pt").input_values
input2=processor_H(waveform_input2, sampling_rate=16000, return_tensors="pt").input_values
# First, we could get result from CNN-layers by simply feature extractor functions
model_H.eval() 
with torch.no_grad():
    outputs1 = model_H(input1)[0]
    outputs2 = model_H(input2)[0]


Some weights of the model checkpoint at facebook/hubert-large-ls960-ft were not used when initializing HubertForCTC: ['hubert.encoder.pos_conv_embed.conv.weight_g', 'hubert.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing HubertForCTC from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing HubertForCTC from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of HubertForCTC were not initialized from the model checkpoint at facebook/hubert-large-ls960-ft and are newly initialized: ['hubert.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'hubert.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-

In [31]:
outputs2[0].shape

torch.Size([99, 32])

In [44]:
# print the similarity value
tau=3.5
k=0.1
similarity_value=minkowski_dtw_distance(np.array(outputs1[0]), np.array(outputs2[0]),  tau, k)
print(similarity_value)

0.11805344579728855


### Use a multithreaded network to find the best values ​​for the optimization parameters tau and k.

In [None]:
from typing import Iterable, List
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from timeit import default_timer as timer
from torch.nn import Transformer
from torch import Tensor
from sklearn.model_selection import train_test_split
import copy
import tqdm
import librosa
import seaborn as sns
import torch.nn as nn
import torch
import torch.nn.functional as F

import math
import os
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial.distance import euclidean
import plotly.graph_objects as go
import pandas as pd
#import jiwer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import pickle
from transformers import AutoProcessor, AutoModelForCTC
#from phonemizer.backend.espeak.wrapper import EspeakWrapper
import soundfile as sf
from sklearn.manifold import TSNE
processor_H = AutoProcessor.from_pretrained("facebook/hubert-large-ls960-ft")
model_H = AutoModelForCTC.from_pretrained("facebook/hubert-large-ls960-ft")




Some weights of the model checkpoint at facebook/hubert-large-ls960-ft were not used when initializing HubertForCTC: ['hubert.encoder.pos_conv_embed.conv.weight_g', 'hubert.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing HubertForCTC from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing HubertForCTC from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of HubertForCTC were not initialized from the model checkpoint at facebook/hubert-large-ls960-ft and are newly initialized: ['hubert.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'hubert.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-

In [4]:
human_result_path=r".\data\BBP-2023-TestData.xlsx"
human_result = pd.read_excel(human_result_path)
human_result=human_result[human_result["group"]!="control"]

tg_sentence_list=['A BOY FELL FROM A WINDOW',
 'BIG DOGS CAN BE DANGEROUS',
 'THE SHOES WERE VERY DIRTY',
 'THE PLAYER LOST A SHOE',
 "SHE'S DRINKING FROM HER OWN CUP",
 'THE PICTURE CAME FROM A BOOK',
 'THE CAR IS GOING TOO FAST',
 'THE PAINT DRIPPED ON THE GROUND',
 'THE TOWEL FELL ON THE FLOOR',
 'THE KITCHEN WINDOW WAS CLEAN',
 'THE MAILMAN BROUGHT A LETTER',
 'THE MOTHER HEARD THE BABY',
 'SHE FOUND HER PURSE IN THE TRASH',
 "IT'S TIME TO GO TO BED",
 'MOTHER READ THE INSTRUCTIONS',
 'THE DOG IS EATING SOME MEAT',
 'THE PAINTER USES A BRUSH',
 'SWIMMERS CAN HOLD THEIR BREATH',
 "THEY'RE PUSHING AN OLD CAR",
 'THEY HAD TWO EMPTY BOTTLES',
 'THE DOG SLEEPS IN A BASKET',
 "THEY'RE PLAYING IN THE PARK",
 'THE BABY SLEPT ALL NIGHT',
 'THE SALT SHAKER IS EMPTY',
 'THE POLICEMAN KNOWS THE WAY',
 'THE BUCKETS FILL UP QUICKLY',
 'THE JANITOR SWEPT THE FLOOR',
 'THE LADY WASHED THE SHIRT',
 'THE MATCH BOXES ARE EMPTY',
 'THE MAN IS PAINTING A SIGN',
 'THE DOG CAME HOME AT LAST',
 'THEY HEARD A FUNNY NOISE',
 'THEY FOUND HIS BROTHER HIDING',
 'THE DOG PLAYED WITH A STICK',
 'THE BOOK TELLS A STORY',
 'THE MATCHES ARE ON A SHELF',
 'THE TEAM IS PLAYING WELL',
 'THE SHIRTS ARE IN THE CLOSET',
 'THEY WATCHED THE SCARY MOVIE',
 'THE TALL MAN TIED HIS SHOES',
 'A LETTER FELL ON THE FLOOR',
 'THE BALL BOUNCED VERY HIGH',
 'MOTHER CUT THE BIRTHDAY CAKE',
 'THE FOOTBALL GAME IS OVER',
 'SHE STOOD NEAR THE WINDOW',
 'SHE USES HER SPOON TO EAT',
 'THE CAT LAY ON THE BED',
 "HE'S WASHING HIS FACE WITH SOAP",
 'THE DOG IS CHASING THE CAT',
 'THE BABY HAS BLUE EYES',
 'THE BAG FELL OFF THE SHELF',
 'THEY KNOCKED ON THE WINDOW',
 'THE FOOTBALL HIT THE GOALPOST',
 'MOTHER GOT A SAUCEPAN',
 'THE BABY WANTS HIS BOTTLE',
 'THE BALL BROKE THE WINDOW',
 'THE WAITER BROUGHT THE CREAM',
 'THE GIRL IS WASHING HER HAIR',
 'THE GIRL PLAYED WITH THE BABY',
 'THEY ARE DRINKING COFFEE']

talkers = ['BRP', 'FAR', 'TUR', 'SPA']

In [6]:
def sim_measure(df, distance_matrix, tg_sentence_list, talkers):
    talker_to_idx = {talker: idx for idx, talker in enumerate(talkers)}
    
    sentence_idx = tg_sentence_list.index(df['sentence_test'].iloc[0])
  
    df['training_suffix'] = df['training_condition'].str[-3:]
    df['test_talker'] = df['speaker_test'].str[-3:]
    mask = df['training_condition'].str.startswith('b')
    df['similarity'] = 0.0
    if mask.any():
        ind_T = df.loc[mask, 'training_suffix'].map(talker_to_idx).values
        ind_test = df.loc[mask, 'test_talker'].map(talker_to_idx).values
        arr = distance_matrix[sentence_idx][ind_T]
        df.loc[mask, 'similarity'] = arr[np.arange(len(arr)), ind_test]
    
    if (~mask).any():
        ind_test = df.loc[~mask, 'test_talker'].map(talker_to_idx).values
        arr = distance_matrix[sentence_idx][ind_test]
        masked_arr = np.where(arr == 1, np.nan, arr)
        means = np.nanmean(masked_arr, axis=1)
        df.loc[~mask, 'similarity'] = np.nan_to_num(means) 
    df = df.rename(columns={
        'st_mt_cnt': 'Condition',
        'training_condition': 'TrainingTalker',
        'speaker_test': 'TestTalker',
        'id2': 'ListenerID',
        'sentence_test': 'Sentence',
        'score_test': 'numCorrect',
        'possible_score_test': 'numWord'
    }).drop(columns=['training_suffix', 'test_talker'])
    
    return df

In [None]:
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from IPython.display import clear_output

from rpy2.robjects.packages import importr
from rpy2.robjects import Formula, pandas2ri

import pandas as pd
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri

# Activate the pandas conversion
pandas2ri.activate()

class OptimizerLogger:
    def __init__(self):
        self.history = {'params': [], 'loss': []}
    
    def callback(self, xk):

        current_loss = objective_function(xk,sentence_matrix, human_result, tg_sentence_list, talkers)
        self.history['params'].append(xk.copy())
        self.history['loss'].append(current_loss)
        
        if len(self.history['loss']) % 5 == 0:
            self.plot_progress()

    def plot_progress(self):

        clear_output(wait=True)
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
        
        ax1.plot(self.history['loss'], 'r-', lw=2)
        ax1.set_title('Loss (Negative Log-Likelihood)')
        ax1.set_xlabel('Iteration')
        ax1.grid(True)
        
        params = np.array(self.history['params'])
        ax2.plot(params[:, 0], 'b-', lw=2, label='tau')
        ax2.plot(params[:, 1], 'g--', lw=2, label='k')
        ax2.set_title('Parameter Trajectory')
        ax2.set_xlabel('Iteration')
        ax2.legend()
        ax2.grid(True)
        
        plt.tight_layout()
        plt.show()



def create_distance_matrix(sentence_matrix,tau, k):
    distance_matirx=np.zeros((60,4,4))
    for _,i in enumerate(sentence_matrix):
        for _1,each_talker1 in enumerate(i):
            for _2,each_talker2 in enumerate(i):
                distance_matirx[_][_1][_2]=minkowski_dtw_distance(each_talker1, each_talker2, tau, k)
    return distance_matirx
def objective_function(params, sentence_matrix,human_result, tg_sentence_list, talkers):

    tau, k = params
    
    distance_matrix = create_distance_matrix(sentence_matrix, tau, k)
    result = sim_measure(human_result, distance_matrix, tg_sentence_list, talkers)
    
    def first_value(group):
        return group.iloc[0]
    #result = out_df.groupby(['TrainingTalker', 'TestTalker']).apply(first_value).reset_index(drop=True)
    
    training_suffix = result['TrainingTalker'].str[-3:]
    mask = (
        (result['Condition'] == 'st ') & (training_suffix == result['TestTalker']) |
        (result['Condition'] == 'mt') & (training_suffix != result['TestTalker'])
    )
    result = result[~mask]
    
    result["numIncorrect"]=result["numWord"]-result["numCorrect"]
    result["prop_correct"]=result["numCorrect"]/result["numWord"]

    lme4 = importr('lme4')
    r_data = ro.conversion.py2rpy(result)
    formula = Formula('prop_correct ~ 1 + similarity + (1| TestTalker) + (1|Sentence)')
    model = lme4.glmer(formula, family="binomial", data=r_data)
    log_likelihood = ro.r['logLik'](model)

    #result['similarity'] = result['similarity'].replace(0, 1e-10)
    #log_likelihood = np.mean(np.log(result['similarity']))
    return -log_likelihood 
logger = OptimizerLogger()
with open(".\data\hubert_T24.pkl", "rb") as f:
    sentence_matrix = pickle.load(f)

initial_guess = [1, 1]
bounds = [(0.1, 10.0), (0.1, 10.0)]

optimization_result = minimize(
    fun=objective_function,
    x0=initial_guess,
    args=(sentence_matrix, human_result, tg_sentence_list, talkers),
    method='L-BFGS-B',
    bounds=bounds,
    callback=logger.callback, 
    options={'maxiter': 100, 'disp': True}
)


if optimization_result.success:
    tau_opt, k_opt = optimization_result.x
    print(f"best parameter: tau={tau_opt:.4f}, k={k_opt:.4f}")
    print(f"max log likelihood: {-optimization_result.fun:.4f}")
else:
    print("fail:", optimization_result.message)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

from rpy2.robjects.packages import importr
from rpy2.robjects import Formula, pandas2ri

import pandas as pd
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
#utils = importr('utils')
#utils.install_packages('lme4')
mu = 0

sigma = 1
x = np.linspace(mu - 1*sigma, mu + 1*sigma, 30)
x=x+11
new_min, new_max = 1, 11
normalized_list1 = list((x[:25] - x[:25].min()) / (x[:25].max() - x[:25].min()) * (new_max - new_min) + new_min)
new_min, new_max = 11, 20
normalized_list2 = list((x[24:] - x[24:].min()) / (x[24:].max() - x[24:].min()) * (new_max - new_min) + new_min)

normalized_list=np.array(normalized_list1[:-1]+normalized_list2)

n_points = 20  
#param1_space = np.linspace(1, 30, n_points)
param1_space = normalized_list
param2_space = np.linspace(0.01, 0.4, 20)
param1_grid, param2_grid = np.meshgrid(param1_space, param2_space)


log_likelihood = np.zeros_like(param1_grid)






def objective_function(params, sentence_matrix,human_result, tg_sentence_list, talkers):

    tau, k = params
    
    distance_matrix = create_distance_matrix(sentence_matrix, tau, k)
    result = sim_measure(human_result, distance_matrix, tg_sentence_list, talkers)
    
    def first_value(group):
        return group.iloc[0]
    #result = out_df.groupby(['TrainingTalker', 'TestTalker']).apply(first_value).reset_index(drop=True)
    
    training_suffix = result['TrainingTalker'].str[-3:]
    mask = (
        (result['Condition'] == 'st ') & (training_suffix == result['TestTalker']) |
        (result['Condition'] == 'mt') & (training_suffix != result['TestTalker'])
    )
    result = result[~mask]
    
    result["numIncorrect"]=result["numWord"]-result["numCorrect"]
    result["prop_correct"]=result["numCorrect"]/result["numWord"]

    lme4 = importr('lme4')
    r_data = ro.conversion.py2rpy(result)
    formula = Formula('prop_correct ~ 1 + similarity + (1| TestTalker) + (1|Sentence)')
    model = lme4.glmer(formula, family="binomial", data=r_data)
    log_likelihood = ro.r['logLik'](model)

    #result['similarity'] = result['similarity'].replace(0, 1e-10)
    #log_likelihood = np.mean(np.log(result['similarity']))
    return -log_likelihood 


for i in range(len(param2_space)):
    for j in range(len(param1_space)):
        params = np.array([param1_grid[i,j], param2_grid[i,j]])
        log_likelihood[i,j] = -objective_function(
            params, 
            sentence_matrix, 
            human_result, 
            tg_sentence_list, 
            talkers
        )


max_idx = np.unravel_index(log_likelihood.argmax(), log_likelihood.shape)
best_params = (param1_grid[max_idx], param2_grid[max_idx])


fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')


surf = ax.plot_surface(
    param1_grid, 
    param2_grid, 
    log_likelihood, 
    cmap='viridis',
    alpha=0.8,
    linewidth=0,
    antialiased=True
)


ax.scatter(
    best_params[0], 
    best_params[1], 
    log_likelihood.max(),
    color='red', 
    s=100, 
    label=f'Max at ({best_params[0]:.2f}, {best_params[1]:.2f})'
)


ax.set_xlabel('Parameter tau', fontsize=12)
ax.set_ylabel('Parameter k', fontsize=12)
ax.set_zlabel('Log-Likelihood', fontsize=12)
ax.set_title('Parameter Space Exploration', fontsize=14)
fig.colorbar(surf, shrink=0.5, aspect=10)


ax.view_init(elev=30, azim=45)


ax.legend()
plt.tight_layout()
plt.show()