In [682]:
%matplotlib inline
from functools import lru_cache
import os
import numpy as np
import scipy.io.wavfile
import scipy.fftpack
import scipy.linalg
import matplotlib.pyplot as plt
import librosa

In [683]:
PATH = '../input/dtw3'
COMMANDS = f'{PATH}/audio'
VALIDATION = f'{PATH}/validation'
TEST = f'{PATH}/test'
SPOKEN_DIGIT = f'{PATH}/free-spoken-digit-dataset/recordings'

In [684]:
# Folder 'audio': 10 recordings of each digit at 16Khz
commands10x10 = []
for root, dirs, files in os.walk(COMMANDS):
    for name in files:
        filename = os.path.join(root, name)
        command = root.split('/')[-1]
        speaker = name.split('_')[0]
        commands10x10.append({
            'wav': filename,
            'text': command,
            'speaker': speaker
        })
print(commands10x10[-1])

In [685]:
# Folder 'validation': 100 recordings of each digit at 16Khz
commands10x100 = []
for root, dirs, files in os.walk(VALIDATION):
    for name in files:
        filename = os.path.join(root, name)
        command = root.split('/')[-1]
        speaker = name.split('_')[0]
        commands10x100.append({
            'wav': filename,
            'text': command,
            'speaker': speaker
        })
print(commands10x100[-1])

In [686]:
# Folder 'free-spoken-digit-dataset': 4 recordings of each digit for each of the four selected speakers
free10x4x4 = []
SPEAKERS = ['jackson', 'nicolas', 'theo', 'yweweler']
N_DIGITS = 10
N_REPETITIONS = 4
for digit in range(N_DIGITS):
    for speaker in SPEAKERS:
        for repetition in range(N_REPETITIONS):
            free10x4x4.append({
                'wav': f'{SPOKEN_DIGIT}/{digit}_{speaker}_{repetition}.wav',
                'text': str(digit),
                'speaker': speaker
            })
print(free10x4x4[-1])

In [687]:
# Hamming window
@lru_cache(maxsize=10)
def get_window(n, type='hamming'):
    coefs = np.arange(n)
    window = 0.54 - 0.46 * np.cos(2 * np.pi * coefs / (n - 1))
    return window
plt.plot(get_window(512))

In [688]:
# Preemphasis filter
def apply_preemphasis(y, preemCoef=0.97):
    y[1:] = y[1:] - preemCoef*y[:-1]
    y[0] *= (1 - preemCoef)
    return y

In [689]:
def freq_to_mel(freq):
    return 2595.0 * np.log10(1.0 + freq / 700.0)
plt.plot(freq_to_mel(np.arange(8000)))

In [690]:
def mel_to_freq(mels):
    return 700.0 * (np.power(10.0, mels / 2595.0) - 1.0)

In [691]:
@lru_cache(maxsize=10)
def get_filterbank(numfilters, filterLen, lowFreq, highFreq, samplingFreq):
    minwarpfreq = freq_to_mel(lowFreq)
    maxwarpfreq = freq_to_mel(highFreq)
    dwarp = (maxwarpfreq - minwarpfreq) / (numfilters + 1)
    f = mel_to_freq(np.arange(numfilters + 2) * dwarp + minwarpfreq) * (filterLen - 1) * 2.0 / samplingFreq
    i = np.arange(filterLen)[None, :]
    f = f[:, None]
    hislope = (i - f[:numfilters]) / (f[1:numfilters+1] - f[:numfilters])
    loslope = (f[2:numfilters+2] - i) / (f[2:numfilters+2] - f[1:numfilters+1])
    H = np.maximum(0, np.minimum(hislope, loslope))
    return H
H = get_filterbank(numfilters=20, filterLen=257, lowFreq=0, highFreq=8000, samplingFreq=16000)
fig = plt.figure(figsize=(20,10))
for h in H:
  plt.plot(h)

In [692]:
def normalized(y, threshold=0):
    y -= y.mean()
    stddev = y.std()
    if stddev > threshold:
        y /= stddev
    return y

