In [1]:
import numpy as np
import scipy.io.wavfile as wav
from pynq.lib import dma
from pynq import Overlay
from pynq import Xlnk
import matplotlib.pyplot as plt 
import time

In [2]:
##########      zero rate cal        ############
def calEnergy(wave_data,frmae_len):
    """
    :param wave_data: binary data of audio file
    :return: energy
    """
    energy = []
    sum = 0
    frmae_len=np.int32(frmae_len)
    for i in range(len(wave_data)):
        sum = sum + (int(wave_data[i]) * int(wave_data[i]))
        if (i + 1) % frmae_len == 0:
            energy.append(sum)
            sum = 0
        elif i == len(wave_data) - 1:
            energy.append(sum)
    return energy

In [3]:
def calZeroCrossingRate(wave_data,frmae_len):
    """
    :param wave_data: binary data of audio file
    :return: ZeroCrossingRate
    """
    zeroCrossingRate = []
    sum = 0
    frmae_len=np.int32(frmae_len)
    for i in range(len(wave_data)):
        sum = sum + np.abs(int(wave_data[i] >= 0) - int(wave_data[i - 1] >= 0))
        if (i + 1) % frmae_len == 0:
            zeroCrossingRate.append(float(sum) / (frmae_len-1))
            sum = 0
        elif i == len(wave_data) - 1:
            zeroCrossingRate.append(float(sum) / (frmae_len-1))
    return zeroCrossingRate

In [4]:
# 利用短时能量，短时过零率，使用双门限法进行端点检测
def endPointDetect(energy, zeroCrossingRate) :
    sum = 0
    energyAverage = 0
    for en in energy :
        sum = sum + en
    energyAverage = sum / len(energy)

    sum = 0
    for en in energy[:5] :
        sum = sum + en
    ML = sum / 5                        
    MH = energyAverage / 4              #较高的能量阈值
    ML = (ML + MH) / 4    #较低的能量阈值
    sum = 0
    for zcr in zeroCrossingRate[:5] :
        sum = float(sum) + zcr             
    Zs = sum / 5                     #过零率阈值

    A = []
    B = []
    C = []

    # 首先利用较大能量阈值 MH 进行初步检测
    flag = 0
    for i in range(len(energy)):
        if len(A) == 0 and flag == 0 and energy[i] > MH :
            A.append(i)
            flag = 1
        elif flag == 0 and energy[i] > MH and i - 21 > A[len(A) - 1]:
            A.append(i)
            flag = 1
        elif flag == 0 and energy[i] > MH and i - 21 <= A[len(A) - 1]:
            A = A[:len(A) - 1]
            flag = 1

        if flag == 1 and energy[i] < MH :
            A.append(i)
            flag = 0
    # print("较高能量阈值，计算后的浊音A:" + str(A))

    # 利用较小能量阈值 ML 进行第二步能量检测
    for j in range(len(A)) :
        i = A[j]
        if j % 2 == 1 :
            while i < len(energy) and energy[i] > ML :
                i = i + 1
            B.append(i)
        else :
            while i > 0 and energy[i] > ML :
                i = i - 1
            B.append(i)
    # print("较低能量阈值，增加一段语言B:" + str(B))

    # 利用过零率进行最后一步检测
    for j in range(len(B)) :
        i = B[j]
        if j % 2 == 1 :
            while i < len(zeroCrossingRate) and zeroCrossingRate[i] >= 3 * Zs :
                i = i + 1
            C.append(i)
        else :
            while i > 0 and zeroCrossingRate[i] >= 3 * Zs :
                i = i - 1
            C.append(i)
    # print("过零率阈值，最终语音分段C:" + str(C))
    return C

In [5]:
##########      for recognize        ############
def pre_operate(sig,frame_len):
    '''预处理集总'''
    # 过零率检测
    zr=calZeroCrossingRate(sig,frame_len)
    en=calEnergy(sig,frame_len)
    C=endPointDetect(en,zr)
    if((len(C)%2)==1):
            C.append(int(len(sig)/frame_len))
    D=np.array(C)*frame_len
    
    print(D)
    dif=list(D[1::2]-D[0::2])
    loc=dif.index(max(dif))*2
    sig0=np.array(sig[D[loc]:D[loc+1]])
    print("有效语音段:",C[loc],C[loc+1])
       
    min0=np.min(sig0)
    max0=np.max(sig0)
    scale =np.double(max0 - min0)/32767.0
    data_int = ((sig0 - min0) / scale)
    sig2 =np.array(np.rint(data_int),dtype=np.int16)
    
    return sig2

In [6]:
def mfcccal(overlaypath,signal):
    """
    使用Overlay调用MFCC提取硬件部分电路
    """
    np.set_printoptions(suppress=True) 

    overlay0=Overlay(overlaypath)
    x=overlay0.axi_dma_0
    xlnk = Xlnk()
        
    len0=len(signal)
    len1=(int(np.ceil(len0/500))*2-1)*12
    
    mfcc=np.zeros(len1, dtype=np.int16)
    
    input_buffer = xlnk.cma_array(shape=(len0*2,), dtype=np.int16)
    output_buffer = xlnk.cma_array(shape=(len1,), dtype=np.int16)  
    
    for i in range(len0):
        input_buffer[i] = signal[i]

