In [1]:
import IPython.display as ipd
from scipy.io import wavfile
import numpy as np
from scipy import signal
from glob import glob
import os 
import random

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import librosa
import librosa.display

DATA_DIR = '/Users/seonghoonjung/.kaggle/data/tensorflow-speech-recognition'
TRAIN_AUDIO_DIR = DATA_DIR + '/train/audio/'
TEST_AUDIO_DIR = DATA_DIR + '/test/audio/'

SEED=2018

## 훈련 데이터를 화자 기반으로 훈련:검증 데이터로 분리 

In [2]:
labels = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']

def random_shuffle(lst):
    random.seed(SEED)
    random.shuffle(lst)
    return lst

# 결과 저장 폴더
if not os.path.exists('input'):
    os.mkdir('input')
    
# 훈련 데이터 전체를 (레이블,화자,파일명) 형태로 메타 파일에 저장한다. 
trn_all_file = open('input/trn_all.txt', 'w')
trn_all = []

files = glob(TRAIN_AUDIO_DIR+'/*/*.wav')
for f in files:
    # 배경 노이즈는 제외 
    if '_background_noise_' in f:
        continue
    
    # 레이블과 화자 정보를 파일명에서 추출
    label = f.split('/')[-2]
    speaker = f.split('/')[-1].split('_')[0]
    
    # 지정된 레이블이 아니면 unknown 처리
    if label not in labels:
        label = 'unknown'
        if random.random() < 0.2:  # 20% 만 사용 
            trn_all.append((label,speaker,f))
            trn_all_file.write('{},{},{}\n'.format(label, speaker, f))
    else:
        trn_all.append((label,speaker,f))
        trn_all_file.write('{},{},{}\n'.format(label, speaker, f))        
trn_all_file.close()

# 화자 기준으로 9:1로 학습:검증을 분리한다. 
unique_speakers = list(set([speaker for (label, speaker, path) in trn_all]))
random_shuffle(unique_speakers)
cutoff = int(len(unique_speakers) * 0.9)
speaker_val = unique_speakers[cutoff:]

# 교차 검증용 파일을 생성한다. 
trn_file = open('input/trn.txt', 'w')
val_file = open('input/val.txt', 'w')
for (label, speaker, path) in trn_all:
    if speaker not in speaker_val:
        trn_file.write('{},{},{}\n'.format(label, speaker, f))
    else:
        val_file.write('{},{},{}\n'.format(label, speaker, f))    
trn_file.close()
val_file.close()

# 테스트 파일에 대해서도 메타 파일 생성 (레이블과 화자 정보 없음)
tst_all_file = open('input/tst.txt', 'w')
files = glob(TEST_AUDIO_DIR+'/*.wav')
for f in files:
    tst_all_file.write(',,{}\n'.format(f))    
tst_all_file.close()

## 데이터 전처리 루틴

In [3]:
import torch
import numpy as np
from torch.utils.data import Dataset
import librosa
from glob import glob
import random

# 샘플링 비율
SR = 16000

