# Trying triplet loss for speech command recognition

In [2]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import hashlib
import math, time, datetime
import os.path
import random
import re
import sys
import tarfile

#print(sys.executable)
import matplotlib.pyplot as plt
import numpy as np
import librosa as rosa
import librosa.display
from six.moves import urllib
from six.moves import xrange  # pylint: disable=redefined-builtin
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Dropout, Flatten, Lambda, BatchNormalization, Activation, LSTM, GRU
#from tensorflow.contrib.framework.python.ops import audio_ops as contrib_audio
#from tensorflow.python.ops import io_ops
#from tensorflow.python.platform import gfile
#from tensorflow.python.util import compat

# MFCC feature-extraction settings.
default_number_of_mfcc=128  # n_mfcc passed to the MFCC extractor in load_wav_mfcc
default_sample_rate=16000  # sample rate passed to the wav loader and the MFCC extractor
default_hop_length=512  # hop size used by get_default_mfcc_length to estimate frames per clip
default_wav_duration=1 # 1 second
# Default dataset sizes and training hyper-parameters.
default_train_samples=10000
default_test_samples=100
default_epochs=10
default_batch_size=32
# Words (dataset sub-directory names) used when a caller does not pass its own list.
default_wanted_words=["one", "two", "bed", "backward", "bird", "cat", "dog", "eight", "five", "follow", "forward", "four", "go", "happy", "house", "learn", "left", "marvin", "nine", "no", "off", "right", "seven", "sheila", "stop", "three", "tree", "visual", "wow", "zero","up"]
# Alternative dataset/model locations for other machines, kept for reference.
#for mac
#speech_data_dir="/Users/hermitwang/Downloads/speech_dataset"
#default_model_path="/Users/hermitwang/Downloads/pretrained/speech_siamese"
#for ubuntu
#speech_data_dir="/home/hermitwang/TrainingData/datasets/speech_dataset"
#default_model_path="/home/hermitwang/TrainingData/pretrained/speech_siamese"
#for windows of work
# NOTE(review): the active paths below are POSIX-style despite the "windows" label above.

# Width of the dense embedding layer; also the default N / beta scale in triplet_loss.
default_feature_dim = 4096

# Active dataset root (walked by WavMFCCLoader) and directory the trained model is saved to.
speech_data_dir="/home/zhangjun/tensorflow/speech_siamese_zj/speech_dataset"
default_model_path="/home/zhangjun/tensorflow/speech_siamese_zj/trained"

In [3]:
def load_wav_mfcc(filename):
    """Load a wav file and return its MFCC matrix.

    The audio is loaded at ``default_sample_rate`` and converted to
    ``default_number_of_mfcc`` coefficients per frame.  The hop length is
    passed explicitly so that the number of frames produced here stays in
    step with ``get_default_mfcc_length``, which sizes the batch buffers
    from the same ``default_hop_length`` constant.

    Args:
        filename: path of the wav file to load.

    Returns:
        The array returned by ``librosa.feature.mfcc``; callers in this file
        index it as ``(coefficients, frames)``.
    """
    # The sample rate returned by the loader is the one we asked for, so it
    # is not needed again.
    samples, _ = rosa.load(filename, sr=default_sample_rate)
    return rosa.feature.mfcc(
        y=samples,
        sr=default_sample_rate,
        n_mfcc=default_number_of_mfcc,
        hop_length=default_hop_length,
    )

def get_default_mfcc_length(default_wav_duration=1):
    """Return the number of MFCC frames budgeted for a clip.

    Args:
        default_wav_duration: clip duration, multiplied by
            ``default_sample_rate`` to get a sample count.

    Returns:
        The sample count divided by ``default_hop_length``, rounded up to an
        integer.
    """
    total_samples = default_wav_duration * default_sample_rate
    frame_count = math.ceil(total_samples / default_hop_length)
    return int(frame_count)


