In [0]:
import os
import glob
import numpy as np
import pandas as pd
import soundfile as sf
import matplotlib.pyplot as plt
from pathlib import Path
from scipy.signal import resample, hilbert, find_peaks
from skimage import util
from tqdm.notebook import tqdm
from collections import Counter

In [0]:
# Samples per second. Every duration below is multiplied by this value to
# obtain a sample count.
sampling_rate = 44100
# PCM sample width in bits; raw values are divided by 2**(frame_bits - 1).
frame_bits = 16
# Seconds removed from the beginning / end of every recording.
left_trim_sec = 0.1
right_trim_sec = 0.1
# Minimum envelope height (on the normalised signal) for a peak to count.
peak_thresh = 0.5
# Minimum spacing between two detected peaks, in seconds.
peak_sep_sec = 0.2
# Seconds of signal kept before / after each detected peak.
left_segment_sec = 0.005
right_segment_sec = 0.02
# FFT window length in seconds (rounded up to a power of two in samples).
window_sec = 0.02
# Step between consecutive FFT windows, in seconds.
window_stride = 0.001
# Folder searched for '<train_folder>/*/*.wave.csv' recordings.
train_folder = 'all_data'

def get_signal_from_file(filepath, ref_samp_rate, frame_bits):
    """Read a header-less CSV and return its second column scaled by PCM full scale.

    Parameters
    ----------
    filepath : path or file-like
        Anything `pandas.read_csv` accepts; the file must have no header row.
    ref_samp_rate : number
        Accepted for interface compatibility; not used by this function.
    frame_bits : int
        Sample width in bits. Values are divided by ``2**(frame_bits - 1)``.

    Returns
    -------
    numpy.ndarray
        The scaled samples.

    Raises
    ------
    AssertionError
        If any scaled sample lies outside [-1, 1].
    """
    table = pd.read_csv(filepath, header=None)
    full_scale = 2 ** (frame_bits - 1)
    # Column 1 holds the sample values; column 0 is ignored.
    scaled = table.values[:, 1] / full_scale
    within_range = (-1 <= scaled) & (scaled <= 1)
    assert np.all(within_range), 'Invalid PCM data'
    return scaled

def trim_signal(signal, left_time, right_time, sampling_rate):
    """Drop `left_time` seconds from the start and `right_time` seconds from the end.

    Parameters
    ----------
    signal : sequence supporting ``len`` and slicing (e.g. numpy.ndarray)
    left_time, right_time : float
        Durations to remove, in seconds. Each is converted to a sample count
        with ``int(round(seconds * sampling_rate))``.
    sampling_rate : number
        Samples per second.

    Returns
    -------
    The trimmed slice of `signal`; empty when the two trims together cover
    the whole signal.
    """
    n_left = int(round(left_time * sampling_rate))
    n_right = int(round(right_time * sampling_rate))
    # Use an explicit stop index instead of a negative one: a right trim of
    # zero samples would otherwise become ``signal[start:-0]``, i.e.
    # ``signal[start:0]``, which is empty rather than "keep everything".
    stop = max(len(signal) - n_right, 0)
    return signal[n_left:stop]

def get_peaks(signal, num_peaks, peak_thresh, peak_sep_sec, sampling_rate):
    """Locate peaks of the signal's Hilbert envelope and check their count.

    The envelope is the magnitude of the analytic signal. Peaks must reach
    at least `peak_thresh` and be at least ``peak_sep_sec * sampling_rate``
    samples apart.

    Returns
    -------
    numpy.ndarray
        Sample indices of the detected peaks.

    Raises
    ------
    AssertionError
        If the number of detected peaks differs from `num_peaks`.
    """
    magnitude = np.abs(hilbert(signal))
    min_gap = peak_sep_sec * sampling_rate
    peaks = find_peaks(magnitude, height=peak_thresh, distance=min_gap)[0]
    found = len(peaks)
    assert found == num_peaks, "Found {} instead of {} peaks".format(found, num_peaks)
    return peaks