In [693]:
def mfsc(y, sfr, window_size=0.025, window_stride=0.010, window='hamming', normalize=False, log=True, n_mels=20, preemCoef=0, melfloor=1.0):
    # window length in samples
    win_length = int(sfr * window_size)
    
    # window shift in samples
    hop_length = int(sfr * window_stride)
    
    # number of points of the DFT
    #n_fft = 2048
    n_fft = 512
    
    # frequency analysis limits
    lowfreq = 0
    highfreq = sfr/2
    
    # get window
    window = get_window(win_length)
    padded_window = np.pad(window, (0, n_fft - win_length), mode='constant')[:, None]
    
    # preemphasis
    y = apply_preemphasis(y.copy(), preemCoef)

    # scale wave signal
    y *= 32768
    
    # get overlaped frames using numpy stride_tricks
    num_frames = 1 + (len(y) - win_length) // hop_length
    pad_after = num_frames*hop_length + (n_fft - hop_length) - len(y)
    if pad_after > 0:
        y = np.pad(y, (0, pad_after), mode='constant')
    frames = np.lib.stride_tricks.as_strided(y, shape=(n_fft, num_frames), strides=(y.itemsize, hop_length * y.itemsize), writeable=False)
    windowed_frames = padded_window * frames
    
    # compute the modulus of the DFT of each frame
    D = np.abs(np.fft.rfft(windowed_frames, axis=0))

    # apply mel filterbank
    filterbank = get_filterbank(n_mels, n_fft/2 + 1, lowfreq, highfreq, sfr)
    mf = np.dot(filterbank, D)
    
    if log:
        mf = np.log(np.maximum(melfloor, mf))
    if normalize:
        mf = normalized(mf)

    return mf

In [694]:
def mfsc2mfcc(S, n_mfcc=12, dct_type=2, norm='ortho', lifter=22, cms=True, cmvn=True):
    # Discrete Cosine Transform
    M = scipy.fftpack.dct(S, axis=0, type=dct_type, norm=norm)[:n_mfcc]

    # Ceptral mean subtraction (CMS) 
    if cms or cmvn:
        M -= M.mean(axis=1, keepdims=True)

    # Ceptral mean and variance normalization (CMVN)
    if cmvn:
        M /= M.std(axis=1, keepdims=True)
    
    # Liftering
    elif lifter > 0:
        lifter_window = 1 + (lifter / 2) * np.sin(np.pi * np.arange(1, 1 + n_mfcc, dtype=M.dtype) / lifter)[:, np.newaxis]
        M *= lifter_window

    return M

In [695]:
from IPython.display import Audio
Audio(filename=free10x4x4[0]['wav'], autoplay=True)

Plot two examples

In [696]:
import librosa
import librosa.display
fig = plt.figure(figsize=(20,15))
for i in range(2):
    wav = free10x4x4[i]['wav']
    sfr, y = scipy.io.wavfile.read(wav)
    y = y/32768
    print(wav, 'Sampling frequency: ', sfr)
    fig = plt.subplot(4,2,i+1)
    plt.plot(y)

    # Liner frequency spectrogram
    D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
    fig = plt.subplot(4,2,i+3)
    librosa.display.specshow(D, y_axis='linear', sr=sfr)
    plt.title('Linear-frequency power spectrogram')

    # Mel-scaled spectrogram (20 bank filters)
    S = mfsc(y, sfr)
    fig = plt.subplot(4,2,i+5)
    librosa.display.specshow(S - S.min())
    plt.title('Mel-scaled power spectrogram')

    # MFCC(5)
    M = mfsc2mfcc(S)
    fig = plt.subplot(4,2,i+7)
    plt.plot(M[1,:])

Fast DTW version using scipy.spatial.distance.cdist to compute distance and numba (compiled python)

