Required libraries

In [43]:
# display and threading libraries
from IPython.display import display, clear_output
import ipywidgets as widgets
from threading import Thread

# audio libraries
# import pyaudio # I hate this library
import sounddevice as sd
import torch
import torchaudio
from scipy.io.wavfile import write as writeAudio
from speechbrain.inference.classifiers import EncoderClassifier
from speechbrain.inference.enhancement import SpectralMaskEnhancement

# supporting libraries
import time
from queue import Queue
import json
import numpy as np
import scipy as sc
import random
import os

Initialized parameters

In [44]:
# core parameters in env file
# Load the notebook-wide settings dict from env.json. The `with` block closes
# the file even when json.load raises on malformed content (the manual
# open()/close() pair would have leaked the handle in that case).
with open("env.json", "r") as env_jsonFile:
    env = json.load(env_jsonFile)

# Echo the loaded settings so the active configuration is visible in the cell output.
print(env)

# ui parameters
# The two record buttons share the same look and differ only in their caption.
_record_button_look = {
    "disabled": False,
    "button_style": 'success',
    "tooltip": 'Record',
    "icon": 'microphone',
}

# Starts a recording that is then checked against the user's stored recordings.
recordRecog_button = widgets.Button(description='Record recognization', **_record_button_look)

# Starts a recording that is then appended to the user's dataset.
recordAdd_button = widgets.Button(description='Record add dataset', **_record_button_look)

# Stop control, styled as a warning button.
stop_button = widgets.Button(
    description='Stop',
    icon='stop',
    tooltip='Stop',
    button_style='warning',
    disabled=False,
)

# Enhancement model; process_audio calls its enhance_file() on the temporary WAV.
# NOTE(review): from_hparams presumably fetches/caches the pretrained weights
# named by `source` on first use — confirm before running offline.
audio_enhancer = SpectralMaskEnhancement.from_hparams(
    source = "speechbrain/metricgan-plus-voicebank"
)

# Encoder; recog_audio calls its encode_batch() to embed recordings for comparison.
audio_classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb"
)

{'CHANNELS': 1, 'FRAME_RATE': 44100, 'RECORD_SECONDS': 5, 'DATASET': 'voice_dataset.json', 'AUDIO_DIRECTORY': 'voice_dataset/', 'EMBED_THRESHOLD': 0.28, 'USER_THRESHOLD': 0.5, 'TEMP_AUDIOCHECK': False, 'TEMP_DIRECTORY': 'voiceClass_temp/', 'USERNAME': 'ID1'}


Supporting functions

In [45]:
def generate_randStr(length=15):
    """Build a name of the form ``audio_<random chars>_<timestamp>``.

    Args:
        length: number of random characters to draw (default 15).

    Returns:
        The generated string, or None when ``length`` is zero or negative.
        The random part uses only 'A'-'Z', 'a'-'z' and '0'-'9'; the trailing
        number is ``int(time.time() * 10000)``.
    """
    if length <= 0:
        return None

    # Index 0-25 -> 'A'-'Z', 26-51 -> 'a'-'z', 52-61 -> '0'-'9'.
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    token = "".join(alphabet[random.randint(0, 61)] for _ in range(length))

    stamp = int(time.time() * 10000)
    return "audio_" + token + "_" + str(stamp)

Core functions

In [46]:
def process_audio(recording):
    """Write a recording to a temporary WAV file and enhance that file in place.

    Args:
        recording: array-like of samples; it is flattened to a 1-D vector
            before being written at env["FRAME_RATE"].

    Returns:
        The base name of the temporary WAV file (not its full path). The file
        itself lives in env["TEMP_DIRECTORY"].
    """
    # convert the recording into a 1d vector
    samples = np.array(recording).flatten()

    # makedirs(exist_ok=True) also creates missing parent directories and does
    # not fail if the directory appears between a check and the creation.
    temp_dir = env["TEMP_DIRECTORY"]
    os.makedirs(temp_dir, exist_ok=True)

    # save to a temporary file; os.path.join keeps the path valid whether or
    # not TEMP_DIRECTORY ends with a separator
    temp_file = generate_randStr() + ".wav"
    temp_path = os.path.join(temp_dir, temp_file)
    writeAudio(temp_path, env["FRAME_RATE"], samples)

    # enhance audio with the Spectral Mask model, overwriting the same file
    audio_enhancer.enhance_file(filename=temp_path, output_filename=temp_path)

    return temp_file

