In [1]:
from scipy import signal
import scipy.io.wavfile
import numpy as np
from sklearn.preprocessing import LabelEncoder
import random
import os

In [2]:
def log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=nperseg,
                                    noverlap=noverlap,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

def pad_audio(samples, L=16000):
    if len(samples) >= L: return samples
    else: return np.pad(samples, pad_width=(L - len(samples), 0), mode='constant', constant_values=(0, 0))

def chop_audio(samples, L=16000, num=20):
    for i in range(num):
        beg = np.random.randint(0, len(samples) - L)
        yield samples[beg: beg + L]

In [3]:
audio_location = os.listdir('/home/husein/Desktop/convolutional-neural-network/audio')
audio_location

['seven', 'one', 'five', 'nine', 'down', 'four', 'eight']

In [4]:
X, Y = [], []
new_sample_rate = 8000
for i in audio_location:
    audios = os.listdir('/home/husein/Desktop/convolutional-neural-network/audio/%s'%(i))
    for k in audios:
        sample_rate, samples = scipy.io.wavfile.read(os.path.join('/home/husein/Desktop/convolutional-neural-network/audio', i, k))
        samples = pad_audio(samples)
        if len(samples) > 16000:
            n_samples = chop_audio(samples)
        else: 
            n_samples = [samples]
        for samples in n_samples:
            resampled = signal.resample(samples, int(new_sample_rate / sample_rate * samples.shape[0]))
            _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
            Y.append(i)
            X.append(np.expand_dims(scipy.misc.imresize(specgram,[45, 40]),axis=2))

In [5]:
labels = np.unique(Y)
Y = LabelEncoder().fit_transform(Y)
c = list(zip(X, Y))
random.shuffle(c)
X, Y = zip(*c)
X, Y = np.array(X), np.array(Y)
print(X.shape)
onehot = np.zeros((X.shape[0],labels.shape[0]))
onehot[np.arange(Y.shape[0]), Y] = 1.0

(70, 45, 40, 1)


In [6]:
def get_padding(filter_shape, pad='same'):
    if pad == 'valid':
        return (0, 0), (0, 0)
    if pad == 'same':
        filter_height, filter_width = filter_shape
        pad_h_min = int(np.floor((filter_height - 1)/2))
        pad_h_max = int(np.ceil((filter_height - 1)/2))
        pad_w_min = int(np.floor((filter_width - 1)/2))
        pad_w_max = int(np.ceil((filter_width - 1)/2))
        return (pad_h_min, pad_h_max), (pad_w_min, pad_w_max)
    
def images_to_column_indices(images_shape, filter_shape, padding, rate, stride=1):
    batch_size, height, width, channels = images_shape
    filter_height, filter_width = filter_shape
    pad_h, pad_w = padding
    out_height = int(np.ceil((height + np.sum(pad_h) - rate * (filter_height-1)) / stride) + 1)
    out_width = int(np.ceil((width + np.sum(pad_w) - rate * (filter_width-1)) / stride) + 1)
    i0 = np.repeat(np.arange(filter_height), filter_width)
    i0 = np.tile(i0, channels)
    i1 = stride * np.repeat(np.arange(out_height), out_width)
    j0 = np.tile(np.arange(filter_width), filter_height * channels)
    j1 = stride * np.tile(np.arange(out_width), out_height)
    i = i0.reshape(-1, 1) + i1.reshape(1, -1)
    j = j0.reshape(-1, 1) + j1.reshape(1, -1)
    k = np.repeat(np.arange(channels), filter_height * filter_width).reshape(-1, 1)
    return (i, j, k)
    
def image_to_column(images, filter_shape, rate, stride, pad='same'):
    filter_height, filter_width = filter_shape
    pad_h, pad_w = get_padding(filter_shape, pad)
    images_padded = np.pad(images, ((0, 0), pad_h, pad_w, (0, 0)), mode='constant')
    i, j, k = images_to_column_indices(images.shape, filter_shape, (pad_h, pad_w), rate, stride)
    cols = images_padded[:, i, j, k]
    channels = images.shape[3]
    return cols.reshape(filter_height * filter_width * channels, -1)