In [697]:
def _traceback(D):
    """ Compute the alignment in a recursive way, starting on the 
    bottom right cell and moving up and left. """
    bottom_right = [D.shape[0] - 1, D.shape[1] - 1] # Position of the bottom right element
    # Start considering the base cases
    if D.shape == (1,1):
        return [bottom_right]
    elif D.shape[0] == 1:
        return _traceback(D[:,:-1]) + [bottom_right]
    elif D.shape[1] == 1:
        return _traceback(D[:-1,:]) + [bottom_right]
    # Move up
    elif D[-2,-1] <= D[-1,-2] and D[-2,-1] <= D[-2,-2]:
        return _traceback(D[:-1,:]) + [bottom_right]
    # Move to the left
    elif D[-1,-2] <= D[-2,-1] and D[-1,-2] <= D[-2,-2]:
        return _traceback(D[:,:-1]) + [bottom_right]
    # Move up and left
    else:
        return _traceback(D[:-1,:-1]) + [bottom_right]

    


from numba import jit
@jit
def dtw(x, y, metric='sqeuclidean', return_path = False, dtw_type = 2, delta = 1):
    """
    Computes Dynamic Time Warping (DTW) of two sequences.
    :param array x: N1*M array
    :param array y: N2*M array
    :param func dist: distance used as cost measure
    """
    #print(x.shape, y.shape)
    r, c = len(x), len(y)

    D = np.zeros((r + 1, c + 1))
    D[0, 1:] = np.inf
    D[1:, 0] = np.inf

    # Initialize the matrix with dist(x[i], y[j])
    D[1:, 1:] = scipy.spatial.distance.cdist(x, y, metric)

    for i in range(r):
        for j in range(c):
            if dtw_type == 2:
                min_prev = min(D[i, j], D[i+1, j], D[i, j+1])
            elif dtw_type == 1:
                min_prev = min(D[i, j], D[i, j-1] + delta, D[i-1, j]+delta)
            #print(x[i].shape, y[j].shape)
            #D[i+1, j+1] = min_prev + np.linalg.norm(x[i]-y[j])**2
            D[i+1, j+1] += min_prev

    if return_path:
        if len(x) == 1:
            path = zeros(len(y)), range(len(y))
        elif len(y) == 1:
            path = range(len(x)), zeros(len(x))
        else:
            path = _traceback(D)
        return D[-1, -1], path
    else:
        return D[-1, -1]




In [698]:
@jit
def dtw_match_anywhere(x, y, metric='sqeuclidean', return_path = False, dtw_type = 2, delta = 1):
    """
    Computes Dynamic Time Warping (DTW) of two sequences.
    :param array x: N1*M array
    :param array y: N2*M array
    :param func dist: distance used as cost measure
    """
    #print(x.shape, y.shape)
    r, c = len(x), len(y)

    D = np.zeros((r + 1, c + 1))
    #D[0, 1:] = np.inf
    #D[0, D.shape[1]//2:] = np.inf
    D[1:, 0] = np.inf

    # Initialize the matrix with dist(x[i], y[j])
    D[1:, 1:] = scipy.spatial.distance.cdist(x, y, metric)
    

    for i in range(r):
        for j in range(c):
            if dtw_type == 2:
                min_prev = min(D[i, j], D[i+1, j], D[i, j+1])
            elif dtw_type == 1:
                min_prev = min(D[i, j], D[i, j-1] + delta, D[i-1, j]+delta)
            #print(x[i].shape, y[j].shape)
            #D[i+1, j+1] = min_prev + np.linalg.norm(x[i]-y[j])**2
            D[i+1, j+1] += min_prev

    if return_path:
        if len(x) == 1:
            path = zeros(len(y)), range(len(y))
        elif len(y) == 1:
            path = range(len(x)), zeros(len(x))
        else:
            path = _traceback(D)
        #return D[-1, -1], path
        return min(D[-1,:]), path
        #return min(D[-1,D.shape[1]//2:]), path
    else:
        #return min(D[-1,D.shape[1]//2:])
        return min(D[-1,:])
        #return D[-1, -1]

In [699]:
from functools import lru_cache

