# Functions in project 2

In [35]:
import keyboard
import time
import wave
import pyaudio
import numpy as np
import IPython.display as ipd
import matplotlib.pyplot as plt

# Divide signal data into 20ms segements with 10ms interval for every two consecutive ones 
def create_segments(signal,sample_rate,width=10):
    num=int(width*sample_rate/1000)
    # first divide signal into 10ms segments
    ten_ms_segments = [signal[i:i+num] for i in range(0, len(signal), num)]
    twenty_ms_segments =[]
    for j in range(len(ten_ms_segments)):
        if j!=0:
            l1=ten_ms_segments[j-1]
            l2=ten_ms_segments[j]
            l=[0 for i in range(num*2)]
            for k in range(len(l1)):
                l[k]=l1[k]
                l[k+len(l1)]=l2[k]
            twenty_ms_segments.append(l)
    return twenty_ms_segments

# Preemphasize each segement
def Preemphasizing(segment):
    pre=np.zeros(len(segment))
    pre[0]=segment[0]
    for i in range(1,len(segment)):
        pre[i]=segment[i]-0.95*segment[i-1]
    return pre

# Window each preemphasized segement
def windowing(pre):
    pre=pre*np.hamming(len(pre))
    return pre

# Zero pad each windowed segement
def zero_padding(windowed):
    #zero padding for FFT
    length=512
    windowed_len=len(windowed)
    padding_len=length-windowed_len
    zeros=np.zeros((padding_len,))
    zero_padded=np.concatenate((windowed,zeros))
    return zero_padded

# Calculate the power spectrum of a segement
def FFT(frame):
    length=512
    fft=np.fft.rfft(frame, length)
    magnitude = np.abs(fft)
    power = ((1.0 / length) * ((magnitude) ** 2)) 

    return power

# Mel warping function
def warping_function(Hz):
    Mel = 2595 * np.log10(1+Hz/700)
    return Mel
# inverse function
def inverse_warping(Mel):
    Hz = 700*(np.power(10,Mel/2595)-1) 
    return Hz

# calculate mel spectra and log mel spectra
def filterbanks(power,filter_num,minHz=133.33,maxHz=6855.4976,length=512):
    maxMel=warping_function(maxHz)
    minMel=warping_function(minHz)
    #get start and end points of triangle filters in Mel
    pointsInMel=np.linspace(minMel,maxMel,filter_num+2)
    #get start and end points of triangle filters in Hz
    pointsInHz=inverse_warping(pointsInMel)
    #get start and end points of tiangle filters in total 257 points
    ranges=np.floor(length/2*pointsInHz/(maxHz-minHz))
    #normalize the triangle filters according to the lower bound
    ranges=ranges-ranges[0]
    #create filter banks with size(number of filters=40, points in power spectrum=257)
    filter_banks=np.zeros((filter_num,len(power)))
    for i in range(1,filter_num+1): 
        #get the left half of the traingle
        for j in range(int(ranges[i-1]),int(ranges[i])):
            filter_banks[i-1,j]=(j-ranges[i-1])/(ranges[i]-ranges[i-1])
        #get the right half of the traingle
        for j in range(int(ranges[i]),int(ranges[i+1])):
            filter_banks[i-1,j]=(ranges[i+1]-j)/(ranges[i+1]-ranges[i])
    #mel spectrum
    filter_banks=filter_banks.T
    Mel=np.dot(power,filter_banks)
    Mel=np.where(Mel==0,np.finfo(float).eps,Mel)
    #log mel spectrum
    Log_Mel=10*np.log(Mel) 
    return Mel,Log_Mel

from scipy.fftpack import dct,idct
# use dct to get the cepstral_coefficients
def mel_cepstrum(log_mel):
    cepstral_coefficients=dct(log_mel,axis=-1, type=2, norm='ortho')
    return cepstral_coefficients[:13]

# get idct
def IDCT(log_mel,num):
    IDCT_coefficients=idct(log_mel,type=2, n=num, norm='ortho')
    return IDCT_coefficients