In [4]:
class SpeechDataset(Dataset):
    def __init__(self, mode, label_to_int, wav_list, label_list=None):
        self.mode = mode
        self.label_to_int = label_to_int # 문자 -> 숫자 레이블 전환 
        self.wav_list = wav_list
        self.label_list = label_list  # 문자로 된 label
        self.sr = SR
        self.n_silence = int(len(wav_list) * 0.1)  # 묵음은 전체 데이터의 10%
        
        # 배경 데이터를 미리 읽어온다. 
        # 데이터 augmentation에 사용
        self.bg_noises = [librosa.load(f, sr=self.sr)[0] for f in glob(TRAIN_AUDIO_DIR+'/_background_noise_/*.wav')]
        
    # 목록에서 idx번째 음성 데이터를 전달(1초만)
    def get_one_word_wav(self, idx):
        wav = librosa.load(self.wav_list[idx], sr=self.sr)[0]
        if len(wav) < self.sr: # 1초보다 모자르면 뒤에 0으로 패딩
            wav = np.pad(wav, (0, self.sr - len(wav)), 'constant')
        return wav[:self.sr]
    
        
    # 배경 노이즈 목록에서 랜덤하게 1초만 전달    
    def get_one_noise(self):
        # 노이즈 랜덤 선택
        noise = self.bg_noises[random.randint(0,len(self.bg_noises)-1)]
        
        # 시작 위치 랜덤 선택
        start_idx = random.randint(0, len(noise)-1-self.sr)
        
        return noise[start_idx:(start_idx+self.sr)]
    
    # N개의 배경 노이즈를 합성하여 전달
    def get_mix_noises(self, num_noise=1, max_ratio=0.1):
        result = np.zeros(self.sr)
        for _ in range(num_noise):
            result += random.random() * max_ratio * self.get_one_noise()
        return result / num_noise if num_noise >0 else result
    
    def get_silent_wav(self, num_noise=1, max_ratio=0.5):
        return self.get_mix_noises(num_noise=num_noise, max_ratio=max_ratio)
    
    def __len__(self):
        if self.mode == 'test':
            return len(self.wav_list)
        else:
            return len(self.wav_list) + self.n_silence # 교차 검증일 때는 silence를 추가한 만큼이 데이터 크기
        
     
    # i번째 음성 데이터의 스펙트럼을 전달 
    # 목록 이상을 요청하는 경우는 배경 노이즈를 전달
    def __getitem__(self,idx):
        if idx < len(self.wav_list):
            spec_numpy = self.preprocess_mel(self.get_one_word_wav(idx))
            spec_tensor = torch.from_numpy(spec_numpy).float()
            spec_tensor = spec_tensor.unsqueeze(0) # to 2D
            
            # feature와 파일 경로, label 전달
            if self.mode == 'test':
                return { 'spec': spec_tensor, 'id': self.wav_list[idx]}
            else:
                label = self.label_to_int.get(self.label_list[idx], len(self.label_to_int))
                return { 'spec': spec_tensor, 'id': self.wav_list[idx], 'label': label}
                
        else:
             # 노이즈 전달
            spec_numpy = self.preprocess_mel(
                self.get_silent_wav(
                    num_noise=random.choice([0,1,2,3]),
                    max_ratio=random.choice([x/10 for x in range(20)]))
            )
            spec_tensor = torch.from_numpy(spec_numpy).float()
            spec_tensor = spec_tensor.unsqueeze(0) # to 2D    
            return { 'spec': spec_tensor, 'id': 'silence', 'label': len(self.label_to_int) + 1}  # 라벨 index는 번외 +1 
            
    
    def preprocess_mel(self, data, n_mels=40):
        spectrogram = librosa.feature.melspectrogram(data, sr=SR, n_mels=n_mels, hop_length=160, n_fft=480, fmin=20, fmax=4000)
        spectrogram = librosa.power_to_db(spectrogram)
        spectrogram = spectrogram.astype(np.float32) 
        return spectrogram

## 모델 구현 

In [5]:
import torch
import torch.nn.functional as F
from torch.nn import MaxPool2d

# ResNet 구현 
class ResModel(torch.nn.Module):
    def __init__(self):
        super(ResModel, self).__init__()
        
        n_labels = 12
        n_maps = 128 
        
        # 총 9계층
        self.n_layers = n_layers = 9
        
        # 모듈 정의
        self.conv0 = torch.nn.Conv2d(1, n_maps, (3,3), padding=(1,1), bias=False)
        self.pool = MaxPool2d(2, return_indices=True)
        self.convs = torch.nn.ModuleList(
            [torch.nn.Conv2d(n_maps, n_maps, (3,3), padding=(1,1), dilation=1, bias=False) for _ in range(n_layers)]
        )
        
        # 조합
        for i, conv in enumerate(self.convs):
            # BN - 9 convs
            self.add_module("bn{}".format(i+1), torch.nn.BatchNorm2d(n_maps, affine=False))
            self.add_module("conv{}".format(i+1), conv)

        # 최종 계층은 선형 모듈
        self.output = torch.nn.Linear(n_maps, n_labels)
    
    def forward(self,x):
        for i in range(self.n_layers+1):
            y = F.relu(getattr(self, "conv{}".format(i))(x))
            
            # residual 구현 - 2계층마다 residual
            if i == 0:
                old_x = y
            if i > 0 and i%2 == 0:
                x = y + old_x
                old_x = x
            else:
                x = y
            
            # BatchNorm
            if i > 0:
                x = getattr(self, "bn{}".format(i))(x)
            
            # pooling
            pooling = False
            if pooling:
                x_pool, pool_indices = self.pool(x)
                x = self.unpool(x_pool. pool_indices, output_size=x.size())

        x = x.view(x.size(0), x.size(1), -1)
        x = torch.mean(x, 2)
        
        # 최종 선형 계층을 통과한 결과값 반환
        return self.output(x)
                
                