def lev_dist(a, b):
    '''
    This function will calculate the levenshtein distance between two input
    strings a and b
    
    params:
        a (String) : The first string you want to compare
        b (String) : The second string you want to compare
        
    returns:
        This function will return the distnace between string a and b.
        
    example:
        a = 'stamp'
        b = 'stomp'
        lev_dist(a,b)
        >> 1.0
    '''
    
    @lru_cache(None)  # for memorization
    def min_dist(s1, s2):

        if s1 == len(a) or s2 == len(b):
            return len(a) - s1 + len(b) - s2

        # no change required
        norm = np.linalg.norm(a[s1] - b[s2])
        #print(norm)
        if norm == 0:
            return min_dist(s1 + 1, s2 + 1)

        return 1 + min(
            min_dist(s1, s2 + 1),      # insert character
            min_dist(s1 + 1, s2),      # delete character
            min_dist(s1 + 1, s2 + 1),  # replace character
        )

    return min_dist(0, 0)

In [700]:
# Do not show numba warnings
import warnings
from numba.core.errors import NumbaWarning
warnings.simplefilter("ignore", category=NumbaWarning)


In [701]:
a = np.array([1,4,4,5,9,3,1,8,8], dtype=np.float32)[:,np.newaxis]
b = np.array([1,2,3,8,8,3,1,8], dtype=np.float32)[:,np.newaxis]
lev_dist(a,b)
#dtw(a, b, metric='sqeuclidean')

In [702]:
def delta(M):
    ''' Obtain the first derivates of the MFCC'''
    DM = np.zeros(M.shape) # Initialize matrix of the same shape
    for i in range(M.shape[0]):
        if i == 0: # With the first sample, previous can't be used
            DM[i] = (M[i+1] - M[i])/2
        elif i == M.shape[0]-1: # With the last sample, next can't be used
            DM[i] = (M[i] - M[i-1])/2
        else:
            DM[i] = (M[i+1] - M[i-1])/2
    return DM



# Compute MFCC
def get_mfcc(dataset, n_mfcc = 12, lifter = 22, cms=True, cmvn=True, first_der = False, second_der = False, **kwargs):
    mfccs = []
    for sample in dataset:
        sfr, y = scipy.io.wavfile.read(sample['wav'])
        y = y/32768
        S = mfsc(y, sfr, **kwargs)
        # Compute the mel spectrogram
        M = mfsc2mfcc(S, n_mfcc = n_mfcc, lifter = lifter, cms=cms, cmvn=cmvn)
        # Move the temporal dimension to the first index
        M = M.T
        if first_der:
            DM = delta(M)
            M = np.hstack((M, DM))
            if second_der:
                DM2 = delta(DM)
                M = np.hstack((M, DM2))
        mfccs.append(M.astype(np.float32))
    return mfccs

In [703]:
def librosa_mfcss(wavs):
    mfccs = []
    for wav in wavs:
        y, sr = librosa.load(wav['wav'])
        mfcc = librosa.feature.mfcc(y=y, sr=sr)
        mfccs.append(mfcc.T)
        
    return mfccs

from scipy.spatial.distance import euclidean, sqeuclidean

# Word Error Rate (Accuracy)
def wer(test_dataset, ref_dataset=None, same_spk=False, n_mfcc = 12, lifter = 22, cms=True, cmvn=True, 
                        first_der = False, second_der = False, return_path = False):
    # Compute mfcc
    test_mfcc = get_mfcc(test_dataset, n_mfcc=n_mfcc, lifter = lifter, cms=cms, cmvn=cmvn, 
                             first_der = first_der, second_der = second_der)
    #test_mfcc = librosa_mfcss(test_dataset)
    if ref_dataset is None:
        ref_dataset = test_dataset
        ref_mfcc = test_mfcc
    else:
        ref_mfcc = get_mfcc(ref_dataset, n_mfcc=n_mfcc, lifter = lifter, cms=cms, cmvn=cmvn, 
                                first_der = first_der, second_der=second_der)
        #ref_mfcc = librosa_mfcss(ref_dataset)
    
    
    err = 0
    dif_digit_path, same_digit_path = None, None
    
    for i, test in enumerate(test_dataset):
        mincost = np.inf
        minref = None
        minpath = None
        for j, ref in enumerate(ref_dataset):
            if not same_spk and test['speaker'] == ref['speaker']:
                # Do not compare with refrence recordings of the same speaker
                continue
            if test['wav'] != ref['wav']:
                distance, path = dtw(test_mfcc[i], ref_mfcc[j], return_path = return_path, metric = "sqeuclidean", dtw_type = 2)
                #distance = lev_dist(test_mfcc[i], ref_mfcc[j])
                #path = 1
                if distance < mincost:
                    mincost = distance
                    minpath = path
                    minref = ref
        if test['text'] != minref['text']: # Different digits
            err += 1
            dif_digit_path = minpath
            dif_digit_dist = mincost
        else: # Same digits
            same_digit_path = minpath
            same_digit_cost = mincost
    wer = 100*err/len(test_dataset)
    
    if return_path:
        return wer, dif_digit_path, dif_digit_dist, same_digit_path, same_digit_cost
    else:
        return wer