def get_segments_from_peaks(signal, peaks, left_segment_sec, right_segment_sec, sampling_rate):
    """Cut one window of `signal` around every peak index.

    Each window runs from ``left_segment_sec`` before the peak to
    ``right_segment_sec`` after it (both ends inclusive), with the durations
    converted to samples via ``int(round(seconds * sampling_rate))``.

    Parameters
    ----------
    signal : sliceable sequence (e.g. numpy.ndarray)
    peaks : iterable of int
        Sample indices of the peaks.
    left_segment_sec, right_segment_sec : float
        Seconds kept before / after each peak.
    sampling_rate : number
        Samples per second.

    Returns
    -------
    list
        One slice of `signal` per peak, in the order of `peaks`. Windows that
        would extend past either end of the signal are truncated at that end,
        so they are shorter than the nominal length.
    """
    # The margins do not depend on the peak, so compute them once.
    n_before = int(round(left_segment_sec * sampling_rate))
    n_after = int(round(right_segment_sec * sampling_rate))

    segments = []
    for peak in peaks:
        # Clamp at zero: a negative start would be interpreted as an offset
        # from the END of the signal and silently yield an empty/wrong slice.
        sample_start = max(peak - n_before, 0)
        sample_end = peak + n_after
        segments.append(signal[sample_start:sample_end + 1])

    return segments

def get_fft_features(segment, window_sec, window_stride, sampling_rate):
    """Average magnitude spectrum of Hann-tapered sliding windows over `segment`.

    The window length N is the smallest power of two that is at least
    ``window_sec * sampling_rate`` samples; consecutive windows start
    ``int(round(window_stride * sampling_rate))`` samples apart.

    Returns
    -------
    numpy.ndarray
        Mean over all windows of ``|FFT|`` for bins ``1 .. (N - 1) // 2``
        (the DC bin is skipped).

    Raises
    ------
    AssertionError
        If `segment` is shorter than N samples.
    """
    exponent = int(np.ceil(np.log2(window_sec * sampling_rate)))
    N = 2 ** exponent
    assert N <= len(segment), "Segment too short"

    hop = int(round(window_stride * sampling_rate))
    frames = util.view_as_windows(segment, window_shape=N, step=hop)
    # The taper broadcasts across the rows, one row per window.
    tapered = np.hanning(N) * frames
    magnitudes = np.abs(np.fft.fft(tapered))

    last_bin = (N - 1) // 2
    return np.mean(magnitudes[:, 1:last_bin + 1], axis=0)

def get_data_from_folder(folder, debug=False):
    """Build feature and label arrays from every ``<folder>/*/*.wave.csv`` file.

    The part of the file name before the first '.' is taken as the sequence
    of label characters, one per detected peak. Each file is loaded, trimmed,
    split into per-peak segments and turned into FFT features using the
    module-level settings (`sampling_rate`, `frame_bits`, `left_trim_sec`,
    `right_trim_sec`, `peak_thresh`, `peak_sep_sec`, `left_segment_sec`,
    `right_segment_sec`, `window_sec`, `window_stride`).

    A file is discarded as a whole when any processing step raises
    AssertionError; none of its segments are kept in that case.

    Parameters
    ----------
    folder : str
        Root folder to scan.
    debug : bool, optional
        When true, print the reason each discarded file was rejected.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Feature rows and the matching label characters.
    """
    data = []
    labels = []
    # glob returns files in arbitrary order; sort for a reproducible sample order.
    filepaths = sorted(glob.glob(os.path.join(folder, '*', '*.wave.csv')))
    total = len(filepaths)
    discarded = 0
    for filepath in tqdm(filepaths):
        try:
            chars = Path(filepath).stem.split('.')[0]
            signal = get_signal_from_file(filepath, sampling_rate, frame_bits)
            signal = trim_signal(signal, left_trim_sec, right_trim_sec, sampling_rate)
            peaks = get_peaks(signal, len(chars), peak_thresh, peak_sep_sec, sampling_rate)
            segments = get_segments_from_peaks(signal, peaks, left_segment_sec, right_segment_sec, sampling_rate)
            # Collect this file's features separately so that a failure on a
            # later segment cannot leave earlier segments in the output while
            # the file is also counted as discarded.
            file_features = [
                get_fft_features(segment, window_sec, window_stride, sampling_rate)
                for segment in segments
            ]
        except AssertionError as e:
            if debug:
                print('Error in {}: {}'.format(filepath, e))

            discarded += 1
            continue

        data.extend(file_features)
        # get_peaks guarantees one peak (hence one segment) per character.
        labels.extend(chars)

    # Avoid dividing by zero when the folder contains no matching files.
    discarded_pct = discarded / total * 100 if total else 0.0
    print('{} of {} ({:.1f}%) samples discarded'.format(discarded, total, discarded_pct))
    data = np.array(data)
    labels = np.array(labels)
    return data, labels

