In [48]:
# 可自行添加/删除所需要的库
import wave  
import numpy as np
import pylab as plt 
import scipy.signal as signal
from python_speech_features import *
import scipy.io.wavfile
from matplotlib import pyplot as plt
import time
import os

In [None]:
# 定义一个类，输入wav路径，输出滑窗后的mfcc特征数组，维度参数，窗长均可自定，这边只是一个default
class WavtoMfcc(object):
    def __init__(self, url, numceps=13, segment_len = 1000, hop_len = 1000):
        """
        Input:
        url - wav文件路径
        numcep - 倒频谱返回的数量，默认13，可调
        segment_len - 窗长，一个窗包含多少个采样点
        hop_len - 窗移，相邻两窗之间的间隔，一般小于窗长，这里选的是等于窗长
        
        Output:
        None
        """
        self.numceps = numceps
        self.url = url
        self.sample_rate, self.signal = scipy.io.wavfile.read(self.url)
        self.signal = np.array(self.signal)
        self.segment=[]
        signal_len = len(self.signal)
        """
        实现采样点滑窗，存入self.segment
        """
        ### START HERE ###
        self.hop_len = hop_len
        for i in range(0, signal_len - segment_len + 1, hop_len):
            self.segment.append(self.signal[i:i + segment_len])
        ### END HERE ###
        self.segment_len = len(self.segment)
        self.segment = np.array(self.segment)
        self.feature = self.get_segment_mfcc()
        self.feature = np.array(self.feature)

    def get_segment_mfcc(self):
        '''
        Input:
        无 - 对self.signal进行处理
        
        Output:
        feature - 对应窗的mfcc，返回维度可自动设计，一般为39

        Parameters:
        signal - 需要用来计算特征的音频信号，应该是一个N*1的数组
        samplerate - 我们用来工作的信号的采样率
        winlen - 分析窗口的长度，按秒计，默认0.025s(25ms)
        winstep - 连续窗口之间的步长，按秒计，默认0.01s（10ms）
        numcep - 倒频谱返回的数量，默认13
        nfilt - 滤波器组的滤波器数量，默认26
        nfft - FFT的大小，默认512
        lowfreq - 梅尔滤波器的最低边缘，单位赫兹，默认为0
        highfreq - 梅尔滤波器的最高边缘，单位赫兹，默认为采样率/2
        preemph - 应用预加重过滤器和预加重过滤器的系数，0表示没有过滤器，默认0.97
        ceplifter - 将升降器应用于最终的倒谱系数。 0没有升降机。默认值为22。
        appendEnergy - 如果是true，则将第0个倒谱系数替换为总帧能量的对数。 
        '''
        data = self.signal

        # You can try different parameters or change the function?
        wav_feature = mfcc(data, self.sample_rate, numcep=self.numceps, winlen=0.025, winstep=0.01,
                           nfilt=26, nfft=2048, lowfreq=0, highfreq=None, preemph=0.97)
        """
        根据需要处理出最终特征
        """
        ### START HERE ###
        feature = []
        # for s in self.segment:
        #     feature.append(
        #         mfcc(
        #             s,
        #             self.sample_rate,
        #             numcep=self.numceps,
        #             winlen=0.025,
        #             winstep=0.01,
        #             nfilt=26,
        #             nfft=2048,
        #             lowfreq=0,
        #             highfreq=None,
        #             preemph=0.97,
        #         )
        #     )
        for i in range(0, len(wav_feature) - self.segment_len + 1, self.segment_len):
            feature.append(wav_feature[i:i + self.segment_len])
        ### END HERE ###
        return np.array(feature)

In [66]:
# 定义两个mfcc特征匹配的cost
def cost(x,y):
    return np.sum(np.power(x-y,2))

In [67]:
wav_path1 = r"./data_en_train/digit_6/9_6.wav"
wav_path2 = r"./data_en_train/digit_6/10_6.wav"

In [None]:
# 简单测试WavtoMfcc类和cost的结果
Wav1=WavtoMfcc(wav_path1)
# f1=Wav1.get_segment_mfcc(np.array([1]))
# print(f1.shape)
Wav2=WavtoMfcc(wav_path2)
# f2=Wav2.get_segment_mfcc(np.array([1,2]))
# print(f2)
# print(cost(f1,f2))

In [69]:
# 预处理两两匹配的cost数组（PS：在这里其实并不非常必要，复杂度与直接调用仍然是一致的，差个常数）
def getDist(x, y):
    row, col = x.segment_len, y.segment_len
    Dist = np.zeros((row, col))
    for i in range(row):
        for j in range(col):
            Dist[i,j] = cost(x.feature[i], y.feature[j])
    return Dist

In [70]:
# 得到预处理出的两两匹配的代价
Dist = getDist(Wav1,Wav2)

ValueError: operands could not be broadcast together with shapes (22,13) (25,13) 