# get log mel spectrum matrix, mel cpestrum matrix and idct matrix, given segments
def get_matrix(segments,num):
    Mel_cepstrum_matrix=[]
    log_Mel_spectrum_matrix=[]
    IDCT_matrix=[]
    for i in range(len(segments)):
        preemphasized=Preemphasizing(segments[i])
        windowed=windowing(preemphasized)
        zero_padded=zero_padding(windowed)
        power_spectrum=FFT(zero_padded)
        Mel_spectrum, log_Mel_spectrum=filterbanks(power_spectrum,num)
        log_Mel_spectrum_matrix.append(log_Mel_spectrum)
        Mel_cepstrum=mel_cepstrum(log_Mel_spectrum)
        Mel_cepstrum_matrix.append(Mel_cepstrum)
        IDCT_spectrum=IDCT(Mel_cepstrum,num)
        IDCT_matrix.append(IDCT_spectrum)
    return log_Mel_spectrum_matrix,Mel_cepstrum_matrix,IDCT_matrix
#plot the spectrum and cepstrum
def plot_show(matrix, title):
    plt.figure(figsize=(15, 8))
    plt.subplot(311)
    plt.imshow(matrix.T, origin='lower')
    plt.title(title)

#load wav file
def load_wav(file_path):
    wav_file = wave.open(file_path, 'r')
    # Get the audio data
    frames = wav_file.readframes(-1)
    signal = np.frombuffer(frames, dtype=np.int16)

    # Get the sample rate and time axis
    sample_rate = wav_file.getframerate()
    duration = len(signal) / sample_rate
    time = np.linspace(0., duration, len(signal))
    
    # Close the WAV file
    wav_file.close()
    return signal, sample_rate

# Mean subtraction and variance normalization

In [36]:
def mean_subtraction(matrix):
    means = np.mean(matrix, axis=0)/(np.shape(matrix)[0])
    matrix=matrix-means
    return matrix

def var_normalization(matrix):
    sd=np.sqrt(np.sum(np.square(matrix),axis=0)/(np.shape(matrix)[0]))
    return matrix*(1/sd)

## Get cepstrum feature

In [38]:
def get_cepstrum_features(file):
    signal, sample_rate=load_wav(file)
    seg=create_segments(signal,sample_rate)
    log_mel_spectrum_matrix,mel_cepstrum_matrix,IDCT_matrix=get_matrix(seg,40)
    log_mel_spectrum_matrix,mel_cepstrum_matrix,IDCT_matrix=np.array(log_mel_spectrum_matrix),np.array(mel_cepstrum_matrix),np.array(IDCT_matrix)

    mean_subtracted=mean_subtraction(mel_cepstrum_matrix)
    var_normalized=var_normalization(mean_subtracted)
    return var_normalized

## DTW

In [39]:
def euclidean_distance(a, b):
    return np.sqrt(np.sum((a - b) ** 2))


def DTW(input, template):
    # calculate the Euclidean distance between two feature vectors

        
    # Initialize the cost matrix
    rows, cols = len(input), len(template)
    cost_matrix = np.zeros((rows, cols))

    # Fill the cost matrix
    for i in range(rows):
        for j in range(cols):
            cost_matrix[i, j] = euclidean_distance(input[i], template[j])

    # Initialize the accumulated cost matrix
    accumulated_cost = np.zeros((rows, cols))
    accumulated_cost[0, 0] = cost_matrix[0, 0]

    # Fill the first row and column of the accumulated cost matrix
    for i in range(1, rows):
        accumulated_cost[i, 0] = cost_matrix[i, 0] + accumulated_cost[i - 1, 0]

    for j in range(1, cols):
        accumulated_cost[0, j] = cost_matrix[0, j] + accumulated_cost[0, j - 1]

    # Fill the rest of the accumulated cost matrix
    for i in range(1, rows):
        for j in range(1, cols):
            accumulated_cost[i, j] = cost_matrix[i, j] + min(
                accumulated_cost[i - 1, j],         # insertion
                accumulated_cost[i, j - 1],         # deletion
                accumulated_cost[i - 1, j - 1]      # match
            )

    # Return the DTW distance
    return accumulated_cost[rows - 1, cols - 1]