# Build the feature matrix and label vector from every recording under
# train_folder; these two globals feed all later cells.
print('Loading train data...')
data_train_all, labels_train_all = get_data_from_folder(train_folder)
print('Train data shape:', data_train_all.shape)
print('Train labels shape:', labels_train_all.shape)

Loading train data...


HBox(children=(FloatProgress(value=0.0, max=5152.0), HTML(value='')))


343 of 5152 (6.7%) samples discarded
Train data shape: (4809, 511)
Train labels shape: (4809,)


In [0]:
def inspect_labels(labels):
    """Print each distinct label and how often it occurs, in sorted label order.

    One line is printed per label, formatted as ``<repr(label)>: <count>``.
    """
    tally = Counter(labels)
    for label in sorted(tally):
        print('{}: {}'.format(repr(label), tally[label]))
        
# Show how many samples each character has, to check the classes are balanced.
print('Train label distribution:')
inspect_labels(labels_train_all)

Train label distribution:
'0': 122
'1': 123
'2': 124
'3': 132
'4': 120
'5': 142
'6': 125
'7': 135
'8': 141
'9': 128
'a': 133
'b': 122
'c': 125
'd': 143
'e': 133
'f': 137
'g': 121
'h': 135
'i': 136
'j': 143
'k': 122
'l': 132
'm': 147
'n': 125
'o': 171
'p': 146
'q': 129
'r': 127
's': 125
't': 130
'u': 141
'v': 138
'w': 145
'x': 134
'y': 137
'z': 140


In [0]:
import keras
from sklearn.preprocessing import LabelBinarizer, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.models import load_model
from keras.metrics import top_k_categorical_accuracy
from keras.utils import to_categorical
from keras import backend as K
from tqdm.keras import TqdmCallback

from keras.backend.tensorflow_backend import set_session
import tensorflow as tf
# Let TensorFlow allocate GPU memory on demand instead of reserving it all up
# front. Iterating over the list (rather than indexing gpus[0]) keeps the cell
# runnable on CPU-only machines and applies the same setting to every GPU.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Mini-batch size and maximum number of epochs passed to model.fit.
batch_size = 128
epochs = 100
# EarlyStopping patience, in epochs without val_loss improvement.
# NOTE(review): patience equals epochs, so early stopping presumably can never
# trigger before training ends - confirm this is intended.
patience = 100
# Fractions of all samples held out for validation and for testing.
val_size = 0.15
test_size = 0.15
# Number of models trained on the same split; their predictions are combined.
num_models = 5
# Prefix of the checkpoint files, saved as '<model_name>_<index>.h5'.
model_name = 'key_model'

def key_model(num_classes):
    """Create and compile the dense classifier.

    Architecture: two 64-unit ReLU layers (He-normal initialisation), a 0.5
    dropout layer, and a softmax output with `num_classes` units. The model
    is compiled with categorical cross-entropy, the Adam optimizer with its
    default settings, and the 'accuracy' metric.

    Returns
    -------
    The compiled Sequential model. No input shape is declared here.
    """
    layer_stack = [
        Dense(64, activation='relu', kernel_initializer='he_normal'),
        Dense(64, activation='relu', kernel_initializer='he_normal'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential(layer_stack)
    network.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.categorical_crossentropy,
        metrics=['accuracy'],
    )
    return network

def train_model(model, x_train, y_train, x_val, y_val, filename):
    """Fit `model` and return the checkpoint that was saved to `filename`.

    Training uses the module-level `batch_size`, `epochs` and `patience`.
    Three callbacks are attached:

    * early stopping on the minimum of ``val_loss``;
    * a checkpoint that writes to `filename` only when ``val_accuracy``
      improves (``save_best_only=True``);
    * a tqdm progress bar.

    Returns
    -------
    The model loaded back from `filename`, not the in-memory `model`.
    """
    callbacks = [
        EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=patience),
        ModelCheckpoint(filename, monitor='val_accuracy', mode='max', verbose=0, save_best_only=True),
        TqdmCallback(verbose=0),
    ]
    fit_options = dict(
        batch_size=batch_size,
        epochs=epochs,
        verbose=0,
        validation_data=(x_val, y_val),
        callbacks=callbacks,
    )
    model.fit(x_train, y_train, **fit_options)

    # Reload from disk so the caller gets the checkpointed weights.
    return load_model(filename)