def add_audio(audio_data, username):
    """Store a recording as a new WAV file for ``username`` and register it in the dataset.

    The dataset is the JSON file at env["DATASET"], mapping each username to
    ``{"num_recordings": int, "recordings": [file names]}``. The WAV file is
    written to ``<env["AUDIO_DIRECTORY"]>/<username>/<username>_<index>.wav``
    at env["FRAME_RATE"], where ``index`` is the user's current
    ``num_recordings``.

    Args:
        audio_data: array-like of samples; flattened to 1-D before writing.
        username: dataset key and sub-directory name for this user.

    Returns:
        None. The dataset file and the audio directory are updated on disk.
    """
    dataset_path = env["DATASET"]

    # open dataset; on first use the file does not exist yet, so start empty
    if os.path.exists(dataset_path):
        with open(dataset_path, "r") as datasetFile:
            dataset = json.load(datasetFile)
    else:
        dataset = {}

    # make sure the user has an entry in the dataset
    user_entry = dataset.setdefault(username, {"num_recordings": 0, "recordings": []})

    # get credentials
    user_recdir = os.path.join(env["AUDIO_DIRECTORY"], username)
    audio_name = username + "_" + str(user_entry["num_recordings"]) + ".wav"

    # makedirs also creates AUDIO_DIRECTORY itself when it is missing, which a
    # plain mkdir of the user sub-directory could not do
    os.makedirs(user_recdir, exist_ok=True)
    writeAudio(os.path.join(user_recdir, audio_name), env["FRAME_RATE"],
               np.array(audio_data).flatten())

    # record the new file only after it has been written successfully
    user_entry["num_recordings"] += 1
    user_entry["recordings"].append(audio_name)

    # update dataset file
    with open(dataset_path, "w") as datasetFile:
        json.dump(dataset, datasetFile, indent=4)


def recog_audio(audio_data, username):
    """Compare a fresh recording with a user's stored recordings and display the verdict.

    The recording is embedded with ``audio_classifier`` and compared to the
    embedding of every file listed for ``username`` in the dataset. A stored
    file "matches" when its cosine similarity to the new recording exceeds
    env["EMBED_THRESHOLD"]; the user is reported as recognised when the number
    of matches reaches env["USER_THRESHOLD"] * num_recordings.

    A user who is missing from the dataset, or who has no recordings, is
    reported as not recognised (previously this raised KeyError, or reported
    True because ``0 >= threshold * 0``).

    Args:
        audio_data: samples of the new recording, written as-is to a temporary
            WAV file at env["FRAME_RATE"].
        username: dataset key of the user to verify against.

    Returns:
        None. Each similarity value and the final verdict are shown via display().

    Raises:
        FileNotFoundError: if the dataset file at env["DATASET"] does not exist.
    """
    # read the dataset first so nothing is recorded to disk or embedded when
    # there is nothing to compare against
    with open(env["DATASET"], "r") as datasetFile:
        data_file = json.load(datasetFile)

    user_entry = data_file.get(username)
    if not user_entry or not user_entry["recordings"]:
        display("Recognize user " + username + ": False")
        return

    # save to a temporary file and read it back as a tensor
    temp_dir = env["TEMP_DIRECTORY"]
    os.makedirs(temp_dir, exist_ok=True)
    temp_path = os.path.join(temp_dir, generate_randStr() + ".wav")
    writeAudio(temp_path, env["FRAME_RATE"], audio_data)

    # NOTE(review): the sample rate returned by torchaudio.load is discarded;
    # confirm the encoder accepts audio sampled at env["FRAME_RATE"].
    audio_data, _ = torchaudio.load(temp_path)

    # embed the recording
    embed_this = audio_classifier.encode_batch(audio_data).flatten()

    # count the stored recordings whose embedding is close enough (threshold, cosine)
    user_dir = os.path.join(env["AUDIO_DIRECTORY"], username)
    counter = 0
    for file_name in user_entry["recordings"]:
        signal, _ = torchaudio.load(os.path.join(user_dir, file_name))
        embed_signal = audio_classifier.encode_batch(signal).flatten()

        # 1 - cosine distance, i.e. the cosine similarity of the two embeddings
        prob = 1 - sc.spatial.distance.cosine(embed_signal, embed_this)
        display(str(prob))
        if prob > env["EMBED_THRESHOLD"]:
            counter += 1

    if counter >= env["USER_THRESHOLD"] * user_entry["num_recordings"]:
        display("Recognize user " + username + ": True")
    else:
        display("Recognize user " + username + ": False")

