In [2]:
import numpy as np
import math
from scipy.io.wavfile import read
from scipy.signal import hamming
from scipy.fftpack import fft, fftshift, dct

In [3]:
train_dir = "train\\"
test_dir = "test\\"

## Feature Extraction

In [4]:
#calculate LPC coefficients from sound file

def auto_correlation(x, lag):
  n = len(x)
  x_mean = np.mean(x)
  y = x - x_mean
  r = np.zeros((lag+1,1))
  for k in range(lag+1):
    top = 0
    bottom = 0
    for i in range(n-k):
      top += y[i]*y[i+k]
    for i in range(n):
      bottom += y[i]*y[i]
    if bottom==0:
      r[k][0] = 0.0001
    else:  
      r[k][0] = top/bottom
  return r

"""Function to calculate symmetric matrix for matrix multiplication in further step to calculate alpha."""

def sym_matrix(n):
  size = (n,n)
  ret_mat = np.zeros(size)
  for i in range(0,n):
    for j in range(0,n):
      ret_mat[i][j] = (np.abs(i-j))
  
  return ret_mat

"""#Function to calculate Linear Prediction Coefficient."""

def lpc(signal, sampling_freq, no_coeff):
  #print('Stage1: Time Framing')
  
  #Time Framing: Standard-25ms
  no_sample = int(0.025*sampling_freq)
  no_delay = int(0.010*sampling_freq)
  no_frame = int(math.ceil(len(signal)/(no_sample-no_delay)))

  #Padding
  padding = ((no_sample - no_delay)*no_frame) - len(signal)
  if padding > 0:
    s = np.append(signal, np.zeros(padding))
  else:
    s = signal
  #print('Stage1: Done')
  
  
  #print('Stage2: Segmentation')
  #segmenting signal in frames
  start = 0
  count = 0
  for i in range(no_frame):
    if start == 0:
      seg_mat = np.zeros((1, no_sample))
      seg_mat[0] = s[start:no_sample]
      start = no_sample
      count += 1
    else:
      if s.shape[0] - start >= 1200:
        temp_mat = np.zeros((1, no_sample))
        temp_mat[0][:] = s[start-no_delay:start-no_delay+no_sample]
        start = start - no_delay + no_sample
        seg_mat = np.vstack((seg_mat, temp_mat))
        count += 1
  #print('Stage2: Done')
  #Hamming Window operation (Optional)
  x = np.zeros_like(seg_mat)
  for i in range(count):
    x = seg_mat[i]*np.hamming(no_sample)

  #print('Stage3: LPC with Yule-walker algorithm')
  #print('Please Wait')
  #calculating LPC with Yule-walker algorithm
  lpc_coeff = np.zeros((count, no_coeff))
  for i in range(count):
    r1 = auto_correlation(seg_mat[i], no_coeff)
    temp = r1[1:][0]
    r = np.resize(temp,(no_coeff,1))
    r = (-1)*r
    R = sym_matrix(no_coeff)
    for j in range(no_coeff):
      for k in range(no_coeff):
        val = int(R[k][j])
        R[k][j] = r1[val]
    pro_mat = np.zeros((no_coeff,1))
    pro_mat = np.dot(np.linalg.pinv(R),r)
    lpc_coeff[i] = np.resize(pro_mat, (1, no_coeff)) 
    #Converting in the range -1 to +1
    lpc_coeff[i] = lpc_coeff[i][:]/np.max(np.abs(lpc_coeff[i]))

  #print('Stage3: Done')
  return lpc_coeff

In [5]:
# calculate LPCC coefficients from LPC coefficients

def normi(arr):
    maxi = np.max(arr) 
    mini = np.min(arr)
    diffr = maxi - mini
    for i in range(len(arr[:, 0])):
        for k in range(len(arr[0, :])):
            arr[i][k] = (arr[i][k] - mini) / diffr
    return arr