def get_shape(x, filter_shape, rate, stride, pad):
    _, height, width, _ = x.shape
    pad_h, pad_w = get_padding(filter_shape, pad)
    output_height = int(np.ceil((height + np.sum(pad_h) - rate * (filter_shape[0]-1)) / stride) + 1)
    output_width = int(np.ceil((width + np.sum(pad_w) - rate * (filter_shape[1]-1)) / stride) + 1)
    return int(output_height), int(output_width)

def column_to_image(cols, images_shape, filter_shape, rate, stride, pad='same'):
    batch_size, height, width, channels = images_shape
    pad_h, pad_w = get_padding(filter_shape, pad)
    height_padded = height + np.sum(pad_h)
    width_padded = width + np.sum(pad_w)
    images_padded = np.empty((batch_size, height_padded, width_padded, channels))
    i, j, k = images_to_column_indices(images_shape, filter_shape, (pad_h, pad_w), rate, stride)
    cols = cols.reshape(channels * np.prod(filter_shape), -1, batch_size)
    cols = cols.transpose(2,0,1)
    np.add.at(images_padded, (slice(None), i, j, k), cols)
    return images_padded[:, pad_h[0]:height+pad_h[0], pad_w[0]:width+pad_w[0], :]

def conv_forward(X, W, rate, stride=1, pad='same'):
    filter_shape = (W.shape[0], W.shape[1])
    n_filter = W.shape[3]
    X_col = image_to_column(X, filter_shape, rate, stride=stride, pad=pad)
    W_col = W.reshape((n_filter, -1))
    output = W_col.dot(X_col)
    out=get_shape(X, filter_shape, rate, stride, pad)
    return output.reshape((X.shape[0],out[0],out[1],n_filter)), (filter_shape, n_filter, X_col, W_col,stride, pad, rate)

def conv_backward(X, W, dout, cached):
    filter_shape, n_filter, X_col, W_col,stride, pad, rate = cached
    db = np.sum(dout, axis=(0, 1, 2)).reshape(n_filter)
    dout_reshaped = dout.transpose(1, 2, 3, 0).reshape(n_filter, -1)
    dW = dout_reshaped.dot(X_col.T)
    dW = dW.reshape(W.shape)
    W_reshape = W.reshape(n_filter, -1)
    dX_col = W_reshape.T.dot(dout_reshaped)
    dX = column_to_image(dX_col, X.shape, filter_shape, rate, stride, pad)
    return dX, dW, db

def cross_entropy(Y_hat, Y, epsilon=1e-12):
    Y_hat = np.clip(Y_hat, epsilon, 1. - epsilon)
    N = Y_hat.shape[0]
    return -np.sum(np.sum(Y * np.log(Y_hat+1e-9))) / N

def softmax(x):
    exp_scores = np.exp(x - np.max(x))
    return exp_scores / (np.sum(exp_scores, axis=1, keepdims=True) + 1e-8)

def relu_forward(X):
    out = np.maximum(X, 0)
    cached = X
    return out, cached

def relu_backward(X, cached):
    X[cached <= 0] = 0
    return X

In [7]:
filter_size = 3
rate = 3
stride = 2
epoch = 20
learning_rate = 0.00001
starting_dimension = X[0].shape[2]
kernel_1 = np.random.randn(filter_size, filter_size, starting_dimension, 16) / np.sqrt(starting_dimension)
bias_1 = np.zeros(16)
kernel_2 = np.random.randn(filter_size, filter_size, 16, 32) / np.sqrt(16)
bias_2 = np.zeros(32)
kernel_3 = np.random.randn(filter_size, filter_size, 32, 64) / np.sqrt(32)
bias_3 = np.zeros(64)
w_1 = None
b_1 = np.zeros(128)
w_2 = np.random.randn(128, labels.shape[0]) / np.sqrt(128)
b_2 = np.zeros(labels.shape[0])