In [4]:
class WavMFCCLoader(object):
    """Index wav files by word and assemble MFCC batches from them.

    The directory tree under ``data_dir`` is walked once at construction
    time; every ``.wav`` file whose parent directory name appears in
    ``wanted`` is recorded in ``self.wav_files`` (word -> list of paths).
    The batch builders then draw random files from that index, convert them
    with ``load_wav_mfcc`` and copy them into zero-initialised arrays of
    shape ``(samples, default_number_of_mfcc, self.default_mfcc_length)``.
    """

    def __init__(self, data_dir, wanted, validation_percentage=0, testing_percentage=0):
        """Build the word -> wav files index.

        Args:
            data_dir: root directory to walk for wav files.
            wanted: list of words (directory names) to index.
            validation_percentage: accepted for interface compatibility; unused.
            testing_percentage: accepted for interface compatibility; unused.
        """
        self.data_dir = data_dir
        self.wanted = wanted
        self.default_mfcc_length = get_default_mfcc_length(default_wav_duration)
        self.wav_files = dict()
        self.wav_file_index()

    def wav_file_index(self):
        """Walk ``self.data_dir`` and fill ``self.wav_files``.

        Returns:
            ``self.wav_files``: dict mapping each wanted word that has at
            least one wav file to the list of its file paths.
        """
        for dirpath, _dirnames, files in os.walk(self.data_dir):
            # The word is the name of the directory holding the file.
            # basename(normpath(...)) works for any path separator and for
            # paths with no separator at all, unlike splitting on '/'.
            word_name = os.path.basename(os.path.normpath(dirpath))
            if word_name not in self.wanted:
                continue
            for name in files:
                if name.lower().endswith('.wav'):
                    self.wav_files.setdefault(word_name, []).append(
                        os.path.join(dirpath, name))
        return self.wav_files

    @staticmethod
    def _copy_frames(batch, row, mfcc):
        """Copy ``mfcc`` into ``batch[row]``, zero-padding or truncating frames."""
        # A clip longer than the frame budget is truncated instead of
        # raising a shape-mismatch error; shorter clips keep the zero padding.
        frames = min(mfcc.shape[1], batch.shape[2])
        batch[row, :, :frames] = mfcc[:, :frames]

    def _empty_batch(self, rows):
        """Return a zero array of shape (rows, n_mfcc, frames)."""
        return np.zeros((rows, default_number_of_mfcc, self.default_mfcc_length))

    def wavs_to_mfcc_pair(self):
        """Draw one random pair of recordings.

        Two word indices are drawn; when the first is greater than the
        second the pair is made of two different words (label 0), otherwise
        both recordings come from the first word (label 1).

        Returns:
            ``(mfcc_1, mfcc_2, same)`` where ``same`` is 1 for a same-word
            pair and 0 otherwise.
        """
        how_many_words = len(self.wanted)
        a_index = random.randint(0, how_many_words - 1)
        b_index = random.randint(0, how_many_words - 1)
        word_a = self.wanted[a_index]
        if a_index > b_index:
            word_b, same = self.wanted[b_index], 0
        else:
            word_b, same = word_a, 1
        mfcc_1 = load_wav_mfcc(random.choice(self.wav_files[word_a]))
        mfcc_2 = load_wav_mfcc(random.choice(self.wav_files[word_b]))
        return mfcc_1, mfcc_2, same

    def get_mfcc_pairs(self, how_many):
        """Build ``how_many`` random pairs.

        Returns:
            ``(mfcc1_data, mfcc2_data, same_data)``: two MFCC batches and a
            1-D array of 0/1 labels, all with ``how_many`` rows.
        """
        mfcc1_data = self._empty_batch(how_many)
        mfcc2_data = self._empty_batch(how_many)
        same_data = np.zeros(how_many)
        # Fill every row; stopping one short would leave the final pair as
        # all-zero features with a spurious "different" label.
        for i in range(how_many):
            mfcc_1, mfcc_2, same_data[i] = self.wavs_to_mfcc_pair()
            self._copy_frames(mfcc1_data, i, mfcc_1)
            self._copy_frames(mfcc2_data, i, mfcc_2)
        return mfcc1_data, mfcc2_data, same_data

    def get_mfccs(self, how_many):
        """Build a batch of ``how_many`` random recordings.

        Returns:
            ``(mfccs, mfcc_words)``: the MFCC batch and the list of words
            the rows were drawn from, in row order.
        """
        mfccs = self._empty_batch(how_many)
        mfcc_words = []
        for i in range(how_many):
            word = random.choice(self.wanted)
            self._copy_frames(mfccs, i, load_wav_mfcc(random.choice(self.wav_files[word])))
            mfcc_words.append(word)
        return mfccs, mfcc_words

    def get_mfcc_triplet(self, how_many):
        """Build ``how_many`` (anchor, positive, negative) triplets per word.

        For every wanted word, the anchor and positive are two recordings of
        that word and the negative is a recording of a different, randomly
        chosen wanted word.

        Args:
            how_many: number of triplets to draw for each word.

        Returns:
            ``(anchor_data, positive_data, negative_data, anchor_words)``:
            three MFCC batches with ``len(self.wanted) * how_many`` rows and
            the list of anchor words in row order.
        """
        how_many_words = len(self.wanted)
        n = how_many
        sample_number = how_many_words * n
        print("each words select:", n)
        print("words number:", how_many_words)
        print("total sample number:", sample_number)
        anchor_data = self._empty_batch(sample_number)
        positive_data = self._empty_batch(sample_number)
        negative_data = self._empty_batch(sample_number)

        index = 0
        anchor_words = []
        for d, word in enumerate(self.wanted):
            files = self.wav_files[word]
            for _ in range(n):
                # Draw two distinct recordings without mutating the index
                # and without running past the end of the file list when
                # how_many exceeds the number of recordings.
                if len(files) > 1:
                    anchor_file, positive_file = random.sample(files, 2)
                else:
                    # Only one recording: reuse it rather than fail.
                    anchor_file = positive_file = files[0]
                # A non-zero offset modulo the word count always lands on a
                # different word.
                inc = random.randrange(1, how_many_words)
                dn = (d + inc) % how_many_words
                negative_file = random.choice(self.wav_files[self.wanted[dn]])

                self._copy_frames(anchor_data, index, load_wav_mfcc(anchor_file))
                self._copy_frames(positive_data, index, load_wav_mfcc(positive_file))
                self._copy_frames(negative_data, index, load_wav_mfcc(negative_file))

                index = index + 1
                anchor_words.append(word)
        return anchor_data, positive_data, negative_data, anchor_words
            
