In [49]:
# Suppress library warnings so they do not clutter the notebook output
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import matplotlib.pyplot as plt
import os
import librosa
import IPython.display as ipd
import math
import json
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from keras.layers import Conv2D, LeakyReLU
from keras_visualizer import visualizer 

In [50]:
# Folder name for the labelled training recordings (not referenced by the code visible in this notebook).
DATASET_PATH = "Training-Audio"
# JSON file that save_data() writes and that prepare_datasets()/load_mapping_for_predict() read back.
# NOTE(review): the backslash separator looks Windows-specific - confirm before running on another OS.
TRAIN_JSON_PATH = "Json\\data.json"
# Sample rate in Hz used to compute the expected number of MFCC frames per 120 ms segment.
SAMPLE_RATE = 22050
# JSON file that save_data_to_predict() writes the features of the test recording to.
PREDICT_JSON_PATH = "Json\\predict.json"
# Former fixed detection parameters; the functions below receive threshold/waitAfter as arguments instead.
#threshold = 80
#waitAfter = 150

In [51]:
def fftAndSum(signal):
    """Return the summed FFT magnitude of ``signal``, used as a loudness score.

    Args:
        signal: Array-like of audio samples. The caller in this notebook
            passes the 1-D chunks produced inside ``detect_touch_peaks``.

    Returns:
        The sum of the absolute values of the FFT coefficients. An empty
        input yields ``0.0`` instead of the ``ValueError`` that
        ``np.fft.fft`` raises for zero-length data.
    """
    samples = np.asarray(signal)
    if samples.size == 0:
        return 0.0

    # Transform the waveform into the frequency domain.
    spectrum = np.fft.fft(samples)

    # Reducing along axis 0 gives the same result shape as the built-in sum()
    # but runs vectorised instead of iterating element by element in Python.
    return np.sum(np.absolute(spectrum), axis=0)

In [52]:
def save_data(dataset_path, json_path, threshold, waitAfter, n_mfcc=13, n_fft=2048, hop_length=512):
    """Extract MFCC features around every detected key press and dump them to JSON.

    Every ``.wav`` file found under ``dataset_path`` becomes one class: its file
    name without the suffix is appended to ``"mapping"`` and its position in
    that list is the integer label stored for each of its segments.

    Args:
        dataset_path: Folder that is walked recursively for ``.wav`` recordings.
        json_path: Destination file; its parent folder is created when missing.
        threshold: Energy threshold forwarded to ``detect_touch_peaks``.
        waitAfter: Wait time in ms after a peak, forwarded to ``detect_touch_peaks``.
        n_mfcc: Number of MFCC coefficients per frame.
        n_fft: FFT window size passed to librosa.
        hop_length: Hop between frames, in samples.

    Returns:
        The number of classes (labelled recordings) written to ``"mapping"``;
        ``0`` when no ``.wav`` file was found.
    """
    data = {
        "mapping": [],
        "mfcc": [],
        "labels": []
    }

    # Each segment covers 120 ms (20 ms before the peak to 100 ms after it);
    # segments that do not yield exactly this many MFCC frames are discarded.
    expected_frames = math.ceil((SAMPLE_RATE * 0.12) / hop_length)

    for root, dirnames, filenames in os.walk(dataset_path):
        # Sorted so that the label order is reproducible between runs.
        for f in sorted(filenames):
            # Anything that is not a recording would either fail to load or
            # end up as a bogus class.
            if not f.endswith(".wav"):
                continue

            file_path = os.path.join(root, f)
            # Load at the rate the expected frame count is derived from.
            signal, sr = librosa.load(file_path, sr=SAMPLE_RATE)

            # The label is the index in "mapping", so it stays unique even
            # when recordings live in several sub-folders.
            semantic_label = f.removesuffix(".wav")
            label = len(data["mapping"])
            data["mapping"].append(semantic_label)

            print(semantic_label + ":")
            touch_peaks = detect_touch_peaks(signal, sr, threshold, waitAfter, False)

            for peak in touch_peaks:
                # Peak times are in ms; convert the window edges to sample indices.
                start_sample = int(sr * (peak - 20) / 1000)
                end_sample = int(sr * (peak + 100) / 1000)

                # A peak within the first 20 ms gives a negative start, which
                # slicing would interpret as an offset from the end of the signal.
                if start_sample < 0:
                    continue

                # `y` is passed by keyword because recent librosa releases
                # no longer accept the signal positionally.
                mfcc = librosa.feature.mfcc(y=signal[start_sample:end_sample],
                                            sr=sr,
                                            n_fft=n_fft,
                                            n_mfcc=n_mfcc,
                                            hop_length=hop_length)
                # One row per frame.
                mfcc = mfcc.T

                if len(mfcc) == expected_frames:
                    data["mfcc"].append(mfcc.tolist())
                    data["labels"].append(label)

    # Make sure the destination folder exists before writing.
    json_dir = os.path.dirname(json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)

    with open(json_path, "w") as fp:
        json.dump(data, fp, indent=4)

    return len(data["mapping"])