def lpcc(s, fs, seq, order):

    #divide into segments of 25 ms with overlap of 10ms
    nSamples = np.int32(0.025*fs)
    overlap = np.int32(0.01*fs)
    nFrames = np.int32(np.ceil(len(s)/(nSamples-overlap)))

    #zero padding to make signal length long enough to have nFrames
    padding = ((nSamples-overlap)*nFrames) - len(s)
    if padding > 0:
        signal = np.append(s, np.zeros(padding))
    else:
        signal = s
    segment = np.empty((nSamples, nFrames))
    start = 0
    for i in range(nFrames):
        segment[:,i] = signal[start:start+nSamples]
        start = (nSamples-overlap)*i

    lpcc_coeffs = np.empty((order, nFrames))
    for n in range(1, order + 1):
        # Use order + 1 as upper bound for the last iteration
        upbound = (order + 1 if n > order else n)
        lpcc_coef = -sum(i * seq[:, n - i - 1] for i in range(1, upbound)) * 1. / upbound
        lpcc_coef -= seq[:, n - 1] if n <= len(seq[:, n]) else 0
        np.vstack((lpcc_coeffs[:, n], lpcc_coef))
    lpcc_coeffs = np.nan_to_num(lpcc_coeffs)
    lpcc_coeffs = np.float64(lpcc_coeffs)
    lpcc_coeffs = normi(lpcc_coeffs)
    
    return lpcc_coeffs.T

In [6]:
def hertz_to_mel(freq):
    return 1125*np.log(1 + freq/700)
    
def mel_to_hertz(m):
    return 700*(np.exp(m/1125) - 1)
    
#calculate mel frequency filter bank
def mel_filterbank(nfft, nfiltbank, fs):
     
    #set limits of mel scale from 300Hz to 8000Hz
    lower_mel = hertz_to_mel(300)
    upper_mel = hertz_to_mel(8000)
    mel = np.linspace(lower_mel, upper_mel, nfiltbank+2)
    hertz = [mel_to_hertz(m) for m in mel]
    fbins = [int(hz * int(nfft/2+1)/fs) for hz in hertz]
    fbank = np.empty((int(nfft/2+1),nfiltbank))
    for i in range(1,nfiltbank+1):
        for k in range(int(nfft/2 + 1)):
            if k < fbins[i-1]:
                fbank[k, i-1] = 0
            elif k >= fbins[i-1] and k < fbins[i]:
                fbank[k,i-1] = (k - fbins[i-1])/(fbins[i] - fbins[i-1])
            elif k >= fbins[i] and k <= fbins[i+1]:
                fbank[k,i-1] = (fbins[i+1] - k)/(fbins[i+1] - fbins[i])
            else:
                fbank[k,i-1] = 0
     
#    plotting mel frequency filter banks           
#    plt.figure(1)
#    xbins = fs*np.arange(0,nfft/2+1)/(nfft/2+1)
#    for i in range(nfiltbank):
#        plt.plot(xbins, fbank[:,i])
#    plt.axis(xmax = 8000)
#    plt.xlabel('Frequency in Hz')
#    plt.ylabel('Amplitude')
#    plt.title('Mel Filterbank')
#    plt.show()
    return fbank
            
def mfcc(s,fs, nfiltbank):
  
    #divide into segments of 25 ms with overlap of 10ms
    nSamples = np.int32(0.025*fs)
    overlap = np.int32(0.01*fs)
    nFrames = np.int32(np.ceil(len(s)/(nSamples-overlap)))
    #zero padding to make signal length long enough to have nFrames
    padding = ((nSamples-overlap)*nFrames) - len(s)
    if padding > 0:
        signal = np.append(s, np.zeros(padding))
    else:
        signal = s
      
    #segmenting signal in frames    
    segment = np.empty((nSamples, nFrames))
    start = 0
    for i in range(nFrames):
        segment[:,i] = signal[start:start+nSamples]
        start = (nSamples-overlap)*i
    
    #compute periodogram
    nfft = 512
    periodogram = np.empty((nFrames, int(nfft/2 + 1)))
    for i in range(nFrames):
        x = segment[:,i] * hamming(nSamples)
        spectrum = fftshift(fft(x,nfft))
        periodogram[i,:] = abs(spectrum[int(nfft/2-1):])/nSamples
        
    #obtain filterbank  
    fbank = mel_filterbank(nfft, nfiltbank, fs)
    
    #nfiltbank MFCCs for each frame
    mel_coeff = np.empty((nfiltbank,nFrames))
    for i in range(nfiltbank):
        for k in range(nFrames):
            mel_coeff[i,k] = np.sum(periodogram[k,:]*fbank[:,i])
            
    mel_coeff = np.log10(mel_coeff)
    mel_coeff = dct(mel_coeff)
    #exclude 0th order coefficient (much larger than others)
    mel_coeff[0,:]= np.zeros(nFrames)
    return mel_coeff.T