# Smoke test: draw two triplets per word and show the anchor words and the
# shape of the anchor batch.
loader = WavMFCCLoader(
    speech_data_dir,
    wanted=[
        "one", "two", "bed", "backward", "bird", "cat", "dog", "eight",
        "five", "follow", "forward", "four", "go", "happy", "house", "learn",
        "left", "marvin", "nine", "no", "off", "right", "seven", "sheila",
        "stop", "three", "tree", "visual", "wow", "zero",
    ],
)
a, p, n, w = loader.get_mfcc_triplet(2)
print(w)
print(a.shape)

each words select: 2
words number: 30
total sample number: 60
['one', 'one', 'two', 'two', 'bed', 'bed', 'backward', 'backward', 'bird', 'bird', 'cat', 'cat', 'dog', 'dog', 'eight', 'eight', 'five', 'five', 'follow', 'follow', 'forward', 'forward', 'four', 'four', 'go', 'go', 'happy', 'happy', 'house', 'house', 'learn', 'learn', 'left', 'left', 'marvin', 'marvin', 'nine', 'nine', 'no', 'no', 'off', 'off', 'right', 'right', 'seven', 'seven', 'sheila', 'sheila', 'stop', 'stop', 'three', 'three', 'tree', 'tree', 'visual', 'visual', 'wow', 'wow', 'zero', 'zero']
(60, 128, 32)