def preprocess_data_test(data_train, labels_train, data_test, labels_test, debug=False):
    """Prepare train/validation data plus a separately supplied test set.

    `data_train` is split into train and validation parts (validation
    fraction = module-level `val_size`, stratified by label). Labels are
    one-hot encoded with an encoder fitted on `labels_train`, and features
    are standardised with a scaler fitted on the training part only. The
    same encoder and scaler are then applied to the test set.

    Parameters
    ----------
    data_train, labels_train : array-like
        Features and labels used for training and validation.
    data_test, labels_test : array-like
        Held-out features and labels.
    debug : bool, optional
        When true, print the input shape and the resulting set sizes.

    Returns
    -------
    tuple
        ``(x_train, y_train, x_val, y_val, x_test, y_test)``, in the same
        order as `preprocess_data`.
    """
    encoder = LabelBinarizer()
    onehot = encoder.fit_transform(labels_train)
    # Use the function's own arguments, not the notebook-level globals.
    x_train, x_val, y_train, y_val = train_test_split(
        data_train, onehot, test_size=val_size, stratify=labels_train)

    if debug:
        print(data_train[0].shape)

    # Fit the scaler on the training part only so that no validation or test
    # statistics leak into training.
    scaler = StandardScaler()
    scaler.fit(x_train)
    x_train = scaler.transform(x_train)
    x_val = scaler.transform(x_val)
    x_test = scaler.transform(data_test)
    y_test = encoder.transform(labels_test)
    if debug:
        print('x_train shape:', x_train.shape)
        print('y_train shape:', y_train.shape)
        print(x_train.shape[0], 'train samples')
        print(x_val.shape[0], 'val samples')
        print(x_test.shape[0], 'test samples')

    return x_train, y_train, x_val, y_val, x_test, y_test

def preprocess_data(data_train_all, labels_train_all, random_state=None):
    """Split the data into train/validation/test sets and standardise the features.

    The hold-out fractions come from the module-level `val_size` and
    `test_size`. Both splits are stratified by label. Labels are one-hot
    encoded, and the scaler is fitted on the training part only.

    Parameters
    ----------
    data_train_all : array-like
        Feature rows for all samples.
    labels_train_all : array-like
        One label per feature row.
    random_state : int or None, optional
        Seed forwarded to both `train_test_split` calls. The default (None)
        keeps the previous behaviour of a different split on every run.

    Returns
    -------
    tuple
        ``(x_train, y_train, x_val, y_val, x_test, y_test)``.
    """
    encoder = LabelBinarizer()
    onehot = encoder.fit_transform(labels_train_all)

    # First carve off validation + test together, then divide that part.
    holdout_size = val_size + test_size
    x_train, x_hold, y_train, y_hold, _, labels_hold = train_test_split(
        data_train_all, onehot, labels_train_all,
        test_size=holdout_size, stratify=labels_train_all, random_state=random_state)
    # y_test comes straight from this split, so it needs no re-encoding.
    x_val, x_test, y_val, y_test = train_test_split(
        x_hold, y_hold,
        test_size=test_size / holdout_size, stratify=labels_hold, random_state=random_state)

    scaler = StandardScaler()
    scaler.fit(x_train)
    x_train = scaler.transform(x_train)
    x_val = scaler.transform(x_val)
    x_test = scaler.transform(x_test)
    print('x_train shape:', x_train.shape)
    print('y_train shape:', y_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_val.shape[0], 'val samples')
    print(x_test.shape[0], 'test samples')
    return x_train, y_train, x_val, y_val, x_test, y_test
    
# One shared train/val/test split is used by every model below.
x_train, y_train, x_val, y_val, x_test, y_test = preprocess_data(data_train_all, labels_train_all)

# Train num_models models on the same split; each one is checkpointed to its
# own '<model_name>_<i>.h5' file and the reloaded checkpoint is kept.
models = []
for i in range(num_models):
    filename = '{}_{}.h5'.format(model_name, i)
    print('Training model {} of {}...'.format(i + 1, num_models))
    # The number of classes is the width of the one-hot label matrix.
    num_classes = y_train.shape[1]
    model = key_model(num_classes)
    model = train_model(model, x_train, y_train, x_val, y_val, filename)
    models.append(model)
    
print('Finished training')

Using TensorFlow backend.