In [53]:
def save_data_to_predict(predict_dataset_file_path, predict_json_path, threshold, waitAfter, n_mfcc=13, n_fft=2048, hop_length=512):
    """Extract MFCC features for every detected key press of one recording.

    Args:
        predict_dataset_file_path: Audio file to analyse.
        predict_json_path: Destination JSON file; its parent folder is created
            when missing.
        threshold: Energy threshold forwarded to ``detect_touch_peaks``.
        waitAfter: Wait time in ms after a peak, forwarded to ``detect_touch_peaks``.
        n_mfcc: Number of MFCC coefficients per frame.
        n_fft: FFT window size passed to librosa.
        hop_length: Hop between frames, in samples.

    Returns:
        A dict ``{"mfcc": [...]}`` with one frames-by-coefficients list per
        accepted segment (the same content that is written to the JSON file).
    """
    predict_data = {
        "mfcc": [],
    }

    # Each segment covers 120 ms; segments that do not yield exactly this many
    # MFCC frames are discarded.
    expected_frames = math.ceil((SAMPLE_RATE * 0.12) / hop_length)

    # Load at the rate the expected frame count is derived from.
    signal, sr = librosa.load(predict_dataset_file_path, sr=SAMPLE_RATE)

    touch_peaks = detect_touch_peaks(signal, sr, threshold, waitAfter, False)

    for peak in touch_peaks:
        # Peak times are in ms; the window runs from 20 ms before the peak
        # to 100 ms after it, converted to sample indices.
        start_sample = int(sr * (peak - 20) / 1000)
        end_sample = int(sr * (peak + 100) / 1000)

        # A peak within the first 20 ms gives a negative start, which slicing
        # would interpret as an offset from the end of the signal.
        if start_sample < 0:
            continue

        # `y` is passed by keyword because recent librosa releases no longer
        # accept the signal positionally.
        mfcc = librosa.feature.mfcc(y=signal[start_sample:end_sample],
                                    sr=sr,
                                    n_fft=n_fft,
                                    n_mfcc=n_mfcc,
                                    hop_length=hop_length)
        # One row per frame.
        mfcc = mfcc.T

        if len(mfcc) == expected_frames:
            predict_data["mfcc"].append(mfcc.tolist())

    # Make sure the destination folder exists before writing.
    json_dir = os.path.dirname(predict_json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)

    with open(predict_json_path, "w") as fp:
        json.dump(predict_data, fp, indent=4)
    return predict_data

In [54]:
def load_data(json_dataset_path):
    """Read the training JSON and return ``(inputs, targets)`` as numpy arrays.

    ``inputs`` is built from the ``"mfcc"`` entry and ``targets`` from the
    ``"labels"`` entry of the file at ``json_dataset_path``.
    """
    with open(json_dataset_path, "r") as json_file:
        dataset = json.load(json_file)

    return np.array(dataset["mfcc"]), np.array(dataset["labels"])

In [55]:
def load_mapping_for_predict(JSON_PATH):
    """Return the ``"mapping"`` list stored in the JSON file as a numpy array."""
    with open(JSON_PATH, "r") as json_file:
        stored = json.load(json_file)
    return np.array(stored["mapping"])