In [5]:
def create_keras_model(fingerprint_shape, is_training=True):
    """Build the convolutional encoder shared by the triplet branches.

    The network is three stages of Conv2D(64, 3x3, no bias) ->
    BatchNormalization -> ReLU -> MaxPooling2D, followed by Flatten, a
    Dense layer of ``default_feature_dim`` units, BatchNormalization and a
    sigmoid activation.

    Args:
        fingerprint_shape: input shape given to the first convolution.
        is_training: when true, a Dropout(0.5) layer is appended after the
            sigmoid.

    Returns:
        The assembled ``Sequential`` model (not compiled).
    """
    encoder = Sequential()

    # Three identical convolution stages; only the first one declares the
    # input shape.
    for stage in range(3):
        conv_options = dict(filters=64, kernel_size=3, use_bias=False)
        if stage == 0:
            conv_options["input_shape"] = fingerprint_shape
        encoder.add(Conv2D(**conv_options))
        encoder.add(BatchNormalization())
        encoder.add(Activation("relu"))
        encoder.add(MaxPooling2D())

    # Embedding head.
    encoder.add(Flatten())
    encoder.add(Dense(default_feature_dim))
    encoder.add(BatchNormalization())
    encoder.add(Activation("sigmoid"))
    if is_training:
        encoder.add(Dropout(0.5))

    return encoder

In [6]:
def create_siamese_triplet_model(input_shape):
    """Wire one shared encoder to anchor, positive and negative inputs.

    Args:
        input_shape: shape of a single example, used for all three inputs
            and for the encoder.

    Returns:
        ``(siamese_triplet_model, keras_model)``: the three-input,
        three-output model and the shared encoder it applies to each input.
    """
    # Inputs are created in anchor, positive, negative order.
    triplet_inputs = [Input(input_shape) for _ in range(3)]

    # A single encoder instance is applied to each input, so its weights
    # are shared by all three branches.
    shared_encoder = create_keras_model(input_shape)
    embeddings = [shared_encoder(branch_input) for branch_input in triplet_inputs]

    triplet_model = Model(inputs=triplet_inputs, outputs=embeddings)
    return triplet_model, shared_encoder

In [7]:
def triplet_loss_1(y_true, y_pred, alpha = 0.2):
    """Margin-based triplet loss.

    Takes ``y_pred[0]``, ``y_pred[1]`` and ``y_pred[2]`` as anchor, positive
    and negative, and returns the sum of
    ``max(|a - p|^2 - |a - n|^2 + alpha, 0)``.

    NOTE(review): Keras normally calls a loss once per model output, in
    which case ``y_pred`` would be one (batch, dim) tensor and indexes 0..2
    would select batch rows rather than the three branches -- confirm how
    this is wired before relying on it.

    Args:
        y_true: unused.
        y_pred: tensor indexed as described above.
        alpha: margin added to the distance difference.
    """
    anchor = y_pred[0]
    positive = y_pred[1]
    negative = y_pred[2]

    # Squared Euclidean distances over the last axis.
    pos_dist = tf.reduce_sum(tf.square(anchor - positive), -1)
    neg_dist = tf.reduce_sum(tf.square(anchor - negative), -1)

    margin_violation = (pos_dist - neg_dist) + alpha
    return tf.reduce_sum(tf.maximum(margin_violation, 0.0))