#     start = time.time()
    x.sendchannel.transfer(input_buffer)    
    
    x.recvchannel.transfer(output_buffer)
    
#     end = time.time()
    
    for i in range(len1):
         mfcc[i]=output_buffer[i] 

#     print('time cost ' + str(round(end - start, 8)) + 's')
#     cost = round(end - start, 8)
#     speed = round(len0/cost,8)
#     print('transfer speed: ' + str(speed) + 'sample/s')
#     input_buffer.close()
#     output_buffer.close()
    return mfcc

In [7]:
def mfccusing(ovlpath,sig):
    """
    连接MFCC提取硬件部分
    """
    part0=np.int(np.ceil(len(sig)/4000))*4000
    
    sig0=(pre_operate(np.array(sig,dtype=np.double),500))
    b=[]
    a1=[]
    c=[]
    
    for i in range(0,part0,4000):
        sig1=np.insert(sig0[i:i+3999],0,0)
        l0=len(sig1)
        a=np.array(mfcccal(ovlpath,sig1),dtype=np.int16)
        a1=a/32
        b.extend(a1)
    b=np.array(b)
    for i in range(0,len(b),12):
        c.append(b[0+i:i+12])
    print('mfccusing complete!')
    c=np.array(c)
    return c

In [8]:
def calEuclidDist(A, B):
    """
    :param A, B: two vectors
    :return: the Euclidean distance of A and B
    """
    return np.sqrt(sum([(a - b) ** 2 for (a, b) in zip(A, B)]))

In [9]:
def dtw(M1, M2):
    """
    Compute Dynamic Time Warping(DTW) of two mfcc sequences.
    :param M1, M2: two mfcc sequences
    :return: the minimum distance and the wrap path
    """
    # length of two sequences
    M1_len = len(M1)
    M2_len = len(M2)
    cost_0 = np.zeros((M1_len + 1, M2_len + 1))
    cost_0[0, 1:] = np.inf
    cost_0[1:, 0] = np.inf
    
    # Initialize the array size to M1_len * M2_len
    cost = cost_0[1:, 1:]
    for i in range(M1_len):
        for j in range(M2_len):
            cost[i, j] = calEuclidDist(M1[i], M2[j])
            
    # dynamic programming to calculate cost matrix
    for i in range(M1_len):
        for j in range(M2_len):
            cost[i, j] += min([cost_0[i, j], \
                               cost_0[min(i + 1, M1_len - 1), j], \
                               cost_0[i, min(j + 1, M2_len - 1)]])
    # calculate the warp path
    if len(M1) == 1:
        path = np.zeros(len(M2)), range(len(M2))
    elif len(M2) == 1:
        path = range(len(M1)), np.zeros(len(M1))
    else:
        i, j = np.array(cost_0.shape) - 2
        path_1, path_2 = [i], [j]
        # path_1, path_2 with the minimum cost is what we want
        while (i > 0) or (j > 0):
            arg_min = np.argmin((cost_0[i, j], cost_0[i, j + 1], cost_0[i + 1, j]))
            if arg_min == 0:
                i -= 1
                j -= 1
            elif arg_min == 1:
                i -= 1
            else:
                j -= 1
            path_1.insert(0, i)
            path_2.insert(0, j)
        # convert to array
        path = np.array(path_1), np.array(path_2)
    return path,cost[M1_len-1][M2_len-1]

In [10]:
##########      trainmodel         ############
def modeltrain(mfcc_list,model):
    """开始DTW模型训练"""
    # set the first sequence as standard, merge all training sequences
    mfcc_count = np.zeros(len(mfcc_list[0]))
    mfcc_all = np.zeros(mfcc_list[0].shape)
    for i in range(len(mfcc_list)):
        # calculate the wrap path between standard and each template
        path,_ = dtw(mfcc_list[0], mfcc_list[i])
        for j in range(len(path[0])):
            mfcc_count[int(path[0][j])] += 1
            mfcc_all[int(path[0][j])] += mfcc_list[i][path[1][j]]

    # Generalization by averaging the templates
    model_digit = np.zeros(mfcc_all.shape)
    for i in range(len(mfcc_count)):
        for j in range(len(mfcc_all[i])):
            model_digit[i][j] = mfcc_all[i][j] / mfcc_count[i]
            
    # return model with models of 0-9 digits
    model.append(model_digit)

In [11]:
def retrainpre(path_model,path_init,ovlpath):
    """模型训练预准备"""
    model=[]
    for i in range(0,10):
        mfcc_list=[]
        for j in range(0,8,2):
            (rate,sig) = wav.read(path_init+str(i)+str(j)+".wav")
            m0=np.zeros(len(sig),dtype=np.int16)
            m0=mfccusing(ovlpath,sig)
            
            mfcc_list.append(m0)
        
        modeltrain(mfcc_list,model) #调用训练函数
        print("---数字"+str(i)+"训练完成！---")
    np.save((path_model+"model.npy"),model)