In [56]:
def prepare_datasets(test_size, validation_size):
    """Load the training JSON and split it into train / validation / test sets.

    ``test_size`` is the share held out for testing; ``validation_size`` is the
    share of the remaining data held out for validation. A trailing axis of
    length one is appended to every input array so each sample looks like a
    single-channel image to the CNN.

    Returns:
        ``(inputs_train, inputs_validation, inputs_test,
        targets_train, targets_validation, targets_test)``
    """
    features, labels = load_data(TRAIN_JSON_PATH)

    # First carve off the test set ...
    x_rest, x_test, y_rest, y_test = train_test_split(features, labels, test_size=test_size)

    # ... then split what is left into training and validation data.
    x_train, x_val, y_train, y_val = train_test_split(x_rest, y_rest, test_size=validation_size)

    # Append the channel axis to all three input arrays.
    x_train, x_val, x_test = (subset[..., np.newaxis] for subset in (x_train, x_val, x_test))

    return x_train, x_val, x_test, y_train, y_val, y_test

In [57]:
def build_model(outputs):
    """Build, train and evaluate the CNN classifier and return the trained model.

    Args:
        outputs: Number of units in the softmax output layer (one per class).

    The data comes from ``prepare_datasets(0.25, 0.2)``. The accuracy reached
    on the test split is appended to the "-OUTPUT-" element of the global
    ``window``.
    """
    (x_train, x_val, x_test,
     y_train, y_val, y_test) = prepare_datasets(0.25, 0.2)

    # Shape of a single sample, i.e. every axis after the batch axis.
    sample_shape = tuple(x_train.shape[axis] for axis in (1, 2, 3))

    model = keras.Sequential([
        # Convolution block: 32 kernels on a 3x3 grid.
        keras.layers.Conv2D(32, (3,3), activation='relu', input_shape=sample_shape),
        keras.layers.MaxPool2D((3,3), strides=(1,1), padding='same'),
        keras.layers.BatchNormalization(),
        # Flatten and regularise before classification.
        keras.layers.Flatten(),
        keras.layers.Dropout(0.3),
        # One probability per class.
        keras.layers.Dense(outputs, activation='softmax'),
    ])

    model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001),
                  loss="sparse_categorical_crossentropy",
                  metrics=["accuracy"])

    model.summary()

    model.fit(x_train, y_train,
              validation_data=(x_val, y_val),
              batch_size=8,
              epochs=50)

    # Report how well the model does on data it has never seen.
    test_error, test_accuracy = model.evaluate(x_test, y_test, verbose=1)
    # NOTE(review): this updates the GUI; the caller starts training through
    # perform_long_operation, so this presumably runs off the main thread - confirm that is safe.
    window.Element("-OUTPUT-").update("\n\nAccuracy on test set is: {} \n".format(test_accuracy), append=True)
    return model

In [58]:
def detect_touch_peaks(signal, sr, threshold, waitAfter, drawGraph):
    """Find the times (in ms) at which key presses start in ``signal``.

    The signal is cut into roughly 10 ms chunks and each chunk is scored with
    ``fftAndSum``. A chunk whose rounded score exceeds ``threshold`` is
    recorded as a key press; further chunks are then ignored until more than
    ``waitAfter`` ms have passed, so the key's release is not counted.

    Args:
        signal: 1-D array of audio samples.
        sr: Sample rate of ``signal`` in Hz.
        threshold: Minimum chunk score that counts as a peak.
        waitAfter: Time in ms to wait after a detected peak.
        drawGraph: When truthy, the scores and detections are shown with
            ``prepareAndOpenGraphWindow``.

    Returns:
        List of detection times in ms (multiples of 10). A signal shorter than
        one 10 ms chunk yields an empty list.
    """
    # np.array_split truncates a float section count on its own; truncating
    # here makes that explicit and lets us reject a count of zero, which
    # np.array_split would answer with a ValueError.
    howManyParts = int(len(signal) / sr * 100)
    if howManyParts < 1:
        return []

    tenMsParts = np.array_split(signal, howManyParts)

    currentTime = 0
    energyArray = []
    detectedClicks = []
    detectedClicksVisualize = []  # chunk indices of the detections, used as plot markers
    ArrayOfMss = []
    passedFirstPeak = False
    detectedKeyPressTime = 0

    for oneSegment in tenMsParts:

        ArrayOfMss.append(currentTime)
        sum_of_fft = round(fftAndSum(oneSegment))
        energyArray.append(sum_of_fft)

        if sum_of_fft > threshold and not passedFirstPeak:
            # passedFirstPeak suppresses further detections (e.g. the key's
            # release) until the wait time has elapsed.
            passedFirstPeak = True
            detectedKeyPressTime = currentTime

            detectedClicks.append(currentTime)
            detectedClicksVisualize.append(int(currentTime/10))

        elif currentTime - detectedKeyPressTime > waitAfter:
            # Wait time is over; the next loud chunk counts again.
            passedFirstPeak = False

        currentTime += 10

    if drawGraph:
        prepareAndOpenGraphWindow(ArrayOfMss, energyArray, detectedClicksVisualize)

    return detectedClicks