## Feature Match

In [7]:
def EuclideanDistance(m1, m2):
  """Takes in two matrices as parameters (m1, m2)
  and returns the Euclidean Distance Matrix between them
  """
  
  r = np.shape(m1)[0] # Number of rows in the Euclidean Distance Matrix will be number of rows in m1
  c = np.shape(m2)[0] # Number of columns in the Euclidean Distance Matrix will be number of rows in m2
 
  EDM = np.empty((r, c)) #Euclidean Distance Matrix EDM (empty) created
 
 
  for i in range(r):
    for j in range(c):
      EDM[i][j] = math.sqrt(np.sum(np.square(np.subtract(m1[i], m2[j]))))
 
  return EDM

VQ - LBG

In [8]:
def lbg_codebook(fvs, M):
  """
  Uses the LBG Algorithm to generate the codebook using the features
  fvs are the feature vectors
  M is the number of centroids
  """
  
  #INITIALIZATION
  no_centroids = 1
  centroid_coord = np.mean(fvs, axis = 0) #Centroid of features
  r = no_centroids   #number of rows in codebook (initially)
  c = len(centroid_coord)   #number of columns in codebook (initially)
  codebook = np.empty((r,c))  #codebook created
  codebook[0] = centroid_coord #first and only codevector will be the centroid of the given features

  e = 0.01
  
  while no_centroids < M: 
    #DOUBLING THE CODEBOOK according to the formula y(n)+ = y(n)*(1+e) , y(n)- = y(n)*(1-e)
    newcodebook = np.empty((2*r, c)) #Creating a temporary codebook that is to be updated

    for i in range(no_centroids):
      newcodebook[2*i] = codebook[i] * (1+e)    #y(n)+ = y(n)*(1+e)
      newcodebook[2*i+1] = codebook[i] * (1-e)  #y(n)- = y(n)*(1-e)


    codebook = newcodebook #codebook updated
    r = np.shape(codebook)[0] #updating the value of centroid // again, the number of centroids = number of codewords
    no_centroids = r #Again, the number of centroids is the number of rows
  
    Distance = EuclideanDistance(fvs, codebook) #Distance Matrix
    distortion = 1
    
    while np.abs(distortion) > e: 
      #NEAREST NEIGHBOUR SEARCH
      previousDistance = np.mean(Distance)
      nearestcentroidID = np.argmin(Distance,axis = 1)  #Contains the indices of the closest codeword for each feature
 
      #SET NEW CENTROID TO THE CENTROID OF ALL FEATURES CLOSE TO CENTROID i
      for i in range(no_centroids):
        codebook[i] = np.nan_to_num( np.nanmean(fvs[np.where(nearestcentroidID == i)], axis = 0) ) # THROWS RUN-TIME WARNING
      
      Distance = EuclideanDistance(fvs, codebook)
      newDistance = np.mean(Distance)
      distortion = (previousDistance - newDistance)/previousDistance  #updating distortion
  return codebook 

## Helper Functions

In [1]:
def closestSpeaker(spkr_features, all_features, fm_model_type):
    minDistance = np.inf
    for name in all_features:
        if fm_model_type == 'LBG':
            dist = EuclideanDistance(spkr_features, all_features[name])
            Distance = np.sum(np.min(dist, axis = 1))/(np.shape(dist)[0])

        if Distance < minDistance:
            minDistance = Distance
            speaker = name

    return speaker

In [9]:
def feature_extract(s, fs, model_type, orderLPC = 15, nfiltbank = 12):
    if model_type == 'LPCC':
        lpc_coeff = lpc(s, fs, orderLPC)
        lpcc_coeff = lpcc(s, fs, lpc_coeff, orderLPC)
        return lpcc_coeff
    elif model_type == 'LPC':
        lpc_coeff = lpc(s, fs, orderLPC)
        return lpc_coeff
    elif model_type == 'MFCC':
        mfcc_coeff = mfcc(s, fs, nfiltbank)
        return mfcc_coeff
    else:
        print("Invalid model type! Model type should be 'LPCC' or 'LPC' or 'MFCC'")
        exit()


