- competition/dataset : [https://www.kaggle.com/c/tensorflow-speech-recognition-challenge/data](https://www.kaggle.com/c/tensorflow-speech-recognition-challenge/data)
- date : 2021/02/19
- original : [https://www.kaggle.com/alphasis/light-weight-cnn-lb-0-74](https://www.kaggle.com/alphasis/light-weight-cnn-lb-0-74)

## Light-Weight CNN LB 0.74

**✏ 필사 1회** 

### Preface
이 노트북은 가벼운 CNN을 구축하는데 초점을 두었습니다. 이것은 입력으로 리샘플링된 wav 파일의 spectrogram을 사용합니다. kaggle 클라우드의 하드웨어 제한 때문에 원래 버전에 비해 이 스크립트는 모자란 버전입니다.  

LB 0.74를 기록하기 위해서는 epoch를 5로 설정하고 chop_audio(num=1000)도 설정해야 하며, 모든 Conv layer 파라미터를 두배로 해야합니다.  

이 스크립트가 비록 Alex Ozerin의 베이스라인보다 조금 우수한 정도이지만, 원래의 wav 파일(16000 sample rate)을 사용하면 훨씬 더 높은 점수를 얻을 수 있을 것이라고 확신합니다.

### Improve This Script
오로지 경량화된 CNN만 사용하기 때문에 성능에 제한이 있습니다. 다음과 같은 방법들로 성능을 개선할 수 있습니다:  
1. 리샘플된 데이터 대신 원래의 wav 파일 사용
2. chop_audio를 사용하여 더욱 silence한 wav 파일 생성
3. 더 깊은 CNN을 구축하거나 RNN 사용
4. epochs 횟수를 늘려서 훈련

### After Words
LB 0.88을 도달하려면 아직도 멀었습니다. 사실, CNN이 그렇게 높은 곳에 도달할 수 있을지도 의심스럽습니다.  

In [13]:
import os
import numpy as np
from scipy.fftpack import fft
from scipy import signal
from glob import glob
import re
import pandas as pd
import gc
from scipy.io import wavfile

from keras import optimizers, losses, activations, models
from keras.layers import Convolution2D, Dense, Input, Flatten, Dropout, MaxPooling2D, BatchNormalization
from sklearn.model_selection import train_test_split
import keras

원래의 sample rate는 16000이며, 데이터 크기를 줄이기 위해 8000으로 resampling할 것입니다.

In [14]:
L = 16000
legal_labels = 'yes no up down left right on off stop go silence unknown'.split()

# src folders
root_path = 'data'
out_path = 'data'
model_path = 'data'
train_data_path = os.path.join(root_path, 'train', 'audio')
test_data_path = os.path.join(root_path, 'test', 'audio')

In [15]:
wavfile.read(os.path.join(train_data_path, 'on', '00b01445_nohash_0.wav'))

(16000, array([  16,    9,   52, ..., -170, -152, -176], dtype=int16))

In [16]:
def custom_fft(y, fs):
    T = 1.0 / fs
    N = y.shape[0]
    yf = fft(y)
    xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
    # FFT는 대칭형태이기 때문에 절반만 사용
    
    vals = 2.0/N * np.abs(yf[0:N//2])
    return xf, vals

In [17]:
def log_specgram(audio, sample_rate, window_size=20, step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(
        audio, fs=sample_rate, window='hann', nperseg=nperseg,
        noverlap=noverlap, detrend=False
    )
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

In [18]:
def list_wavs_fname(dirpath, ext='wav'):
    print(dirpath)
    fpaths = glob(os.path.join(dirpath, r'*/*' + ext))
    pat = r'.+\\(\w+)\\\w+\.' + ext + '$'
    labels = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            labels.append(r.group(1))
    pat = r'.+\\(\w+\.' + ext + ')$'
    fnames = []
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            fnames.append(r.group(1))
    return labels, fnames

**pad_audio**: 16000 이하인 오디오들에 대해 같은 길이를 갖도록 0으로 padding.  
**chop_audio**: 16000 이상인 오디오들을 16000으로 잘라냄. 주어진 매개변수 'num'에 따라 하나의 큰 wav 파일로부터 여러 개의 청크를 생성.
**label_transform**: 더미 값들의 레이블 변경. 레이블을 예측하기 위해 softmax와 조합할 때 사용

In [19]:
def pad_audio(samples):
    if len(samples) >= L:
        return samples
    else:
        return np.pad(samples, pad_width=(L - len(samples), 0),
                      mode='constant', constant_values=(0, 0))

In [20]:
def chop_audio(samples, L=16000, num=20):
    for i in range(num):
        beg = np.random.randint(0, len(samples) - L)
        yield samples[beg:beg+L]

In [21]:
def label_transform(labels):
    nlabels = []
    for label in labels:
        if label == '__background_noise_':
            nlabels.append('silence')
        elif label not in legal_labels:
            nlabels.append('unknown')
        else:
            nlabels.append(label)
    return pd.get_dummies(pd.Series(nlabels))

x_train, y_train을 생성하기 위해 위에서 만든 함수들을 사용합니다. label_index는 pandas로 더미 값을 생성하기 위한 인덱스이므로 나중에 사용하겠습니다.

In [22]:
labels, fnames = list_wavs_fname(train_data_path)

data\train\audio


In [23]:
new_sample_rate = L/2
y_train = []
x_train = []

for label, fname in zip(labels, fnames):
    sample_rate, samples = wavfile.read(os.path.join(train_data_path, label, fname))
    samples = pad_audio(samples)
    if len(samples) > L:
        n_samples = chop_audio(samples)
    else:
        n_samples = [samples]
    for samples in n_samples:
        resampled = signal.resample(
            samples, int(new_sample_rate/sample_rate*samples.shape[0])
        )
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        y_train.append(label)
        x_train.append(specgram)

x_train = np.array(x_train)
x_train = x_train.reshape(tuple(list(x_train.shape) + [1]))
y_train = label_transform(y_train)
label_index = y_train.columns.values
y_train = y_train.values
y_train = np.array(y_train)
del labels, fnames
gc.collect()

  sample_rate, samples = wavfile.read(os.path.join(train_data_path, label, fname))


8

이제 CNN을 선언합니다. 생성된 specgram은 형태가 (99, 81)이지만, Conv2D layer에 fitting하기 위해서는 변형시켜야 합니다.

In [24]:
input_shape = (99, 81, 1)
nclass = 11
inp = Input(shape=input_shape)
norm_inp = BatchNormalization()(inp)
img_1 = Convolution2D(8, kernel_size=2, activation=activations.relu)(norm_inp)
img_1 = Convolution2D(8, kernel_size=2, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Convolution2D(16, kernel_size=3, activation=activations.relu)(img_1)
img_1 = Convolution2D(16, kernel_size=3, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Dropout(rate=0.2)(img_1)
img_1 = Convolution2D(32, kernel_size=3, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Dropout(rate=0.2)(img_1)
img_1 = Flatten()(img_1)

dense_1 = BatchNormalization()(Dense(128, activation=activations.relu)(img_1))
dense_1 = BatchNormalization()(Dense(128, activation=activations.relu)(dense_1))
dense_1 = Dense(nclass, activation=activations.softmax)(dense_1)

model = models.Model(inputs=inp, outputs=dense_1)
opt = optimizers.Adam()

model.compile(optimizer=opt, loss=losses.binary_crossentropy)
model.summary()

x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.1, random_state=2021)
model.fit(x_train, y_train, batch_size=16, validation_data=(x_valid, y_valid), epochs=3, shuffle=True, verbose=2)

model.save(os.path.join(model_path, 'cnn.model'))

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 99, 81, 1)]       0         
_________________________________________________________________
batch_normalization (BatchNo (None, 99, 81, 1)         4         
_________________________________________________________________
conv2d (Conv2D)              (None, 98, 80, 8)         40        
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 97, 79, 8)         264       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 48, 39, 8)         0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 46, 37, 16)        1168      
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 44, 35, 16)        2320  

테스트 데이터는 RAM으로 fitting시키기에는 너무 크기 때문에 하나씩 처리해야 합니다. test_data_generator는 CNN에 입력하기 위한 테스트 wav 파일의 batch를 생성합니다.

In [25]:
def test_data_generator(batch=16):
    fpaths = glob(os.path.join(test_data_path, '*wav'))
    i = 0
    for path in fpaths:
        if i == 0:
            imgs = []
            fnames = []
        i += 1
        rate, samples = wavfile.read(path)
        samples = pad_audio(samples)
        resampled = signal.resample(samples, int(new_sample_rate / rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        imgs.append(specgram)
        fnames.append(path.split('\\')[-1])
        if i == batch:
            i = 0
            imgs = np.array(imgs)
            imgs = imgs.reshape(tuple(list(imgs.shape) + [1]))
            yield fnames, imgs
    if i < batch:
        imgs = np.array(imgs)
        imgs = imgs.reshape(tuple(list(imgs.shape) + [1]))
        yield fnames, imgs
    raise StopIteration()

훈련된 모델을 사용하여 테스트 데이터의 레이블을 예측합니다.

In [26]:
exit()
del x_train, y_train
gc.collect()

index = []
results = []
for fnames, imgs in test_data_generator(batch=32):
    predicts = model.predict(imgs)
    predicts = np.argmax(predicts, axis=1)
    predicts = [label_index[p] for p in predicts]
    index.extend(fname)
    results.extend(predicts)

RuntimeError: generator raised StopIteration

In [None]:
df = pd.DataFrame(columns=['fname', 'label'])
df['fname'] = index
df['label'] = results
df.to_csv(os.path.join(out_path, 'submission_2_sub.csv'), index=False)