In [59]:
import PySimpleGUI as sg
import os
import traceback
import sys
import sounddevice as sd
from scipy.io.wavfile import write
import wavio as wv
import time

In [60]:
def prepareAndOpenGraphWindow(ArrayOfMss, energyArray, detectedClicksVisualize):
    """Show the per-chunk energy curve with markers at the detected clicks.

    Opens a separate window and blocks until the user closes it.

    Args:
        ArrayOfMss: Chunk start times in ms (x axis).
        energyArray: Energy score of each chunk (y axis).
        detectedClicksVisualize: Indices into the two lists above at which a
            marker is drawn.
    """
    window = sg.Window("Graph",
    [[sg.Text('Audio peaks and touch peaks')],
    [sg.Canvas(key='-CANVAS-')]],
    finalize=True, element_justification='center', font='Monospace 18')

    figure = plt.figure(figsize=(20, 8))
    try:
        plt.plot(ArrayOfMss, energyArray, '-bo', markevery = detectedClicksVisualize) # markers at detected clicks
        plt.xlabel('Milliseconds')
        # max() raises on an empty sequence, so only place ticks when there is data.
        if len(ArrayOfMss) > 0:
            plt.xticks(np.arange(0, max(ArrayOfMss),1000))
        plt.ylabel('Strength')
        plt.title('Detecting keypresses')

        draw_figure(window['-CANVAS-'].TKCanvas, figure)

        while True:
            event, values = window.read()
            if event == "Exit" or event == sg.WIN_CLOSED:
                break
    finally:
        window.close()
        # pyplot keeps every figure alive until it is closed explicitly;
        # without this each recording would leave another figure in memory.
        plt.close(figure)

In [61]:
def cleanup_models():
    """Delete every ``.pb`` file in the current working directory.

    The number of deleted files is appended to the "-OUTPUT-" element of the
    global ``window``.
    """
    deleted = 0
    for entry in os.listdir(os.getcwd()):
        if not entry.endswith(".pb"):
            continue
        os.remove(entry)
        deleted += 1
    window.Element("-OUTPUT-").update(str(deleted) + " files deleted \n", append=True)

In [62]:
def printTrainingFiles():
    """List the ``.wav`` files of the selected training folder.

    The folder is read from ``values["-TRAINING_DIRECTORY-"]``; each name is
    printed and appended to the "-OUTPUT-" element, refreshing the window
    after every entry.
    """
    folder = values["-TRAINING_DIRECTORY-"]
    wav_entries = (entry for entry in os.listdir(folder) if entry.endswith(".wav"))
    for entry in wav_entries:
        shown_name = os.fsdecode(entry)
        print(shown_name)
        window.Element("-OUTPUT-").update(shown_name + "\n", append=True)
        window.refresh()

In [63]:
def modelExists():
    """Return True when the current working directory contains a ``.pb`` file."""
    return any(entry.endswith(".pb") for entry in os.listdir(os.getcwd()))

In [64]:
def prepareAndTrainModel():
    """Extract features, train the CNN and save it to the working directory.

    Folder, threshold and wait time are read from the global ``values``;
    progress messages are appended to the "-OUTPUT-" element.
    """
    def report(message):
        window.Element("-OUTPUT-").update(message, append=True)

    report("Saving data to JSON... \n")
    class_count = save_data(
        values["-TRAINING_DIRECTORY-"],
        TRAIN_JSON_PATH,
        values["-TRAINING_THRESHOLD-"],
        values["-TRAINING_WAIT_AFTER-"],
    )

    report("Training model... \n")
    print(class_count)
    trained_model = build_model(class_count)

    report("Saving model... \n")
    trained_model.save(os.getcwd())