x_train shape: (3366, 511)
y_train shape: (3366, 36)
3366 train samples
721 val samples
722 test samples
Training model 1 of 5...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Training model 2 of 5...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Training model 3 of 5...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Training model 4 of 5...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Training model 5 of 5...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))


Finished training


In [0]:
def top_k_accuracy_score(y_pred, y_test, k):
    """Return the fraction of samples whose true class is among the top-k scores.

    Computed with NumPy so that it does not rely on iterating a backend
    tensor in Python and always returns a plain float.

    Parameters
    ----------
    y_pred : array-like, shape (n_samples, n_classes)
        Class scores; only their ordering within each row matters.
    y_test : array-like, shape (n_samples, n_classes)
        One-hot targets; the true class of a row is its argmax.
    k : int
        Number of highest-scoring classes that count as a hit.

    Returns
    -------
    float
        Top-k accuracy in [0, 1].

    Raises
    ------
    ValueError
        If there are no samples, since the accuracy is then undefined.
    """
    scores = np.asarray(y_pred)
    true_class = np.argmax(np.asarray(y_test), axis=-1)
    n_samples = scores.shape[0]
    if n_samples == 0:
        raise ValueError('top_k_accuracy_score needs at least one sample')

    true_score = scores[np.arange(n_samples), true_class]
    # A sample is a hit when fewer than k classes score strictly higher than
    # the true class, so ties with the true class count in its favour (the
    # same convention as tf.math.in_top_k).
    n_higher = np.sum(scores > true_score[:, np.newaxis], axis=1)
    return float(np.mean(n_higher < k))

k = 5
# Average the class probabilities of all models (a true mean, so each row
# still sums to one); the argmax is the ensemble's prediction.
avg_pred = sum(model.predict(x_val) for model in models) / len(models)
labels_pred = np.argmax(avg_pred, axis=1)
# Fix the one-hot width to the real number of classes; otherwise it is
# inferred from the largest predicted label and can be narrower than y_val.
y_pred = to_categorical(labels_pred, num_classes=y_val.shape[1])
score = accuracy_score(y_val, y_pred)
top_k_score = top_k_accuracy_score(avg_pred, y_val, k)
print('Val accuracy: {:.1f}%'.format(score * 100))
print('Top-{} accuracy: {:.1f}%'.format(k, top_k_score * 100))

Val accuracy: 99.0%
Top-5 accuracy: 99.9%


In [0]:
# Validation accuracy of each individual model, for comparison with the ensemble.
for i, model in enumerate(models):
    # The models are compiled with metrics=['accuracy'], so evaluate returns
    # the loss first and the accuracy second.
    score = model.evaluate(x_val, y_val, verbose=0)
    print('Model {} val accuracy: {:.1f}%'.format(i + 1, score[1] * 100))

Model 1 val accuracy: 97.8%
Model 2 val accuracy: 97.4%
Model 3 val accuracy: 98.1%
Model 4 val accuracy: 97.5%
Model 5 val accuracy: 98.5%


In [0]:
# Average the class probabilities of all models (a true mean, so each row
# still sums to one); the argmax is the ensemble's prediction.
avg_pred = sum(model.predict(x_test) for model in models) / len(models)
labels_pred = np.argmax(avg_pred, axis=1)
# Fix the one-hot width to the real number of classes; otherwise it is
# inferred from the largest predicted label and can be narrower than y_test.
y_pred = to_categorical(labels_pred, num_classes=y_test.shape[1])
score = accuracy_score(y_test, y_pred)
top_k_score = top_k_accuracy_score(avg_pred, y_test, k)
print('Test accuracy: {:.1f}%'.format(score * 100))
print('Top-{} accuracy: {:.1f}%'.format(k, top_k_score * 100))

Test accuracy: 99.2%
Top-5 accuracy: 100.0%


In [0]:
# Test accuracy of each individual model, for comparison with the ensemble.
for i, model in enumerate(models):
    # The models are compiled with metrics=['accuracy'], so evaluate returns
    # the loss first and the accuracy second.
    score = model.evaluate(x_test, y_test, verbose=0)
    print('Model {} test accuracy: {:.1f}%'.format(i + 1, score[1] * 100))

Model 1 test accuracy: 97.9%
Model 2 test accuracy: 98.6%
Model 3 test accuracy: 97.9%
Model 4 test accuracy: 98.3%
Model 5 test accuracy: 97.9%
