## Preface

이 노트는 가벼운 CNN을 만드는 것을 목표로 합니다.

재샘플링된 wav 파일의 스펙그램(rate 8000)을 입력으로 사용합니다.

Kaggle 클라우드 하드웨어 제한으로 인해 이 스크립트는 원래 스크립트의 '크립' 버전입니다.

LB 0.74를 얻으려면 epoch를 5로 설정하고 chop_pairs(num=1000)를 설정하고 모든 Conv 계층 매개 변수를 두 배로 해야 합니다.

이 스크립트는 Alex Ozerin의 기준보다 약간 개선된 것이지만, 원본 wav 파일(16000 샘플링 속도)을 사용하면 더 높은 점수를 얻을 수 있다고 생각합니다.


## File Structure

이 스크립트는 데이터가 다음 구조에 저장되어 있다고 가정합니다.

speech

├── test            

│   └── audio #test wavfiles

├── train           

│   ├── audio #train wavfiles

└── model #store models

│

└── out #store sub.csv


## Improve This Script
이것은 단지 가벼운 CNN이기 때문에 성능이 제한적입니다.
다음은 성능을 개선할 수 있는 몇 가지 방법입니다.

1. 재샘플링된 파일 대신 원본 wav 파일을 사용합니다.
2. chop_audio를 사용하여 더 많은 '사일런트' wav 파일을 만듭니다.
3. 더 깊은 CNN, RNN을 사용합니다.
4. 더 긴 epoch을 위해 훈련합니다.


In [1]:
import os
import numpy as np
import pandas as pd

from scipy.fftpack import fft
from scipy.io import wavfile
from scipy import signal
from glob import glob # glob는 파일들의 리스트를 뽑을 때 사용하는데, 파일의 경로명을 이용해서 입맛대로 요리할 수 있답니다.
import re # re(gex)가 제공하는 함수 : match(), fullmatch(), findall(), search() 등 - 인덱스, 원하는 문자열 찾기
import gc # 직접 할당/해제하지 않아도 객체가 현재 쓰이는곳이 없을 경우 자동으로 해제

from keras import optimizers, losses, activations, models
from keras.layers import Convolution2D, Dense, Input, Flatten, Dropout, MaxPooling2D, BatchNormalization
from sklearn.model_selection import train_test_split
import keras

원래 샘플링 속도는 16000이며 데이터 크기를 줄이기 위해 8000으로 다시 샘플링하겠습니다.


In [2]:
L = 16000
legal_labels = 'yes no up down left right on off stop go silence unknown'.split()

#src folders
root_path = r'.'
out_path = r'.'
model_path = r'.'
train_data_path = os.path.join(root_path, 'input', 'train', 'train','audio')
test_data_path = os.path.join(root_path, 'input', 'test', 'audio')

In [2]:
legal_labels = 'yes no up down left right on off stop go silence unknown'.split()
legal_labels

['yes',
 'no',
 'up',
 'down',
 'left',
 'right',
 'on',
 'off',
 'stop',
 'go',
 'silence',
 'unknown']

In [3]:
root_path = r'.'
root_path

'..'