In [10]:
def feature_match(spkr_features, all_features, fm_model_type, no_centroids = 5):
    if fm_model_type == 'LBG':
        # CODEBOOK GENERATION FOR VQ-LBG
        codebooks = {}
        for name in all_features:
            codebooks[name] = lbg_codebook(all_features[name], no_centroids) #features passed to lbg are the LPC/LPCC coefficients for current speaker.
        foundname = closestSpeaker(spkr_features, codebooks, fm_model_type)
    
    elif fm_model_type == 'DTW':
        foundname = closestSpeaker(spkr_features, all_features, fm_model_type)
        
    return foundname
        

In [11]:
def train(fe_model_type, name, orderLPC, no_filtbank, no_centroids, train_dir):
    directory = train_dir

    fname = name + '.wav'
    #print("Now", name+"\'s features are being trained")
    (fs,s) = read(directory + fname) #for each file, read() returns a tuple. fs is samples/second (sample rate) and s is the actual signal data read from the audio file
    features = feature_extract(s, fs, fe_model_type, orderLPC, no_filtbank)
        
    # print ("Training for " + name + " complete.\n")

    return features

# Automatic Speaker Recognition

In [17]:
# HYPERPARAMETERS

no_filtbank = 12
orderLPC = 20
no_centroids = 5
fe_types = ['MFCC', 'LPC'] # Feautre Extraction model types - 'MFCC' | 'LPC'
fm_types = ['LBG'] # Feature Match model types - 'LBG'

In [18]:
names = []
for i in range(8):
    names.append('s'+str(i+1))

no_crct = np.zeros((len(fe_types), len(fm_types)))

for i in range(len(fe_types)):
    for j in range(len(fm_types)):

        train_features = {}
        for name in names:
            train_features[name] = train(fe_types[i], name, orderLPC, no_filtbank, no_centroids, train_dir)
  
        #COMPARISON
        print("USING "+fe_types[i]+" and "+fm_types[j]+"\n")

        # TEST
        for name in names:
            fname = name + '.wav' 
            # print('Now ', name+'\'s test features are being tested')
            (fs,s) = read(test_dir + fname)
            curr_features = feature_extract(s, fs, fe_types[i], orderLPC, no_filtbank)
            
            foundname = feature_match(curr_features, train_features, fm_types[j], no_centroids)
            
            print(name+'.wav', ' in test matches with ', foundname+'.wav in train \n')
            if(name == foundname):
                no_crct[i][j]+=1

USING MFCC and LBG





s1.wav  in test matches with  s1.wav in train 

s2.wav  in test matches with  s2.wav in train 

s3.wav  in test matches with  s3.wav in train 

s4.wav  in test matches with  s4.wav in train 

s5.wav  in test matches with  s5.wav in train 

s6.wav  in test matches with  s6.wav in train 

s7.wav  in test matches with  s7.wav in train 

s8.wav  in test matches with  s8.wav in train 

USING LPC and LBG

s1.wav  in test matches with  s1.wav in train 

s2.wav  in test matches with  s2.wav in train 

s3.wav  in test matches with  s3.wav in train 

s4.wav  in test matches with  s4.wav in train 

s5.wav  in test matches with  s5.wav in train 

s6.wav  in test matches with  s6.wav in train 

s7.wav  in test matches with  s7.wav in train 

s8.wav  in test matches with  s8.wav in train 



## Result

In [19]:
#RESULT
print("RESULT", end='')
print(' ( Number of Speakers:', len(names), ')\n')
for i in range(len(fe_types)):
    for j in range(len(fm_types)):
        accuracy = np.round( (no_crct[i,j]/len(train_features.keys()))*100 , decimals = 3)
        print("Accuracy (" + fe_types[i] + " + " + fm_types[j] + "): ",accuracy, "%")

RESULT ( Number of Speakers: 8 )

Accuracy (MFCC + LBG):  100.0 %
Accuracy (LPC + LBG):  100.0 %