def triplet_loss(y_true, y_pred, N = default_feature_dim, beta = default_feature_dim, epsilon = 1e-8):
    """Logarithmic triplet loss.

    Takes ``y_pred[0]``, ``y_pred[1]`` and ``y_pred[2]`` as anchor, positive
    and negative, and returns
    ``-ln(1 - d_pos / beta + epsilon) - ln(1 - (N - d_neg) / beta + epsilon)``
    where ``d_pos`` and ``d_neg`` are squared Euclidean distances over the
    last axis.  The result is not reduced.

    NOTE(review): Keras normally calls a loss once per model output, in
    which case ``y_pred`` would be one (batch, dim) tensor and indexes 0..2
    would select batch rows rather than the three branches -- confirm how
    this is wired before relying on it.

    Args:
        y_true: unused.
        y_pred: tensor indexed as described above.
        N: value the negative distance is subtracted from.
        beta: divisor applied to both distance terms.
        epsilon: small constant added inside the logarithms.
    """
    anchor = y_pred[0]
    positive = y_pred[1]
    negative = y_pred[2]

    squared_pos = tf.reduce_sum(tf.square(anchor - positive), -1)
    squared_neg = tf.reduce_sum(tf.square(anchor - negative), -1)

    # -ln(-x/N + 1), applied to the positive distance and to the
    # complement (N - distance) of the negative distance.
    pos_term = -tf.log(1 - tf.divide(squared_pos, beta) + epsilon)
    neg_term = -tf.log(1 - tf.divide(N - squared_neg, beta) + epsilon)

    return neg_term + pos_term

In [8]:
def make_feature_dataset(anchor_words, x1_train, base_model):
    """Build a word -> embedding dictionary from the training anchors.

    For each distinct word, the first example carrying that word is run
    through ``base_model.predict`` and the resulting feature vector is
    stored.

    Args:
        anchor_words: word label for each row of ``x1_train``.
        x1_train: array of examples, indexed by row; each example has the
            coefficient and frame axes first, with or without a trailing
            channel axis of size 1.
        base_model: object with a ``predict(batch)`` method that returns one
            feature row per example.

    Returns:
        dict mapping each word to the feature vector of its first example.
    """
    feature_dic = dict()
    for index, word in enumerate(anchor_words):
        if word in feature_dic:
            continue
        sample = np.asarray(x1_train[index])
        # Shape the single example as (1, coefficients, frames, 1) from the
        # example's own dimensions; the frame count is not available as a
        # module-level name here.
        batch = sample.reshape((1,) + sample.shape[:2] + (1,))
        feature_dic[word] = base_model.predict(batch)[0, :]
    return feature_dic

def siamese_triplet_train(train_sample=default_train_samples, wanted_words=default_wanted_words):
    """Train the triplet model and save the shared encoder.

    Draws ``train_sample`` triplets per wanted word, fits the three-branch
    model with ``triplet_loss``, builds a word -> embedding dictionary from
    the anchors and saves the encoder under ``default_model_path``.

    Args:
        train_sample: number of triplets drawn for each word.
        wanted_words: words to train on.

    Returns:
        dict mapping each word to the embedding of its first anchor, as
        produced by ``make_feature_dataset``.
    """
    default_mfcc_length = get_default_mfcc_length(default_wav_duration)
    input_shape = (default_number_of_mfcc, default_mfcc_length, 1)
    siamese_triplet_model, base_model = create_siamese_triplet_model(input_shape)
    siamese_triplet_model.compile(loss = triplet_loss, optimizer = 'adam')

    loader = WavMFCCLoader(speech_data_dir, wanted = wanted_words)
    anchor_data, positive_data, negative_data, anchor_words = loader.get_mfcc_triplet(train_sample)

    def with_channel_axis(batch):
        # Append the trailing channel axis expected by the Conv2D input.
        return batch.reshape((batch.shape[0],) + input_shape)

    # Each branch gets its own data; feeding the anchors to all three
    # branches would make every triplet degenerate.
    x1_train = with_channel_axis(anchor_data)
    x2_train = with_channel_axis(positive_data)
    x3_train = with_channel_axis(negative_data)

    # The loss ignores y_true, so zero targets of the output shape are
    # supplied for each of the three outputs.
    y_train_ = np.zeros((anchor_data.shape[0], default_feature_dim))
    y_train = [y_train_, y_train_, y_train_]

    siamese_triplet_model.fit(x = [x1_train, x2_train, x3_train], y = y_train, epochs = default_epochs, batch_size = default_batch_size)

    feature_dic = make_feature_dataset(anchor_words, x1_train, base_model)

    # The file name keeps its literal leading backslash so that the path
    # matches the one siamese_triplet_test loads; the escape is written
    # explicitly to avoid the invalid "\s" escape sequence.
    base_model.save(default_model_path + "\\speech_siamese_triplet_base" + str(datetime.date.today()) + ".h5")

    return feature_dic