In [65]:
def prepareAndTestModel():
    """Predict the key for every key press detected in the selected test file.

    File, threshold and wait time are read from the global ``values``. The
    model is loaded from the current working directory and the predicted
    labels are appended to the "-OUTPUT-" element.
    """
    output = window.Element("-OUTPUT-")
    output.update("\n", append=True)

    extracted = save_data_to_predict(values["-TESTING_DIRECTORY-"], PREDICT_JSON_PATH, values["-TESTING_THRESHOLD-"], values["-TESTING_WAIT_AFTER-"])
    data = np.array(extracted["mfcc"])

    # Without any usable segment there is nothing to feed to the model, and an
    # empty array does not have the shape the network expects.
    if data.size == 0:
        output.update("No key presses detected in the test file. Try a lower threshold. \n", append=True)
        return

    # Add the channel axis explicitly, the same way prepare_datasets() does for
    # the training data, instead of relying on Keras to expand it implicitly.
    data = data[..., np.newaxis]

    mapping = load_mapping_for_predict(TRAIN_JSON_PATH)
    model = keras.models.load_model(os.getcwd())
    predictions = model.predict(data)
    # Index of the most probable class for every segment.
    predicted_index = np.argmax(predictions, axis=1)
    output.update("predicted mapping: " + str(mapping[predicted_index]) + "\n", append=True)

In [66]:
def trainingDirectoryIsGood():
    """Return True when the selected training folder exists and holds a ``.wav`` file.

    The folder is read from ``values["-TRAINING_DIRECTORY-"]``.
    """
    folder = values["-TRAINING_DIRECTORY-"]
    if not os.path.isdir(folder):
        return False
    return any(entry.endswith(".wav") for entry in os.listdir(folder))

In [67]:
def testingDirectoryIsGood():
    """Return True when the selected test path is an existing ``.wav`` file.

    The path is read from ``values["-TESTING_DIRECTORY-"]``.
    """
    candidate = values["-TESTING_DIRECTORY-"]
    return os.path.isfile(candidate) and candidate.endswith(".wav")

In [68]:
def prepareAndStartRecording():
    """Record stereo audio and store it as ``Recorder-Audio/<name>.wav``.

    Length (seconds) and file name are read from the global ``values``.
    Progress messages are appended to the "-OUTPUT-" element.

    Returns:
        The file name (without folder or suffix) the recording was saved under.
    """
    freq = 44100
    duration = values["-RECORDING_LENGTH-"]
    filename = values["-RECORDING_FILE_NAME-"]

    # The target folder is not created anywhere else, so the very first
    # recording would otherwise fail when the file is written.
    os.makedirs("Recorder-Audio", exist_ok=True)

    window.Element("-OUTPUT-").update("Recording... \n", append=True)
    window.refresh()
    recording = sd.rec(int(duration * freq), samplerate=freq, channels=2)
    # Block until the requested number of samples has been captured.
    sd.wait()
    wv.write("Recorder-Audio/" + filename + ".wav", recording, freq, sampwidth=2)
    window.Element("-OUTPUT-").update("\nRecording finished! \n", append=True)
    return filename

In [69]:
def cleanup_audio():
    """Delete every file inside the ``Recorder-Audio`` folder of the working directory.

    The number of deleted files is appended to the "-OUTPUT-" element of the
    global ``window``; a missing folder simply reports zero deletions.
    """
    numOfFiles = 0
    directory = os.path.join(os.getcwd(), "Recorder-Audio")
    # Check that the folder exists (the length of the path string says nothing
    # about that); os.listdir would raise for a missing folder.
    if os.path.isdir(directory):
        for File in os.listdir(directory):
            path = os.path.join(directory, File)
            # os.remove cannot delete directories, so leave them alone.
            if os.path.isfile(path):
                os.remove(path)
                numOfFiles += 1
    window.Element("-OUTPUT-").update(str(numOfFiles) + " audio files deleted... \n", append=True)