In [8]:
for i in range(epoch):
    conv1, cached1 = conv_forward(X, kernel_1, rate, stride)
    conv1 = conv1 + bias_1
    z1, relu_cached1 = relu_forward(conv1)
    conv2, cached2 = conv_forward(z1, kernel_2, rate, stride)
    conv2 = conv2 + bias_2
    z2, relu_cached2 = relu_forward(conv2)
    conv3, cached3 = conv_forward(z2, kernel_3, rate, stride)
    conv3 = conv3 + bias_3
    z3, relu_cached3 = relu_forward(conv3)
    h, w = z3.shape[1], z3.shape[2]
    z3_reshape = z3.reshape((-1, h * w * 64))
    if w_1 is None:
        w_1 = np.random.randn(h * w * 64, 128) / np.sqrt(h * w * 64)
    fully1 = np.dot(z3_reshape, w_1) + b_1
    z4, relu_cached4 = relu_forward(fully1)
    logits = np.dot(z4, w_2) + b_2
    probs = softmax(logits)
    accuracy = np.mean(np.argmax(probs,axis=1) == Y)
    loss = cross_entropy(probs, onehot)
    delta = probs
    delta[range(Y.shape[0]), Y] -= 1
    dw_2 = np.dot(z4.T, delta)
    db_2 = np.sum(delta,axis=0)
    dz4 = np.dot(delta,w_2.T)
    dfully1 = relu_backward(dz4, relu_cached4)
    dw_1 = np.dot(z3_reshape.T, dfully1)
    db_1 = np.sum(dfully1,axis=0)
    dz3_reshape = np.dot(dfully1,w_1.T)
    dz3 = dz3_reshape.reshape((-1, h, w, 64))
    dconv3 = relu_backward(dz3, relu_cached3)
    dz2, dkernel_3, dbias_3 = conv_backward(z2, kernel_3, dconv3, cached3)
    dconv2 = relu_backward(dz2, relu_cached2)
    dz1, dkernel_2, dbias_2 = conv_backward(z1, kernel_2, dconv2, cached2)
    dconv1 = relu_backward(dz1, relu_cached1)
    _, dkernel_1, dbias_1 = conv_backward(X, kernel_1, dconv1, cached1)
    kernel_1 -= learning_rate * dkernel_1
    bias_1 -= learning_rate * dbias_1
    kernel_2 -= learning_rate * dkernel_2
    bias_2 -= learning_rate * dbias_2
    kernel_3 -= learning_rate * dkernel_3
    bias_3 -= learning_rate * dbias_3
    w_1 -= learning_rate * dw_1
    b_1 -= learning_rate * db_1
    w_2 -= learning_rate * dw_2
    b_2 -= learning_rate * db_2
    print('epoch %d, loss %f, accuracy %f'%(i+1, loss, accuracy))

epoch 1, loss 20.722266, accuracy 0.114286
epoch 2, loss 20.426234, accuracy 0.157143
epoch 3, loss 1.946156, accuracy 0.142857
epoch 4, loss 1.945995, accuracy 0.142857
epoch 5, loss 1.945940, accuracy 0.142857
epoch 6, loss 1.945921, accuracy 0.142857
epoch 7, loss 1.945915, accuracy 0.142857
epoch 8, loss 1.945913, accuracy 0.142857
epoch 9, loss 1.945912, accuracy 0.142857
epoch 10, loss 1.945911, accuracy 0.142857
epoch 11, loss 1.945911, accuracy 0.142857
epoch 12, loss 1.945911, accuracy 0.142857
epoch 13, loss 1.945911, accuracy 0.142857
epoch 14, loss 1.945910, accuracy 0.142857
epoch 15, loss 1.945910, accuracy 0.142857
epoch 16, loss 1.945910, accuracy 0.142857
epoch 17, loss 1.945910, accuracy 0.142857
epoch 18, loss 1.945910, accuracy 0.142857
epoch 19, loss 1.945910, accuracy 0.142857
epoch 20, loss 1.945910, accuracy 0.142857
