## Preface
This notebooks aims to build a light-weight CNN.
It uses specgrams of resampled wav files(rate 8000) as inputs.

## File Structure
This script assumes data are stored in following strcuture:
speech
├── test            
│   └── audio #test wavfiles
├── train           
│   ├── audio #train wavfiles

In [94]:
import os
import re
import gc
import keras
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable
from scipy.fftpack import fft
from scipy.io import wavfile
from scipy import signal
from glob import glob
from scipy.io import wavfile
from keras import optimizers, losses, activations, models
from keras.layers import Convolution2D, Dense, Input, Flatten, Dropout, MaxPooling2D, BatchNormalization
from sklearn.model_selection import train_test_split

The original sample rate is 16000, and we will resample it to 8000 to reduce data size.

In [95]:
L = 16000
legal_labels = 'yes no up down left right on off stop go silence unknown'.split()

root_path = r'.'
out_path = r'.'
model_path = r'.'
train_data_path = os.path.join(root_path, 'data', 'train', 'audio')
test_data_path = os.path.join(root_path, 'data', 'test', 'audio')

Here are custom_fft and log_specgram functions written by __DavidS__.

In [96]:
def custom_fft(y, fs):
    T = 1.0 / fs
    N = y.shape[0]
    yf = fft(y)
    xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
    # FFT is simmetrical, so we take just the first half
    # FFT is also complex, to we take just the real part (abs)
    vals = 2.0/N * np.abs(yf[0:N//2])
    return xf, vals

def log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=nperseg,
                                    noverlap=noverlap,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

Following is the utility function to grab all wav files inside train data folder.

In [97]:
def list_wavs_fname(dirpath, ext='wav'):
    print(dirpath)
    fpaths = glob(os.path.join(dirpath, r'*/*' + ext))
    pat = r'.+\\(\w+)\\\w+\.' + ext + '$'
    labels = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            labels.append(r.group(1))
    pat = r'.+\\(\w+\.' + ext + ')$'
    fnames = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            fnames.append(r.group(1))
    return labels, fnames

__pad_audio__ will pad audios that are less than 16000(1 second) with 0s to make them all have the same length.

__chop_audio__ will chop audios that are larger than 16000(eg. wav files in background noises folder) to 16000 in length. In addition, it will create several chunks out of one large wav files given the parameter 'num'.

__label_transform__ transform labels into dummies values. It's used in combination with softmax to predict the label.

In [98]:
def pad_audio(samples):
    if len(samples) >= L: return samples
    else: return np.pad(samples, pad_width=(L - len(samples), 0), mode='constant', constant_values=(0, 0))

def chop_audio(samples, L=16000, num=20):
    for i in range(num):
        beg = np.random.randint(0, len(samples) - L)
        yield samples[beg: beg + L]

def label_transform(labels):
    nlabels = []
    for label in labels:
        if label == '_background_noise_':
            nlabels.append('silence')
        elif label not in legal_labels:
            nlabels.append('unknown')
        else:
            nlabels.append(label)
    return pd.get_dummies(pd.Series(nlabels))

Next, we use functions declared above to generate x_train and y_train.
label_index is the index used by pandas to create dummy values, we need to save it for later use.

In [115]:
labels, fnames = list_wavs_fname(train_data_path)

new_sample_rate = 8000
y_train = []
x_train = []

current = 0
for label, fname in zip(labels, fnames):
    sample_rate, samples = wavfile.read(os.path.join(train_data_path, label, fname))
    samples = pad_audio(samples)
    if len(samples) > 16000:
        n_samples = chop_audio(samples)
    else: n_samples = [samples]
    for samples in n_samples:
        resampled = signal.resample(samples, int(new_sample_rate / sample_rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        y_train.append(label)
        x_train.append(specgram)
    current += 1
    if current % 1000 == 0:
        print(current)

x_train = np.array(x_train)
x_train = x_train.reshape(tuple(list(x_train.shape) + [1]))
y_train = label_transform(y_train)
label_index = y_train.columns.values
y_train = y_train.values
y_train = np.array(y_train)
del labels, fnames
gc.collect()

.\data\train\audio
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000
46000
47000
48000
49000
50000
51000
52000
53000
54000
55000
56000
57000
58000
59000
60000
61000
62000
63000
64000




3380

In [135]:
x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.1, random_state=2017)

In [117]:
class CNN(nn.Module):
    def __init__(self, nclass):
        super(CNN, self).__init__()
        self.bn1 = nn.BatchNorm2d(1)
        self.conv1 = nn.Conv2d(1, 8, 2)
        self.conv2 = nn.Conv2d(8, 8, 2)
        self.drop1 = nn.Dropout(p=0.2)
        self.conv3 = nn.Conv2d(8, 16, 3)
        self.conv4 = nn.Conv2d(16, 16, 3)
        self.drop2 = nn.Dropout(p=0.2)
        self.conv5 = nn.Conv2d(16, 32, 3)
        self.drop3 = nn.Dropout(p=0.2)
        self.fc1 = nn.Linear(2240, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.fc2 = nn.Linear(128, 128)
        self.bn3 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, nclass)
        
    def forward(self, x):
        x = self.bn1(x)
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        x = self.drop1(x)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(F.relu(self.conv4(x)), 2)
        x = self.drop2(x)
        x = F.max_pool2d(F.relu(self.conv5(x)), 2)
        x = self.drop3(x)
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = self.bn2(x)
        x = F.relu(self.fc2(x))
        x = self.bn3(x)
        x = F.softmax(self.fc3(x))        
        return x

    def num_flat_features(self, x):
        size = x.size()[1:]
        num_features = 1
        for s in size:
            num_features *= s
        return num_features

In [139]:
num_epochs = 5
learning_rate = 0.001
nclass = 12
batch_size = 16

cnn = CNN(nclass)
print(cnn)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    for i in range(len(y_train) // batch_size):
        start = i * batch_size
        x = Variable(torch.from_numpy(x_train[start : start + batch_size]).permute(0, 3, 1, 2))
        y = Variable(torch.from_numpy(np.argmax(y_train[start : start + batch_size], axis=1)))
        optimizer.zero_grad()
        outputs = cnn(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
        if (i + 1) % 100 == 0:
            print ('Epoch [%d/%d], Iter [%d/%d] Loss: %.4f' 
                   %(epoch + 1, num_epochs, i + 1, len(y_train) // batch_size, loss.data[0]))

CNN(
  (bn1): BatchNorm2d(1, eps=1e-05, momentum=0.1, affine=True)
  (conv1): Conv2d(1, 8, kernel_size=(2, 2), stride=(1, 1))
  (conv2): Conv2d(8, 8, kernel_size=(2, 2), stride=(1, 1))
  (drop1): Dropout(p=0.2)
  (conv3): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
  (conv4): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1))
  (drop2): Dropout(p=0.2)
  (conv5): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (drop3): Dropout(p=0.2)
  (fc1): Linear(in_features=2240, out_features=128, bias=True)
  (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True)
  (fc2): Linear(in_features=128, out_features=128, bias=True)
  (bn3): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True)
  (fc3): Linear(in_features=128, out_features=12, bias=True)
)




Epoch [1/5], Iter [100/3647] Loss: 2.3632
Epoch [1/5], Iter [200/3647] Loss: 1.9987
Epoch [1/5], Iter [300/3647] Loss: 1.9348
Epoch [1/5], Iter [400/3647] Loss: 2.0570
Epoch [1/5], Iter [500/3647] Loss: 1.9319
Epoch [1/5], Iter [600/3647] Loss: 1.8069
Epoch [1/5], Iter [700/3647] Loss: 2.1813
Epoch [1/5], Iter [800/3647] Loss: 1.8066
Epoch [1/5], Iter [900/3647] Loss: 1.9939
Epoch [1/5], Iter [1000/3647] Loss: 1.8689
Epoch [1/5], Iter [1100/3647] Loss: 2.1188
Epoch [1/5], Iter [1200/3647] Loss: 2.0563
Epoch [1/5], Iter [1300/3647] Loss: 1.9938
Epoch [1/5], Iter [1400/3647] Loss: 1.9313
Epoch [1/5], Iter [1500/3647] Loss: 1.7438
Epoch [1/5], Iter [1600/3647] Loss: 2.0563
Epoch [1/5], Iter [1700/3647] Loss: 2.0563
Epoch [1/5], Iter [1800/3647] Loss: 2.0563
Epoch [1/5], Iter [1900/3647] Loss: 1.8688
Epoch [1/5], Iter [2000/3647] Loss: 1.8063
Epoch [1/5], Iter [2100/3647] Loss: 2.0563
Epoch [1/5], Iter [2200/3647] Loss: 2.0563
Epoch [1/5], Iter [2300/3647] Loss: 1.8688
Epoch [1/5], Iter [2

In [140]:
cnn.eval() 
correct = 0
total = 0
for i in range(len(y_valid) // batch_size):
    start = i * batch_size
    x = Variable(torch.from_numpy(x_valid[start : start + batch_size]).permute(0, 3, 1, 2))
    y = torch.from_numpy(np.argmax(y_valid[start : start + batch_size], axis=1))
    outputs = cnn(x)
    _, predicted = torch.max(outputs.data, 1)
    total += y.size(0)
    correct += (predicted == y).sum()
    
print('Test Accuracy of the model: %d %%' % (100 * correct / total))    



Test Accuracy of the model: 63 %