In [3]:
def custom_fft(y, fs):
    T = 1.0 / fs
    N = y.shape[0]
    yf = fft(y)
    xf = np.linspace(0.0, 1.0/(2.0*T), N//2)
    # FFT is simmetrical, so we take just the first half
    # FFT is also complex, to we take just the real part (abs)
    vals = 2.0/N * np.abs(yf[0:N//2])
    return xf, vals

def log_specgram(audio, sample_rate, window_size=20,
                 step_size=10, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, times, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=nperseg,
                                    noverlap=noverlap,
                                    detrend=False)
    return freqs, times, np.log(spec.T.astype(np.float32) + eps)

다음은 train 데이터 폴더 내의 모든 wav 파일을 캡처하는 유틸리티 기능입니다.


In [4]:
def list_wavs_fname(dirpath, ext='wav'): # dirpath = train_data_path = '.\\input\\train\\train\\audio'
    fpaths = glob(os.path.join(dirpath, r'*/*' + ext)) # '.\\input\\train\\train\\audio\\bed\\004ae714_nohash_1.wav'
    pat = r'.+/(\w+)/\w+\.' + ext + '$' # '.+/(\\w+)/(\\w+)/\\w+\\.wav$'
    
    labels = []
    for fpath in fpaths:
        r = re.match(pat, fpath) # re.match 함수는“문자열의 처음”부터 시작하여 패턴이 일치되는 것이 있는지를 확인한다.
        
        if r:
            labels.append(r.group(1))
            
    pat = r'.+/(\w+\.' + ext + ')$'
    fnames = []
    
    for fpath in fpaths:
        r = re.match(pat, fpath)
        if r:
            fnames.append(r.group(1))
            
    return labels, fnames

In [5]:
train_data_path

'.\\input\\train\\train\\audio'

In [5]:
fpaths = glob(os.path.join(train_data_path, r'*/*' + 'wav')) # */* : 전체 / 전체+wav
fpaths

['.\\input\\train\\train\\audio\\bed\\00176480_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\004ae714_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\004ae714_nohash_1.wav',
 '.\\input\\train\\train\\audio\\bed\\00f0204f_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\00f0204f_nohash_1.wav',
 '.\\input\\train\\train\\audio\\bed\\012c8314_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\012c8314_nohash_1.wav',
 '.\\input\\train\\train\\audio\\bed\\0132a06d_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\0135f3f2_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\0137b3f4_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\014f9f65_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\01648c51_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\01648c51_nohash_1.wav',
 '.\\input\\train\\train\\audio\\bed\\016e2c6d_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\01b4757a_nohash_0.wav',
 '.\\input\\train\\train\\audio\\bed\\01b4757a_nohash_1.wav',
 '.\\inp

In [6]:
ext = 'wav'
pat = r'.+/(\w+)/(\w+)/\w+\.' + ext + '$'
pat

'.+/(\\w+)/(\\w+)/\\w+\\.wav$'

In [7]:
labels = []

for fpath in fpaths:
    r = re.match(pat, fpath)
    #print(r)
    
    if r:
        labels.append(r.group(1))

print(labels)            

[]


In [8]:
pat = r'.+/(\w+\.' + ext + ')$'
fnames = []

for fpath in fpaths:
    r = re.match(pat, fpath)
    if r:
        fnames.append(r.group(1))
fnames        

[]

__pad_audio__ 는 16000(1초) 미만의 오디오를 0으로 패딩하여 모두 동일한 길이를 갖도록 합니다.


__chop_audio__ 16000보다 큰 오디오(예: 백그라운드 노이즈 폴더의 wav 파일)를 16000으로 잘라냅니다. 또한 'num' 매개 변수가 지정된 하나의 큰 wav 파일로 여러 개의 청크를 생성합니다.


__label_transform__ 레이블을 더미 값으로 변환합니다. 라벨을 예측하기 위해 softmax와 조합하여 사용합니다.


In [23]:
L = 16000
def pad_audio(samples):
    if len(samples) >= L: 
        return samples
    else: 
        return np.pad(samples, pad_width=(L - len(samples), 0), mode='constant', constant_values=(0, 0))

def chop_audio(samples, L=16000, num=20):
    for i in range(num):
        beg = np.random.randint(0, len(samples) - L)
        yield samples[beg: beg + L]

def label_transform(labels):
    nlabels = []
    for label in labels:
        if label == '_background_noise_':
            nlabels.append('silence')
        elif label not in legal_labels:
            nlabels.append('unknown')
        else:
            nlabels.append(label)
    return pd.get_dummies(pd.Series(nlabels))

In [21]:
beg = np.random.randint(0, 8)
beg

3

다음으로 위에서 선언한 함수를 사용하여 x_train과 y_train을 생성합니다.
label_index는 팬더가 더미 값을 생성하기 위해 사용하는 인덱스이므로 나중에 사용하기 위해 저장해야 합니다.


In [20]:
labels, fnames = list_wavs_fname(train_data_path)
labels, fnames

.\input\train\train\audio


([], [])

In [24]:
labels, fnames = list_wavs_fname(train_data_path)

new_sample_rate = 8000
y_train = []
x_train = []

for label, fname in zip(labels, fnames):
    sample_rate, samples = wavfile.read(os.path.join(train_data_path, label, fname))
    samples = pad_audio(samples)
    
    if len(samples) > 16000:
        n_samples = chop_audio(samples)
    else: 
        n_samples = [samples]
    
    for samples in n_samples:
        resampled = signal.resample(samples, int(new_sample_rate / sample_rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        y_train.append(label)
        x_train.append(specgram)
        
x_train = np.array(x_train)
x_train = x_train.reshape(tuple(list(x_train.shape) + [1]))
y_train = label_transform(y_train)
label_index = y_train.columns.values
y_train = y_train.values
y_train = np.array(y_train)

del labels, fnames
gc.collect()

.\input\train\train\audio


  return pd.get_dummies(pd.Series(nlabels))


49

CNN은 다음과 같이 선언했습니다.
생성된 스펙그램은 모양(99, 81)이 되지만 Conv2D 레이어에 적합하려면 모양을 변경해야 합니다.


In [25]:
from tensorflow.keras.optimizers import Adam

input_shape = (99, 81, 1)
nclass = 12
inp = Input(shape=input_shape)
norm_inp = BatchNormalization()(inp)

img_1 = Convolution2D(8, kernel_size=2, activation=activations.relu)(norm_inp)
img_1 = Convolution2D(8, kernel_size=2, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Dropout(rate=0.2)(img_1)

img_1 = Convolution2D(16, kernel_size=3, activation=activations.relu)(img_1)
img_1 = Convolution2D(16, kernel_size=3, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Dropout(rate=0.2)(img_1)

img_1 = Convolution2D(32, kernel_size=3, activation=activations.relu)(img_1)
img_1 = MaxPooling2D(pool_size=(2, 2))(img_1)
img_1 = Dropout(rate=0.2)(img_1)

img_1 = Flatten()(img_1)

dense_1 = BatchNormalization()(Dense(128, activation=activations.relu)(img_1))
dense_1 = BatchNormalization()(Dense(128, activation=activations.relu)(dense_1))
dense_1 = Dense(nclass, activation=activations.softmax)(dense_1)

model = models.Model(inputs=inp, outputs=dense_1) # from keras import models
opt = Adam()

model.compile(optimizer=opt, loss=losses.binary_crossentropy) #midmcal
model.summary()

x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.1, random_state=2017)
model.fit(x_train, y_train, batch_size=16, validation_data=(x_valid, y_valid), epochs=3, shuffle=True, verbose=2)

model.save(os.path.join(model_path, 'cnn.model'))

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 99, 81, 1)]       0         
                                                                 
 batch_normalization (BatchN  (None, 99, 81, 1)        4         
 ormalization)                                                   
                                                                 
 conv2d (Conv2D)             (None, 98, 80, 8)         40        
                                                                 
 conv2d_1 (Conv2D)           (None, 97, 79, 8)         264       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 48, 39, 8)        0         
 )                                                               
                                                                 
 dropout (Dropout)           (None, 48, 39, 8)         0     

ValueError: With n_samples=0, test_size=0.1 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

테스트 데이터가 RAM에 들어가기에는 너무 커서 하나씩 처리해야 합니다.
생성기 test_data_generator는 CNN에 제공할 테스트 wav 파일 배치를 생성합니다.


In [None]:
def test_data_generator(batch=16):
    fpaths = glob(os.path.join(test_data_path, '*wav'))
    i = 0
    for path in fpaths:
        if i == 0:
            imgs = []
            fnames = []
        i += 1
        rate, samples = wavfile.read(path)
        samples = pad_audio(samples)
        resampled = signal.resample(samples, int(new_sample_rate / rate * samples.shape[0]))
        _, _, specgram = log_specgram(resampled, sample_rate=new_sample_rate)
        imgs.append(specgram)
        fnames.append(path.split('\\')[-1])
        
        if i == batch:
            i = 0
            imgs = np.array(imgs)
            imgs = imgs.reshape(tuple(list(imgs.shape) + [1]))
            yield fnames, imgs
            
    if i < batch:
        imgs = np.array(imgs)
        imgs = imgs.reshape(tuple(list(imgs.shape) + [1]))
        yield fnames, imgs
    raise StopIteration()

교육받은 모형을 사용하여 검정 데이터의 레이블을 예측합니다.
그러나 Kaggle은 테스트 데이터를 제공하지 않기 때문에 다음 섹션은 여기에서 실행되지 않습니다.

In [None]:
exit() #delete this
del x_train, y_train
gc.collect()

index = []
results = []
for fnames, imgs in test_data_generator(batch=32):
    predicts = model.predict(imgs)
    predicts = np.argmax(predicts, axis=1)
    predicts = [label_index[p] for p in predicts]
    index.extend(fnames)
    results.extend(predicts)

df = pd.DataFrame(columns=['fname', 'label'])
df['fname'] = index
df['label'] = results
df.to_csv(os.path.join(out_path, 'sub.csv'), index=False)