In [12]:
def predict_dtw(model, path_test,speech_name,ovlpath):
    """
    :param model: trained model
    :param file_path: path of .wav file
    :return: digit
    """
    # Iterate, find the digit with the minimum distance
    digit = 0
    (rate,sig) = wav.read(path_test+speech_name)
    m0=mfccusing(ovlpath,sig)
    #print(m0)
    _,min_dist = dtw(model[0], m0)
    for i in range(len(model)):
        print(i)
        _,dist= dtw(model[i], m0)
        if dist < min_dist:
            digit = i
            
            min_dist = dist
        print(dist)
    return digit

In [13]:
def speech_recognize(speech_name,path_init,path_test,path_model,ovlpath,retrain=1):
    '''DTW Recgonize'''
    if(retrain):
#         retrainpre1(path_model,path_init,ovlpath)
#         retrainpre2(path_model,path_init,ovlpath)
        retrainpre(path_model,path_init,ovlpath)

    model0=np.load(path_model+"model.npy")

    dig=predict_dtw(model0,path_test,speech_name,ovlpath)

    return dig

In [14]:
np.set_printoptions(threshold=np.inf)
np.set_printoptions(suppress=True)
ovlpath="/home/xilinx/jupyter_notebooks/test0/mfcc/design_1_wrapper.bit"
path_init="/home/xilinx/jupyter_notebooks/voice/voice/voice_base/"
path_test="/home/xilinx/jupyter_notebooks/voice/voice/voice_base/"
path_model="/home/xilinx/jupyter_notebooks/voice/voice/"
cor=0
wro=0
for i in range(1,8,2):  
    for j in range(0,10):
        speech_name=str(j)+str(i)+".wav"
        number=speech_recognize(speech_name,path_init,path_test,path_model,ovlpath,0)
        if(number==j):
            cor=cor+1
            print("数字",str(j),"正确^_^,当前语音识别率：",cor*100/(cor+wro),"%")
        else:
            wro=wro+1  
            print("数字",str(j),"错误！！,当前语音识别率：",cor*100/(cor+wro),"%")
                  
print("--------------------------------------------------------")
print("识别结果语音正确个数：",cor,"\t识别结果语音错误个数：",wro)
print("识别率：",cor*100/(cor+wro),"%")

[33500 44000]
有效语音段: 67 88
mfccusing complete!
0
641.474493122
1
821.239013299
2
1206.06599872
3
961.463385532
4
1135.68682773
5
1063.20836048
6
777.007029246
7
1016.69502043
8
1231.32939354
9
1020.52405451
数字 0 正确^_^,当前语音识别率： 100.0 %
[ 1000  2000 32000 41000]
有效语音段: 64 82
mfccusing complete!
0
872.204673067
1
635.574167513
2
1239.9895328
3
1076.80170915
4
1043.42975934
5
1087.88734477
6
1044.22772364
7
852.741028603
8
1245.38006408
9
1177.13186968
数字 1 正确^_^,当前语音识别率： 100.0 %
[33500 40500]
有效语音段: 67 81
mfccusing complete!
0
1104.2735771
1
1091.93019183
2
519.317378477
3
654.433103074
4
844.176693414
5
1147.16921681
6
860.464238006
7
1111.27472152
8
444.926060188
9
1211.93525038
数字 2 错误！！,当前语音识别率： 66.66666666666667 %
[ 1000  2000 35000 43500]
有效语音段: 70 87
mfccusing complete!
0
883.886162068
1
841.650976432
2
659.227948777
3
480.390992249
4
787.964982814
5
1050.30499422
6
823.744271167
7
919.838860196
8
576.890074361
9
986.748423581
数字 3 正确^_^,当前语音识别率： 75.0 %
[ 1000  2000 32000 43000]
有效

[33500 41000]
有效语音段: 67 82
mfccusing complete!
0
934.381851764
1
936.808926421
2
645.837740279
3
510.131605062
4
800.477559737
5
1098.72050387
6
831.893801605
7
986.911277342
8
571.839085043
9
1033.93194791
数字 3 正确^_^,当前语音识别率： 79.41176470588235 %
[ 1000  2000 31500 38000]
有效语音段: 63 76
mfccusing complete!
0
956.900088187
1
954.980983458
2
732.763759967
3
708.055949601
4
697.267600814
5
1139.56540282
6
818.491286527
7
875.054592633
8
734.612019627
9
1130.58341864
数字 4 正确^_^,当前语音识别率： 80.0 %
[ 1000  2000 28000 36500]
有效语音段: 56 73
mfccusing complete!
0
1189.72691079
1
1135.32017215
2
1103.42073631
3
1218.189804
4
1243.61509694
5
754.114917936
6
991.302437512
7
1316.12205645
8
1184.3814848
9
1132.99373614
数字 5 正确^_^,当前语音识别率： 80.55555555555556 %
[ 1000  2000 32500 40000]
有效语音段: 65 80
mfccusing complete!
0
866.722999022
1
862.678495581
2
783.645143237
3
871.911983092
4
910.900761985
5
936.503098472
6
547.691690144
7
1029.94317443
8
836.522474171
9
885.73584023
数字 6 正确^_^,当前语音识别率： 81.0810810810