In [40]:
template_feature_matrix=[]
for i in range(10):
    template_feature=get_cepstrum_features(str(i)+'_template.wav')
    template_feature_matrix.append(template_feature)
dtw_result=np.zeros((10,2))
for i in range(10):
    for j in range(1,3):
        training_feature=get_cepstrum_features(str(i)+'_training_'+str(j)+'.wav')
        dtw=DTW(training_feature,template_feature_matrix[0])
        type=0
        for k in range(1,10):
            if DTW(training_feature,template_feature_matrix[k])<dtw:
                dtw=DTW(training_feature,template_feature_matrix[k])
                type=k
        dtw_result[i][j-1]=type

In [41]:
a = np.arange(10).reshape(10, 1)
correct_classifications = np.hstack((a,a))
print(dtw_result)
differing_positions = dtw_result != correct_classifications
print('DTW Accuracy:',1-np.sum(differing_positions)/10)

[[0. 0.]
 [1. 4.]
 [2. 2.]
 [3. 3.]
 [4. 4.]
 [5. 5.]
 [6. 6.]
 [7. 7.]
 [8. 8.]
 [9. 9.]]
DTW Accuracy: 0.9


## Time-synchronous DTW

In [90]:

def Time_synchronous_DTW(input,templates):
    template_len=[]
    for i in range(10):
        if i==0: template_len.append(np.shape(templates[i])[0])
        else: template_len.append(template_len[-1]+np.shape(templates[i])[0])
    print(template_len)
    new_array=[]
    old_array=[]
    for i in range(len(input)):
        if i==0:
            for k in range(len(template_len)):
                if k==0:
                    for j in range(template_len[k]):
                        if j==0:
                            old_array.append(euclidean_distance(templates[k][j],input[i]))
                        else:
                            old_array.append(euclidean_distance(templates[k][j],input[i])+old_array[j-1])
                else:
                    for j in range(template_len[k-1],template_len[k]):
                        if j==template_len[k-1]:
                            old_array.append(euclidean_distance(templates[k][0],input[i]))
                        else:
                            old_array.append(euclidean_distance(templates[k][j-template_len[k-1]],input[i])+old_array[j-1])
        else:
            for k in range(len(template_len)):
                if k==0:
                    for j in range(template_len[k]):
                        if j==0:
                            new_array.append(euclidean_distance(templates[k][j],input[i])+old_array[j])
                        else:
                            dist=euclidean_distance(templates[k][j],input[i])
                            left=dist+old_array[j]
                            bottom=dist+new_array[j-1]
                            bottom_left=dist+old_array[j-1]
                            new_array.append(min(left,bottom,bottom_left))
                else:
                    for j in range(template_len[k-1],template_len[k]):
                        if j==template_len[k-1]:
                            new_array.append(euclidean_distance(templates[k][0],input[i])+old_array[j])
                        else:
                            dist=euclidean_distance(templates[k][j-template_len[k-1]],input[i])
                            left=dist+old_array[j]
                            bottom=dist+new_array[j-1]
                            bottom_left=dist+old_array[j-1]
                            new_array.append(min(left,bottom,bottom_left))
            old_array=new_array
            new_array=[]
    type=0
    dtw=old_array[template_len[0]-1]
    for i in range(1,len(template_len)):
        dist=old_array[template_len[i]-1]
        if dist<dtw:
            dtw=dist
            type=i
    return type
        


In [125]:
training_feature=get_cepstrum_features('1_training_2.wav')
Time_synchronous_DTW(training_feature,template_feature_matrix)
print(DTW(training_feature,template_feature_matrix[4]))