In [None]:
# Train on two triplets per word and keep the resulting word -> embedding
# dictionary for the test cell.
print("train")
feature_dic = siamese_triplet_train(
    train_sample=2,
    wanted_words=[
        "one", "two", "cat", "dog", "bed", "backward", "eight", "five",
        "follow", "forward", "four", "go", "happy", "house", "learn", "left",
        "marvin", "nine", "no", "off", "right", "seven", "sheila", "stop",
        "three", "tree", "visual", "wow", "zero", "up",
    ],
)

train
each words select: 2
words number: 30
total sample number: 60
Epoch 1/10


In [9]:
def feature_distance(feature1, feature2):
    """Return the squared Euclidean distance between two feature arrays.

    The difference is squared and summed over the last axis.  The
    computation is done with NumPy so that no TensorFlow session is opened
    (and left unclosed) and no graph nodes are added on every call.

    Args:
        feature1: array-like of features.
        feature2: array-like of features, broadcastable against ``feature1``.

    Returns:
        NumPy scalar for 1-D inputs, or an array with the last axis reduced.
    """
    difference = np.subtract(feature1, feature2)
    return np.sum(np.square(difference), axis=-1)

def match_siamese_triplet_feature(feature_predict, word_label_predict, feature_dic):
    """Print the distance from one predicted feature to every dictionary word.

    Args:
        feature_predict: feature vector of the example being matched.
        word_label_predict: label of that example; printed only.
        feature_dic: dict mapping words to reference feature vectors.

    Returns:
        None.
    """
    print("prdict word:", word_label_predict)
    for word, reference_feature in feature_dic.items():
        print("dictionary word:", word, "distance:", feature_distance(feature_predict, reference_feature))
    return None

def siamese_triplet_test(test_sample = default_test_samples, wanted_words = default_wanted_words, feature_dic = None, test_samples = None):
    """Load today's saved encoder and print distances for random test clips.

    Args:
        test_sample: number of random recordings to evaluate.
        wanted_words: words to draw test recordings from.
        feature_dic: dict mapping words to reference feature vectors; an
            empty dictionary is used when omitted.
        test_samples: alias for ``test_sample``; takes precedence when
            given, so calls using either keyword work.

    Returns:
        None.
    """
    # The body and existing callers used the plural spelling while the
    # signature declared the singular one; accept both.
    if test_samples is not None:
        test_sample = test_samples
    # Avoid a dictionary shared between calls as the default.
    if feature_dic is None:
        feature_dic = {}

    default_mfcc_length = get_default_mfcc_length(default_wav_duration)
    # Same literal path as the one written by siamese_triplet_train; the
    # backslash is escaped explicitly to avoid the invalid "\s" escape.
    base_model = keras.models.load_model(default_model_path + "\\speech_siamese_triplet_base" + str(datetime.date.today()) + ".h5")

    loader = WavMFCCLoader(speech_data_dir, wanted = wanted_words)
    mfccs_test, words_test = loader.get_mfccs(test_sample)

    # Append the trailing channel axis expected by the encoder input.
    x_test = mfccs_test.reshape((test_sample, default_number_of_mfcc, default_mfcc_length, 1))

    features_predict = base_model.predict_on_batch(x_test)
    for feature, word in zip(features_predict, words_test):
        match_siamese_triplet_feature(feature, word, feature_dic)
    return

In [10]:
# Evaluate one random clip drawn from two words against the dictionary
# produced by the training cell.
print("test")
siamese_triplet_test(
    test_samples=1,
    wanted_words=["five", "follow"],
    feature_dic=feature_dic,
)

test


NameError: name 'feature_dic' is not defined