In [1]:
import os
from scipy.io import wavfile as wav
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from collections import defaultdict
from sklearn.mixture import GaussianMixture
import sounddevice as sd
import numpy as np
from python_speech_features import *
import random
import time
import pickle
from matplotlib import pyplot as plt



In [2]:
# Speaker directories and recording categories found under ``data_path``.
person_list = ['p1', 'p2', 'p3', 'p4', 'p5']
directory_list = ['number', 'sentence', 'word']
data_path = 'data/final_data/'

# Size of the index range sampled for each category (assumed to match the
# number of recordings in each category directory -- TODO confirm).
category_sizes = {'number': 10, 'sentence': 30, 'word': 20}

# A dedicated, seeded generator keeps the train/test split identical across
# kernel restarts and cell re-runs, and leaves the global ``random`` state alone.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

# Positions (within each directory listing) of the files used for training:
# half of every category, drawn without replacement.
trainset = {
    category: split_rng.sample(range(size), size // 2)
    for category, size in category_sizes.items()
}

# Every position that was not drawn for training is held out for testing.
testset = {}
for category, size in category_sizes.items():
    train_indices = set(trainset[category])
    testset[category] = [x for x in range(size) if x not in train_indices]

def calculate_nfft(samplerate, winlen):
    """Return the smallest power of two that is not below ``winlen * samplerate``.

    Parameters
    ----------
    samplerate : number
        Sampling rate of the signal.
    winlen : number
        Analysis window length; ``winlen * samplerate`` is treated as the
        window length in samples.

    Returns
    -------
    int
        A power of two (never less than 1) usable as the FFT size.
    """
    samples_per_window = winlen * samplerate
    exponent = 0
    # Grow the exponent until 2**exponent covers the whole window.
    while (1 << exponent) < samples_per_window:
        exponent += 1
    return 1 << exponent

def preprocess(energy, feat):
    """Drop low-energy frames and return the log of the remaining feature rows.

    The threshold is 1.5 times the mean energy of the leading frames (at most
    the first 100).  A frame is kept when its energy is greater than or equal
    to that threshold.

    Parameters
    ----------
    energy : array-like
        One energy value per frame.
    feat : array-like
        Feature matrix with one row per frame.  Values must be positive for
        the logarithm to be finite.

    Returns
    -------
    numpy.ndarray
        ``np.log`` of the rows of ``feat`` whose frame passed the threshold,
        in their original order.  Has length 0 when no frame qualifies or the
        input is empty.
    """
    energy = np.asarray(energy)
    feat = np.asarray(feat)

    # Average over the frames that are actually present: dividing by a fixed
    # 100 underestimates the reference level for clips shorter than 100 frames.
    leading = energy[:100]
    bound = 1.5 * leading.mean() if leading.size else 0.0

    keep = energy >= bound
    # Only the rows that have a matching energy value are considered.
    return np.log(feat[:energy.shape[0]][keep])

In [3]:
# Gather the training features of every speaker, keyed by speaker name.
person_feature = defaultdict(list)

# The sampling rate and FFT size are the same for every file, so they are
# computed once (assumes every recording is sampled at 44.1 kHz -- TODO confirm).
fs = 44100
nfft = calculate_nfft(fs, 0.025)

for person in person_list:
    for directory in directory_list:
        filelist = os.listdir('{}{}/{}/'.format(data_path, person, directory))
        # NOTE(review): os.listdir returns entries in arbitrary order, and
        # trainset holds positions in this listing rather than file names.
        for i in trainset[directory]:
            signal = np.load('{}{}/{}/{}'.format(data_path, person, directory, filelist[i]))
            feat, energy = fbank(signal, samplerate=fs, nfft=nfft)
            # Keep only the frames above the energy threshold (log features).
            ffeat = preprocess(energy, feat)
            person_feature[person].extend(ffeat)

In [4]:
# Fit one Gaussian mixture per speaker.  The models are appended in the
# iteration order of person_feature, so position k belongs to the k-th speaker.
person_gmms = []
start_time = time.time()

for feats in person_feature.values():
    # A fixed random_state makes the randomly initialised EM fit repeatable;
    # without it every run trains a different model.
    gmm = GaussianMixture(n_components=20, max_iter=10000, random_state=0)
    # GaussianMixture is unsupervised: fit() ignores its second argument, so
    # the speaker name is no longer passed as a pseudo-label.
    gmm.fit(feats)
    person_gmms.append(gmm)

# Elapsed training time in seconds.
print(time.time() - start_time)

6.403000116348267


In [5]:
# Evaluate the per-speaker GMMs on the held-out files and count the hits.
cnt = 0
vec_list = []
features = defaultdict(list)
# true_index is the speaker's position in person_list; this assumes
# person_gmms holds the models in that same order.
for true_index, person in enumerate(person_list):
    for directory in directory_list:
        filelist = os.listdir('{}{}/{}/'.format(data_path, person, directory))
        for i in testset[directory]:
            signal = np.load('{}{}/{}/{}'.format(data_path, person, directory, filelist[i]))
            fs = 44100
            feat, energy = fbank(signal, samplerate = fs, nfft=calculate_nfft(fs, 0.025))
            ffeat = preprocess(energy, feat)
            if len(ffeat) == 0:
                # No frame passed the energy threshold, so there is nothing to
                # score; the file counts as a miss.
                print(person, 'skipped', filelist[i])
                continue
            # GaussianMixture.score already returns the per-sample average
            # log-likelihood, so the models can be ranked on it directly.  The
            # previous pow(e, ...) form relied on a global ``e`` that no earlier
            # cell defines and did not change the ranking.
            a = [gmm.score(ffeat) for gmm in person_gmms]
            predicted = int(np.argmax(a))
            if predicted == true_index:
                cnt += 1
            print(person, predicted, filelist[i])
print(cnt)

p1 0 Ashley_1528934.mp3.npy
p1 0 Ashley_2951876.mp3.npy
p1 0 Ashley_5960178.mp3.npy
p1 0 Ashley_7429316.mp3.npy
p1 0 Ashley_9675431.mp3.npy
p1 0 Ashley_All_I_need_is_some_rest.mp3.npy
p1 0 Ashley_Are_you_done_with_the_report.mp3.npy
p1 0 Ashley_Can_you_give_me_a_chance.mp3.npy
p1 0 Ashley_Even_miracles_take_a_little_time.mp3.npy
p1 0 Ashley_I_am_just_about_to_go_to_bed.mp3.npy
p1 0 Ashley_I_ate_his_liver_with_some_fava_beans_and_a_nice_Chianti.mp3.npy
p1 0 Ashley_I_don’t_think_we_are_in_Kansas_anymore.mp3.npy
p1 0 Ashley_I_want_to_access.mp3.npy
p1 0 Ashley_That_is_why_I´m_so_tired.mp3.npy
p1 0 Ashley_What_do_you_usually_do_in_your_free_time.mp3.npy
p1 0 Ashley_What_do_you_want_to_do_today.mp3.npy
p1 0 Ashley_Which_one_do_you_want.mp3.npy
p1 0 Ashley_Why_are_you_always_putting_me_down.mp3.npy
p1 0 Ashley_Would_you_please_be_quiet.mp3.npy
p1 0 Ashley_You're_never_wrong_to_do_the_right_things.mp3.npy
p1 0 Ashley_education.mp3.npy
p1 0 Ashley_explanation.mp3.npy
p1 0 Ashley_homogeneous.mp

In [None]:
def calculate_nfft(samplerate, winlen):
    """Return the smallest power of two that is not below ``winlen * samplerate``.

    Parameters
    ----------
    samplerate : number
        Sampling rate of the signal.
    winlen : number
        Analysis window length; ``winlen * samplerate`` is treated as the
        window length in samples.

    Returns
    -------
    int
        A power of two (never less than 1) usable as the FFT size.
    """
    samples_per_window = winlen * samplerate
    exponent = 0
    # Grow the exponent until 2**exponent covers the whole window.
    while (1 << exponent) < samples_per_window:
        exponent += 1
    return 1 << exponent

def preprocess(energy, feat):
    """Drop low-energy frames and return the log of the remaining feature rows.

    The threshold is 1.5 times the mean energy of the leading frames (at most
    the first 100).  A frame is kept when its energy is greater than or equal
    to that threshold.

    Parameters
    ----------
    energy : array-like
        One energy value per frame.
    feat : array-like
        Feature matrix with one row per frame.  Values must be positive for
        the logarithm to be finite.

    Returns
    -------
    numpy.ndarray
        ``np.log`` of the rows of ``feat`` whose frame passed the threshold,
        in their original order.  Has length 0 when no frame qualifies or the
        input is empty.
    """
    energy = np.asarray(energy)
    feat = np.asarray(feat)

    # Average over the frames that are actually present: dividing by a fixed
    # 100 underestimates the reference level for clips shorter than 100 frames.
    leading = energy[:100]
    bound = 1.5 * leading.mean() if leading.size else 0.0

    keep = energy >= bound
    # Only the rows that have a matching energy value are considered.
    return np.log(feat[:energy.shape[0]][keep])

def log_text(string):
    """Print a message with the "> ML: " prefix and append it to ``log.txt``.

    Parameters
    ----------
    string : str
        Message text; it is concatenated to the prefix, so it must be a str.

    Returns
    -------
    None
    """
    line = "> ML: " + string
    print(line)
    # Append mode keeps earlier entries; the file is opened relative to the
    # current working directory.
    with open("log.txt", "a") as log_file:
        log_file.write(line + "\n")

def load_list(path_and_file):
    """Read the first line of a text file and evaluate it as a Python expression.

    NOTE(security): the line is passed to ``eval``, which executes whatever
    expression the file contains.  Only use this on files from a trusted
    source.

    Parameters
    ----------
    path_and_file : str
        Path of the file to read.

    Returns
    -------
    object
        The value of the evaluated first line.
    """
    with open(path_and_file, "r") as source:
        first_line = source.readline()
        return eval(first_line)

def save_list(path_and_file, input_list):
    """Overwrite a text file with ``str(input_list)``.

    Parameters
    ----------
    path_and_file : str
        Path of the file to write; existing content is replaced.
    input_list : object
        Value whose ``str()`` form is written, without a trailing newline.

    Returns
    -------
    None
    """
    with open(path_and_file, "w") as sink:
        serialized = str(input_list)
        sink.write(serialized)
# Read person_list back from the first line of the saved list file
# (presumably the speakers the pickled model was trained on -- confirm).
person_list = load_list('ML/model/trained_list.txt')
def mkdir(path, directory):
    """Create the directory ``path + directory`` unless it already is one.

    The two arguments are concatenated as-is, so ``path`` must already end
    with a separator.  ``os.mkdir`` is still called when the target exists
    but is not a directory, and raises in that case.

    Parameters
    ----------
    path : str
        Parent location, including its trailing separator.
    directory : str
        Name of the directory to create.

    Returns
    -------
    None
    """
    target = path + directory
    # A path that is a directory necessarily exists, so this single check
    # covers both "missing" and "present but not a directory".
    if os.path.isdir(target):
        return
    os.mkdir(target)

def listdir(path):
    """List the entries of ``path`` without the ``.ipynb_checkpoints`` entry.

    Parameters
    ----------
    path : str
        Directory to list.

    Returns
    -------
    list
        Entry names as returned by ``os.listdir``, minus
        ``'.ipynb_checkpoints'`` when it is present.
    """
    return [entry for entry in os.listdir(path) if entry != '.ipynb_checkpoints']
# NOTE(security): pickle.load can execute arbitrary code stored in the file;
# only load model files that come from a trusted source.
with open('ML/model/model', 'rb') as f:
    person_gmms = pickle.load(f)

# Predict a 1-based model index for every file in the test directory.
predict_list = []
elements = []
filelist = listdir('ML/test_data/')
for file in filelist:
    signal = np.load('ML/test_data/{}'.format(file))
    fs = 44100
    feat, energy = fbank(signal, samplerate = fs, nfft=calculate_nfft(fs, 0.025))
    ffeat = preprocess(energy, feat)
    if len(ffeat) == 0:
        # No frame passed the energy threshold, so there is nothing to score.
        continue
    # GaussianMixture.score already returns the per-sample average
    # log-likelihood, so the models are ranked on it directly.  The previous
    # pow(2.71, score / n) form is monotonic in the score and picked the same
    # model.
    out = [gmm.score(ffeat) for gmm in person_gmms]
    output = int(np.argmax(out)) + 1
    predict_list.append(output)
    if output not in elements:
        elements.append(output)
print('Test result: {}'.format(predict_list))

# Final answer: the index that received more than 60% of the per-file
# predictions, or 0 when no index reaches that share.
ret = 0
for e in elements:
    if predict_list.count(e) / len(predict_list) > 0.6:
        ret = e
print(ret)

In [None]:
# Scratch: raw listing of the test-data directory.  Unlike the listdir()
# helper, os.listdir does not drop a '.ipynb_checkpoints' entry.
lll = os.listdir('ML/test_data/')

In [None]:
# Scratch: load the first two entries of the raw listing ``lll``
# (presumably for manual inspection of the signals -- confirm).
s1 = np.load('ML/test_data/' + lll[0])
s2 = np.load('ML/test_data/' + lll[1])