In [70]:
def draw_figure(canvas, figure):
    """Embed ``figure`` in the Tk ``canvas`` and return the created figure canvas."""
    embedded = FigureCanvasTkAgg(figure, canvas)
    embedded.draw()
    tk_widget = embedded.get_tk_widget()
    tk_widget.pack(side='top', fill='both', expand=1)
    return embedded

In [71]:
def _heading_row(title):
    """Row holding the large title of a tool panel."""
    return [sg.Text(title, font = ('any 14'))]


def _slider_row(label, limits, key):
    """Row holding a label followed by a horizontal slider."""
    return [
        sg.Text(label),
        sg.Slider(limits, key=key, orientation='h', enable_events=True, disable_number_display=False),
    ]


def _threshold_row(key):
    """Slider row for the peak-detection threshold."""
    return _slider_row("Threshold: ", (1,300), key)


def _wait_after_row(key):
    """Slider row for the wait time after a detected peak."""
    return _slider_row("Wait after each peak (ms): ", (1,200), key)


# Menu bar: tool selection and housekeeping actions.
menu_definition = [
    ['Tools', ['Trainer', 'Tester','Audio recorder']],
    ['More', ['About', 'Clear output', 'Delete models', 'Delete audio', 'Delete all']],
]

menu = [sg.Menu(menu_definition, tearoff=False, pad=(200, 1), key="-MENU-")]

# Trainer panel (the one visible at start-up).
middle_train = [
    _heading_row("TRAINER"),
    [sg.Canvas(key='figCanvas')],
    [
        sg.Text("Training data folder:"),
        sg.In(size=(25, 1), enable_events=True, key="-TRAINING_DIRECTORY-",disabled=True),
        sg.FolderBrowse("Open folder with files"),
    ],
    _threshold_row('-TRAINING_THRESHOLD-'),
    _wait_after_row('-TRAINING_WAIT_AFTER-'),
    [sg.Button("TRAIN", enable_events=True, key="-START_TRAINING-")],
]

# Tester panel.
middle_test = [
    _heading_row("TESTER"),
    [
        sg.Text("Test file:"),
        sg.In(size=(25, 1), enable_events=True, key="-TESTING_DIRECTORY-",disabled=True),
        sg.FileBrowse("Open file"),
    ],
    _threshold_row('-TESTING_THRESHOLD-'),
    _wait_after_row('-TESTING_WAIT_AFTER-'),
    [sg.Button("TEST", enable_events=True, key="-START_TESTING-")],
]

# Recorder panel.
middle_record = [
    _heading_row("RECORDER"),
    [
        sg.Text("Audio file name: "),
        sg.In(size = (25, 1), enable_events=True, key="-RECORDING_FILE_NAME-"),
    ],
    _slider_row("Recording length: ", (3,30), '-RECORDING_LENGTH-'),
    _threshold_row('-RECORDING_THRESHOLD-'),
    _wait_after_row('-RECORDING_WAIT_AFTER-'),
    [sg.Button("Record", enable_events=True, key="-START_RECORDING-")],
]

# Shared output pane below the panels.
bottom = [
    [sg.Text("Output:")],
    [sg.Multiline(size = (80, 20), enable_events=True, key="-OUTPUT-")],
]

# Only one of the three panels is visible at a time; the menu switches between them.
layout = [
    menu,
    [
        sg.Column(middle_train, key='-layout_train-'),
        sg.Column(middle_test, visible=False, key='-layout_test-'),
        sg.Column(middle_record,visible=False, key='-layout_record-'),
    ],
    bottom,
]

In [None]:
window = sg.Window("KAT", layout)


def _log(message):
    """Append ``message`` to the output pane of the main window."""
    window.Element("-OUTPUT-").update(message, append=True)


def _set_visibility(*states):
    """Apply ``(layout_key, visible)`` pairs to the main window, in the given order."""
    for layout_key, visible in states:
        window[layout_key].update(visible=visible)