In [71]:
def dtw(x, y, Dist=None, K=2):
    """
    Computes Dynamic Time Warping (DTW) of two sequences.
    Input:
    :param array x: N1*M array
    :param array y: N2*M array
    :param func dist: distance used as cost measure
    :param int K: flexible
    Output:
    Returns the normalized minimum distance D[-1, -1] / sum(D.shape), and the wrap path ans.
    """
    row, col = x.segment_len, y.segment_len
    D = np.zeros((row + 1, col + 1))
    D[0, 1:] = np.inf
    D[1:, 0] = np.inf
    ans_path_x = np.zeros((row + 1, col + 1))
    ans_path_y = np.zeros((row + 1, col + 1))
    ### START HERE ###
    for i in range(1, row + 1):
        for j in range(1, col + 1):
            # 更新最短距离(DP)
            min_d = min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])
            D[i, j] = Dist[i - 1, j - 1] + min_d
            # 记录路径
            if min_d == D[i - 1, j]:
                ans_path_x[i, j] = i - 1
                ans_path_y[i, j] = j
            elif min_d == D[i, j - 1]:
                ans_path_x[i, j] = i
                ans_path_y[i, j] = j - 1
            else:
                ans_path_x[i, j] = i - 1
                ans_path_y[i, j] = j - 1

    # 路径回溯
    ans = []
    i, j = row, col
    while i > 0 and j > 0:
        ans.append((i - 1, j - 1))
        # TODO 直接写i, j?
        prev_i, prev_j = int(ans_path_x[i, j]), int(ans_path_y[i, j])
        i, j = prev_i, prev_j
    ans.reverse()
    ### END HERE ###
    return D[-1, -1] / sum(D.shape), ans

In [72]:
# 计算两段wav的DTW score
time_start=time.time()
Dist = getDist(Wav1, Wav2)
Cost, path = dtw(Wav1, Wav2, Dist, K = 2)
print(Cost)
print(path)
print("Time for one run: %f"%(time.time()-time_start))

ValueError: operands could not be broadcast together with shapes (22,13) (25,13) 

In [10]:
# 对于同一个word的所有wav数据去重新寻找其中最好的一个，通过dtw得到的alignment构建master template，选择其中最好的一个，以提升识别的鲁棒性
def Choice_Master(word, wav_arr):
    """
    Computes master template from dataset.
    Input:
    word: a number in [0-9]
    wav-arr: wav url which is corresponding with word
    Output:
    Returns the average master_template.
    """
    master_temp = []
    for i in range(len(wav_arr)):
        master_wav = wav_arr[i]
        """
        modified master_wav inplace to construct candidate master template for each wav file
        """
        ### START HERE ###
        
        
        
        ### END HERE ###
        master_temp.append(master_wav)
    """
    calculate the cost for each candidate master template and pick the best as master template
    """
    ave_cost = []
    for i in range(len(master_temp)):
        master_wav = master_temp[i]
        cost_sum = 0
        ### START HERE ###
        
        
        
        ### END HERE ###
        ave_cost.append(cost_sum)
    ave_cost = np.array(ave_cost)
    idx = np.argmin(ave_cost)
    
    print(" the best master template for word <"+word+"> is the "+str(idx)+"-th wav file.")
    return master_temp[idx]

In [None]:
# 构建master template
wordlist = range(10)
masterwav = []

for word in wordlist:
    wordpath=r"./data_en_train/digit_"+str(word)+"/"
    wav_arr = []
    for wavpath in os.listdir(wordpath):
        wav_arr.append(WavtoMfcc(wordpath+wavpath))
    #print(wav_arr)
    masterwav.append(Choice_Master(str(word),wav_arr))


In [12]:
# 存储每个word的master template
np.savez('dtw_master_arr',masterwav=masterwav) 

In [13]:
npzfile = np.load('dtw_master_arr.npz',allow_pickle=True)

In [14]:
masterwav = npzfile['masterwav']

In [15]:
# 利用在训练集上得到的master template，去计算在test集上word识别的效果
def evaluation(masterwav):
    acc = 0
    cnt = 0
    rootpath = "./data_en_test/data_en/"
    rootdir = os.listdir(rootpath)
    for file in rootdir:
        word = int(file.split('.')[0].split('_')[1])
        wavpath = os.path.join(rootpath, file)
        """
        用master template和dtw做识别
        """
        ### START HERE ###
        
        
        
        ### END HERE ###
    return acc/cnt

In [None]:
# 输出整体识别效果，例如0.75
evaluation(masterwav)

In [17]:
# 用master template做识别，输出识别单个语音的结果，结果为[0-9]
def inference(masterwav, wav):
    costs = []
    ### START HERE ###
        
        
        
    ### END HERE ###
    costs = np.array(costs)
    ans_label = np.argmin(costs)
    return ans_label

In [18]:
wav_path = r"./data_en_test/data_en/18_3.wav"
wav_i = WavtoMfcc(wav_path)
inference(masterwav, wav_i)

3

In [19]:
# Bonus task:
# 通过尝试不同策略修改Choice_Master函数（不要修改底层的DTW逻辑），获得更优秀的匹配分数