# Speaker Error Rate
def ser(test_dataset, ref_dataset=None, same_spk=False):
    # Compute mfcc
    test_mfcc = get_mfcc(test_dataset)
    if ref_dataset is None:
        ref_dataset = test_dataset
        ref_mfcc = test_mfcc
    else:
        ref_mfcc = get_mfcc(ref_dataset)
        
    err = 0
    for i, test in enumerate(test_dataset):
        mincost = np.inf
        minref = None
        for j, ref in enumerate(ref_dataset):
            if not same_spk and test['speaker'] == ref['speaker']:
                # Do not compare with refrence recordings of the same speaker
                continue
            if test['wav'] != ref['wav']:
                distance = dtw(test_mfcc[i], ref_mfcc[j])
                if distance < mincost:
                    mincost = distance
                    minref = ref
        if test['speaker'] != minref['speaker']:
            err += 1

    wer = 100*err/len(test_dataset)
    return wer

In [704]:
# Free Spoken Digit Dataset
#print(f'WER including reference recordings from the same speaker: {wer(free10x4x4, same_spk=True):.1f}%')

In [705]:
# Google Speech Commands Dataset (small digit subset)
#print(f'WER using only reference recordings from other speakers: {wer(commands10x100, commands10x10):.1f}%')

In [706]:
# Speech Error Rate of Free Spoken Digit Dataset
#print(f'SER including reference recordings from the same speaker: {ser(free10x4x4, same_spk=True):.1f}%')

## Study of the number of cepstral coefficients

In [707]:
#WER using only reference recordings from other speakers
#wer_list = []
#for n_mfcc in range(25):
#    wer_list.append(wer(commands10x100, commands10x10, n_mfcc = n_mfcc))

#plt.plot(list(range(25)),wer_list)
#plt.xlabel('Number cepstrum coefficients')
#plt.ylabel('WER')
#plt.title("WER using only reference recordings from other speakers")
#plt.show()

In [708]:
#WER including reference recordings from the same speaker
#wer_list2 = []
#for n_mfcc in range(25):
#    wer_list2.append(wer(free10x4x4, same_spk=True, n_mfcc = n_mfcc))

#plt.plot(list(range(25)),wer_list2)
#plt.xlabel('Number cepstrum coefficients')
#plt.ylabel('WER')
#plt.title("WER including reference recordings from the same speaker")
#plt.show()

## Liftering, mean and variance analysis

In [709]:
# WER using only reference recordings from other speakers

#lifter_list = [0, 22]
#cms_list = [True, False]
#cmvn_list = [True, False]
#for lifter in lifter_list:
#    for cms in cms_list:
#        for cmvn in cmvn_list:
#            w = wer(commands10x100, commands10x10, n_mfcc = 9, lifter = lifter, cms=cms, cmvn=cmvn)
#            print(f"Lifter: {lifter}, cms: {cms}, cmvn: {cmvn}. Obtained wer: {w}")



In [710]:
# WER including reference recordings from the same speaker
#lifter_list = [0, 22]
#cms_list = [True, False]
#cmvn_list = [True, False]
#for lifter in lifter_list:
#    for cms in cms_list:
#        for cmvn in cmvn_list:
#            w = wer(free10x4x4, same_spk=True, n_mfcc = 9, lifter = lifter, cms=cms, cmvn=cmvn)
#            print(f"Lifter: {lifter}, cms: {cms}, cmvn: {cmvn}. Obtained wer: {w}")