## 모델 학습

In [12]:
from torch.autograd import Variable
from torch.utils.data import DataLoader
import torch
from time import time
from torch.nn import Softmax
import numpy as np
import pandas as pd
import os
from random import choice
from tqdm import tqdm

def create_directory(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)

def get_time(now, start):
    time_in_min = int((now - start) / 60)
    return time_in_min

BATCH_SIZE = 32
mGPU = False
epochs = 1 #20
mode = 'cv'
model_name = 'model/model_resnet.pth'

# 모델 정의
loss_fn = torch.nn.CrossEntropyLoss()
model = ResModel
speechModel = torch.nn.DataParallel(model()) if mGPU else model()
#speechModel = speechModel.cuda()

# Dataset 정의
labels = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']
label_to_int = dict(zip(labels, range(len(labels))))
int_to_label = dict(zip(range(len(labels)), labels))
int_to_label.update({len(labels): 'unknown', len(labels)+1:'silence'})

trn = 'input/trn.txt' if mode == 'cv' else 'input/trn_all.txt'
tst = 'input/val.txt' if mode == 'cv' else 'input/tst.txt'

# Train DataSet 활성화 
trn_size = BATCH_SIZE # * 10
trn = [line.strip()for line in open(trn, 'r').readlines()]
wav_list = [line.split(',')[-1] for line in trn][:trn_size]  
label_list = [line.split(',')[0] for line in trn][:trn_size]
trainDataset = SpeechDataset(mode='train', label_to_int=label_to_int, wav_list=wav_list, label_list=label_list)

In [15]:
start_time = time()
for e in range(epochs):
    print('training epoch ', e)
    
    # lr decay
    learning_rate = 0.01 if e < 10 else 0.001
    
    optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, speechModel.parameters()), lr=learning_rate, momentum=0.9, weight_decay=0.00001)
    
    # train mode
    speechModel.train()
    
    total_correct = 0
    num_labels = 0
    
    trainLoader = DataLoader(trainDataset, BATCH_SIZE, shuffle=True)
    
    # 학습을 수행한다. 
    for batch_idx, batch_data in enumerate(tqdm(trainLoader)):
        spec = batch_data['spec']
        label = batch_data['label']
        #spec, label = Variable(spec).cuda(), Variable(label).cuda()
        
        # 예측값 계산 
        y_pred = speechModel(spec)
        _, pred_labels = torch.max(y_pred.data, 1)
        correct = (pred_labels == label.data).sum()
        
        total_correct += correct
        num_labels += len(label)
        
        # loss 계산 
        loss = loss_fn(y_pred, label) 
        
        # 역전파 
        optimizer.zero_grad()  
        loss.backward()
        
        # 업데이트
        optimizer.step()
        
    # 정확도 계산
    print('training accuracy:', 100. * total_correct/num_labels, get_time(time(), start_time))
        
    # 교차 검증 모드의 경우, 검증 데이터에 대한 정확률을 기록한다
    if mode == 'cv':
        # 현재 학습 중인 모델을 임시로 저장한다
        torch.save(speechModel.state_dict(), '{}_cv'.format(model_name))
        
        # 검증 데이터를 불러온다
        val_size = 10
        softmax = Softmax()
        tst_list = [line.strip() for line in open(tst, 'r').readlines()]
        wav_list = [line.split(',')[-1] for line in tst_list][:val_size]
        label_list = [line.split(',')[0] for line in tst_list][:val_size]
        cvdataset = SpeechDataset(mode='test', label_to_int=label_to_int, wav_list=wav_list)
        cvloader = DataLoader(cvdataset, BATCH_SIZE, shuffle=False)

        # 모델을 불러와 .eval() 함수로 검증 준비를 한다
        speechmodel = torch.nn.DataParallel(model()) if mGPU else model()
        speechmodel.load_state_dict(torch.load('{}_cv'.format(model_name)))
        #speechmodel = speechmodel.cuda()
        speechmodel.eval()

        # 검증 데이터를 batch_size만큼씩 받아오며 예측값을 저장한다
        fnames, preds = [], []
        for batch_idx, batch_data in enumerate(tqdm(cvloader)):
            spec = Variable(batch_data['spec'])
            fname = batch_data['id']
            y_pred = softmax(speechmodel(spec))
            preds.append(y_pred.data.cpu().numpy())
            fnames += fname

        preds = np.vstack(preds)
        preds = [int_to_label[x] for x in np.argmax(preds, 1)]
        fnames = [fname.split('/')[-2] for fname in fnames]
        num_correct = 0
        for true, pred in zip(fnames, preds):
            if true == pred:
                num_correct += 1

        # 검증 데이터의 정확률을 기록한다
        print("cv accuracy:", 100. * num_correct / len(preds), get_time(time(), start_time))
        