# Main event loop: runs until the window is closed or an error occurs.
while True:
    try:
        event, values = window.read()

        if event == sg.WIN_CLOSED:
            break

        # --- file / folder selection -------------------------------------
        if event == "-TRAINING_DIRECTORY-":
            if trainingDirectoryIsGood():
                printTrainingFiles()
                _log("Training directory is fine! You can proceed to train the model... \n")
            else:
                _log("No .wav files detected in directory...\n")

        elif event == "-TESTING_DIRECTORY-":
            if testingDirectoryIsGood():
                _log("Testing file is good! \n")
            else:
                _log("Not a .wav file. Make sure to find a .wav file! \n")

        # --- training -----------------------------------------------------
        elif event == "-START_TRAINING-":
            if trainingDirectoryIsGood():
                # Runs in the background; "-MODEL_TRAINED-" is raised when done.
                window.perform_long_operation(prepareAndTrainModel, "-MODEL_TRAINED-")
            else:
                _log("Error. Training Directory does not contain any .wav files... \n")

        elif event == "-MODEL_TRAINED-":
            _log("Model succesfully trained and saved. You can now test it! (Check the menu up top) \n")

        # --- testing ------------------------------------------------------
        elif event == "-START_TESTING-":
            if not testingDirectoryIsGood():
                _log("Bad directory. Make sure you selected a .wav file! \n")
            elif not modelExists():
                _log("No model currently exists. Please load or train a new model \n")
            else:
                _log("Testing model... \n")
                window.perform_long_operation(prepareAndTestModel, "-MODEL_TESTED-")

        elif event == "-MODEL_TESTED-":
            _log("End of testing. \n")

        # --- recording ----------------------------------------------------
        elif event == "-START_RECORDING-":
            requested_name = values["-RECORDING_FILE_NAME-"]
            if requested_name == "":
                _log("Please enter a name for your recording... \n")
            elif len(str(requested_name)) < 10:
                window.perform_long_operation(prepareAndStartRecording, "-RECORDING_FINISHED-")
            else:
                _log("Name too long... \n")

        elif event == "-RECORDING_FINISHED-":
            # The value stored under the event key is the name returned by the
            # recording function; show the detected peaks of that file.
            signal, sr = librosa.load("Recorder-Audio/" + str(values["-RECORDING_FINISHED-"]) + ".wav")
            touch_peaks = detect_touch_peaks(signal, sr, values["-RECORDING_THRESHOLD-"], values["-RECORDING_WAIT_AFTER-"], True)

        # --- menu: tool selection -----------------------------------------
        elif event == "Trainer":
            _set_visibility(('-layout_test-', False),
                            ('-layout_train-', True),
                            ('-layout_record-', False))

        elif event == "Tester":
            _set_visibility(('-layout_train-', False),
                            ('-layout_test-', True),
                            ('-layout_record-', False))

        elif event == "Audio recorder":
            _set_visibility(('-layout_record-', True),
                            ('-layout_test-', False),
                            ('-layout_train-', False))

        # --- menu: housekeeping -------------------------------------------
        elif event == 'About':
            sg.Popup(
                "Welcome to KAT. This is a simple UI for KAT - Keyboard Accoustic Translator",
                "This ui is simple to use:",
                "1. Record some audio with the recorder tool (make sure you see bubbles on spikes)",
                "2. Train the data via trainer tool(change sliders accordingly)",
                "3. Record a test file",
                "4. Test it on the tester tool"
            )

        elif event == 'Clear output':
            window.Element("-OUTPUT-").update("")

        elif event == "Delete models":
            cleanup_models()

        elif event == "Delete audio":
            cleanup_audio()

        elif event == "Delete all":
            window.Element("-OUTPUT-").update("")
            cleanup_models()
            cleanup_audio()

    except Exception as e:
        # Any error ends the session after printing the details.
        print(e)
        print(traceback.format_exc())
        break

window.close()

a.wav
space.wav
a:
space:
2
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 4, 11, 32)         320       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 4, 11, 32)        0         
 )                                                               
                                                                 
 batch_normalization (BatchN  (None, 4, 11, 32)        128       
 ormalization)                                                   
                                                                 
 flatten (Flatten)           (None, 1408)              0         
                                                                 
 dropout (Dropout)           (None, 1408)              0         
                                                                 
 dense (Dense)              