UI functions

In [47]:
# Shared output widget: the button callbacks run inside `with output:`.
output = widgets.Output()

def record_microphone():
    """Record from the microphone for env["RECORD_SECONDS"] seconds.

    Resets env["TEMP_AUDIOCHECK"] to False before recording, then captures
    env["CHANNELS"] channel(s) at env["FRAME_RATE"] and waits until the
    recording has finished.

    Returns:
        The array produced by ``sd.rec`` for the completed recording.
    """
    # reset the guard flag that the callers check after recording
    env["TEMP_AUDIOCHECK"] = False

    display("Recording...")

    # record audio
    sample_rate = env["FRAME_RATE"]
    frame_count = int(env["RECORD_SECONDS"] * sample_rate)
    captured = sd.rec(frame_count, samplerate=sample_rate, channels=env["CHANNELS"])
    sd.wait()  # hold here until the recording is done

    display("Recording completed.")
    return captured

def recogAudio_main(data):
    """Button callback: record from the microphone, then run recognition on it.

    Args:
        data: the argument supplied by the button click; not used.
    """
    with output:
        captured = record_microphone()

        # to prevent threads colliding: skip if the flag was already claimed
        if env["TEMP_AUDIOCHECK"]:
            return
        env["TEMP_AUDIOCHECK"] = True

        recog_audio(audio_data=captured, username=env["USERNAME"])

def addAudio_main(data):
    """Button callback: record from the microphone, then add the clip to the dataset.

    Args:
        data: the argument supplied by the button click; not used.
    """
    with output:
        captured = record_microphone()

        # skip if the flag was already claimed
        if env["TEMP_AUDIOCHECK"]:
            return
        env["TEMP_AUDIOCHECK"] = True

        # the raw recording is stored; the process_audio enhancement step is disabled
        #temp_file = process_audio(temp_rec)
        add_audio(audio_data=captured, username=env["USERNAME"])

def stop_recording(data):
    """Button callback: stop the current recording and report the interruption.

    Args:
        data: the argument supplied by the button click; not used.
    """
    with output:
        sd.stop() # stop recording
        sd.wait() # then wait on the device before reporting
        display("Recording interrupted.")
        #main_job()

Execution: User input

In [48]:
# Clear what a previous run left in this cell (wait=True defers the clearing
# until new output is available).
clear_output(wait=True)

# Wire each record button to its callback.
recordRecog_button.on_click(recogAudio_main)
recordAdd_button.on_click(addAudio_main)
#stop_button.on_click(stop_recording) # currently this doesn't work

# Ask who is using the notebook; this overrides the USERNAME loaded from env.json.
env["USERNAME"] = input("Username: ")

# Show the two record buttons and the shared output area (the stop button is not shown).
display(recordRecog_button, recordAdd_button, output)


Button(button_style='success', description='Record recognization', icon='microphone', style=ButtonStyle(), too…

Button(button_style='success', description='Record add dataset', icon='microphone', style=ButtonStyle(), toolt…

Output()