# 학습이 완료된 모델을 저장한다
create_directory("model")
torch.save(speechmodel.state_dict(), model_name)
    
    



  0%|          | 0/2 [00:00<?, ?it/s][A[A

training epoch  0




 50%|█████     | 1/2 [00:17<00:17, 17.51s/it][A[A

100%|██████████| 2/2 [00:19<00:00,  9.56s/it][A[A


  0%|          | 0/1 [00:00<?, ?it/s][A[A

training accuracy: tensor(0.) 0




100%|██████████| 1/1 [00:01<00:00,  1.49s/it][A[A

cv accuracy: 0.0 0





## 테스트 데이터에 대해 평가

In [19]:
tst = 'input/tst.txt'

# 테스트 데이터에 대한 예측값을 파일에 저장한다
print("doing prediction...")
softmax = Softmax()

# 테스트 데이터를 불러온다
tst_size = 10
tst = [line.strip() for line in open(tst, 'r').readlines()]
wav_list = [line.split(',')[-1] for line in tst][:tst_size]
testdataset = SpeechDataset(mode='test', label_to_int=label_to_int, wav_list=wav_list)
testloader = DataLoader(testdataset, BATCH_SIZE, shuffle=False)

# 모델을 불러온다
speechmodel = torch.nn.DataParallel(model()) if mGPU else model()
speechmodel.load_state_dict(torch.load(model_name))
#speechmodel = speechmodel.cuda()
speechmodel.eval()
    
test_fnames, test_labels = [], []
pred_scores = []

# 테스트 데이터에 대한 예측값을 계산한다
for batch_idx, batch_data in enumerate(tqdm(testloader)):
    #spec = Variable(batch_data['spec'].cuda())
    spec = Variable(batch_data['spec'])
    fname = batch_data['id']
    y_pred = softmax(speechmodel(spec))
    pred_scores.append(y_pred.data.cpu().numpy())
    test_fnames += fname

# 가장 높은 확률값을 가진 예측값을 label 형태로 저장한다
final_pred = np.vstack(pred_scores)
final_labels = [int_to_label[x] for x in np.argmax(final_pred, 1)]
test_fnames = [x.split("/")[-1] for x in test_fnames]

# 테스트 파일 명과 예측값을 sub 폴더 아래 저장한다. 캐글에 직접 업로드 할 수 있는 파일 포맷이다.
create_directory("sub")
pd.DataFrame({'fname': test_fnames, 'label': final_labels}).to_csv("sub/{}.csv".format(model_name.split('/')[-1]), index=False)




  0%|          | 0/1 [00:00<?, ?it/s][A[A

doing prediction...




100%|██████████| 1/1 [00:01<00:00,  1.56s/it][A[A