[99, 248, 347, 446, 545, 644, 743, 842, 941, 1040]
299.41819868137617


## TS-DTW with pruning

In [107]:
class Node:
    def __init__(self, cost, i, j):
        self.cost = cost
        self.previous = i
        self.current = j

In [146]:
def TS_DTW_with_pruning(input, templates):
    template_len = []
    for i in range(10):
        if i == 0:
            template_len.append(np.shape(templates[i])[0])
        else:
            template_len.append(template_len[-1] + np.shape(templates[i])[0])

    print(template_len)

    new_array = []
    old_array = []

    for i in range(len(input)):
        if i == 0:
            for k in range(len(template_len)):
                if k == 0:
                    for j in range(template_len[k]):
                        if j == 0:
                            old_array.append(Node(euclidean_distance(templates[k][j], input[i]), None, (i,j)))
                        else:
                            old_array.append(Node(euclidean_distance(templates[k][j], input[i]) + old_array[j - 1].cost,
                                                 (i,j-1), (i,j)))
                else:
                    for j in range(template_len[k - 1], template_len[k]):
                        if j == template_len[k - 1]:
                            old_array.append(Node(euclidean_distance(templates[k][0], input[i]), None, (i,j)))
                        else:
                            old_array.append(Node(
                                euclidean_distance(templates[k][j - template_len[k - 1]], input[i]) + old_array[j - 1].cost,
                                (i,j-1), (i,j)))
        else:
            for k in range(len(template_len)):
                if k == 0:
                    for j in range(template_len[k]):
                        if j == 0:
                            dist = euclidean_distance(templates[k][j], input[i])
                            new_array.append(Node(dist + old_array[j].cost, (i-1,j), (i,j)))
                        else:
                            dist = euclidean_distance(templates[k][j], input[i])
                            left = dist + old_array[j].cost
                            bottom = dist + new_array[j - 1].cost
                            bottom_left = dist + old_array[j - 1].cost
                            if min(left, bottom, bottom_left)==left:
                                new_array.append(Node(min(left, bottom, bottom_left), (i-1,j), (i,j)))
                            elif min(left, bottom, bottom_left)==bottom:
                                new_array.append(Node(min(left, bottom, bottom_left), (i,j-1), (i,j)))
                            else:
                                new_array.append(Node(min(left, bottom, bottom_left), (i-1,j-1), (i,j)))
                else:
                    for j in range(template_len[k - 1], template_len[k]):
                        if j == template_len[k - 1]:
                            dist = euclidean_distance(templates[k][0], input[i])
                            new_array.append(Node(dist + old_array[j].cost, (i-1,j), (i,j)))
                        else:
                            dist = euclidean_distance(templates[k][j - template_len[k - 1]], input[i])
                            left = dist + old_array[j].cost
                            bottom = dist + new_array[j - 1].cost
                            bottom_left = dist + old_array[j - 1].cost
                            if min(left, bottom, bottom_left)==left:
                                new_array.append(Node(min(left, bottom, bottom_left), (i-1,j), (i,j)))
                            elif min(left, bottom, bottom_left)==bottom:
                                new_array.append(Node(min(left, bottom, bottom_left), (i,j-1), (i,j)))
                            else:
                                new_array.append(Node(min(left, bottom, bottom_left), (i-1,j-1), (i,j)))

            old_array = new_array
            new_array = []

    min_cost_node = old_array[template_len[0] - 1]
    type=0
    for i in range(1, len(template_len)):
        if old_array[template_len[i] - 1].cost < min_cost_node.cost:
            min_cost_node = old_array[template_len[i] - 1]
            type=i
    print(old_array[template_len[type] - 1].cost)
    return type  # Return the template type


In [147]:
training_feature=get_cepstrum_features('1_training_2.wav')
TS_DTW_with_pruning(training_feature,template_feature_matrix)

[99, 248, 347, 446, 545, 644, 743, 842, 941, 1040]
299.41819868137617


4