## Adding first derivate

In [711]:
# WER using only reference recordings from other speakers

#lifter_list = [0, 22]
#cms_list = [True, False]
#cmvn_list = [True, False]
#for lifter in lifter_list:
#    for cms in cms_list:
#        for cmvn in cmvn_list:
#            w = wer(commands10x100, commands10x10, n_mfcc = 9, lifter = lifter, cms=cms, cmvn=cmvn, first_der = True)
#            print(f"Lifter: {lifter}, cms: {cms}, cmvn: {cmvn}. Obtained wer: {w}")

In [712]:
# WER using only reference recordings from other speakers

#lifter_list = [0, 22]
#cms_list = [True, False]
#cmvn_list = [True, False]
#for lifter in lifter_list:
#    for cms in cms_list:
#        for cmvn in cmvn_list:
#            w = wer(commands10x100, commands10x10, n_mfcc = 9, lifter = lifter, cms=cms, cmvn=cmvn, first_der = True, second_der = True)
#            print(f"Lifter: {lifter}, cms: {cms}, cmvn: {cmvn}. Obtained wer: {w}")

## Print pathes

In [713]:
w, dif_digit_path, dif_digit_dist, same_digit_path, same_digit_dist = wer(commands10x100, commands10x10, n_mfcc = 9, lifter = 22, cms=True, 
                                         cmvn=True, first_der = True, second_der = True, return_path = True)
print(w)

In [714]:
# Plot paths from a different digit
numpy_path = np.transpose(np.array(dif_digit_path))
print(dif_digit_dist)
plt.plot(numpy_path[0], numpy_path[1])
plt.title("Alignment from two different digits")
plt.show()

In [715]:
# Plot paths from the same digit
numpy_path = np.transpose(np.array(same_digit_path))
print(same_digit_dist)
plt.plot(numpy_path[0], numpy_path[1])
plt.title("Alignment from two samples of the same digit")
plt.show()

## Test

In [716]:
test_wavs = []
for filename in os.listdir(TEST):
    test_wavs.append({
        'wav': TEST + '/' + filename
    })

In [717]:


def test(test_wavs, ref_wavs, n_mfcc = 12, lifter = 22, cms=True, cmvn=True, 
                        first_der = False, second_der = False, return_path = False):
    pred = []
    test_mfccs = librosa_mfcss(test_wavs)
    ref_mfccs = librosa_mfcss(ref_wavs)
    #test_mfccs = get_mfcc(test_wavs, n_mfcc=n_mfcc, lifter = lifter, cms=cms, cmvn=cmvn, 
    #                         first_der = first_der, second_der = second_der)
    #ref_mfccs = get_mfcc(ref_wavs, n_mfcc=n_mfcc, lifter = lifter, cms=cms, cmvn=cmvn, 
    #                         first_der = first_der, second_der = second_der)
    for i in range(len(test_mfccs)):
        mincost = np.inf
        jmin = -1
        for j in range(len(ref_mfccs)):
            distance = dtw(test_mfccs[i], ref_mfccs[j])
            if distance < mincost:
                mincost = distance
                jmin = j
        pred.append(ref_wavs[jmin]['text'])
        if i<10:
            print(f'{i:3}/{len(test_mfccs)}: {pred[i]}')
    return pred



In [718]:
# Use the two labeled 'Speech Commands' datasets (commands10x10 + commands10x100) as reference templates for the test prediction 
#pred = test(test_wavs, commands10x10 + commands10x100)
pred = test(test_wavs, commands10x10 + commands10x100, n_mfcc = 9, lifter = 22, cms=True, 
                                         cmvn=True, first_der = True, second_der = True, return_path = False)

In [719]:
with open('submission.csv', 'w') as f:
    print('filename,command', file=f)
    for entry, command in zip(test_wavs, pred):
        filename = entry['wav'].split('/')[-1].split('.')[0]
        print(f'{filename},{command}', file=f)