# Model2
Google Colab 환경에서 진행.  

모델 성능은 **wandb.ai**에 기록  

기쁨, 슬픔, 당황, 분노, 중립의 5가지 감정 각각에 대해 성우 30명, 데이터 30개씩을 사용하여 학습을 진행.  

음성 데이터 증강을 적용하기 전의 모델.

In [None]:
import numpy as np
import librosa
import random
import tensorflow as tf
import os
import warnings
warnings.filterwarnings('ignore')
from pydub import AudioSegment


def seed_everything(seed):
    """Seed the RNGs used in this notebook with the same value.

    Seeds Python's ``random`` module, NumPy's global generator and
    TensorFlow's global generator, and exports the value as the
    ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Integer seed shared by every generator.
    """
    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up,
    # so setting it here may only affect child processes - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    for apply_seed in (random.seed, np.random.seed, tf.random.set_seed):
        apply_seed(seed)


# Fix the seed once for the whole notebook run.
seed_everything(927)

from tensorflow.keras.models import load_model
from sklearn.metrics import accuracy_score

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Flatten, GlobalAveragePooling2D, Conv2D, MaxPool2D, ZeroPadding2D, BatchNormalization, Input, DepthwiseConv2D, Add, LeakyReLU, ReLU
from tensorflow.keras.optimizers import Adam, SGD

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tqdm import tqdm
import os
import warnings
warnings.filterwarnings('ignore')

import random
def seed_everything(seed):
    """Seed the RNGs used in this notebook with the same value.

    Seeds Python's ``random`` module, NumPy's global generator and
    TensorFlow's global generator, and exports the value as the
    ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Integer seed shared by every generator.
    """
    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up,
    # so setting it here may only affect child processes - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    for apply_seed in (random.seed, np.random.seed, tf.random.set_seed):
        apply_seed(seed)


# Fix the seed once for the whole notebook run.
seed_everything(927)


import librosa
from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Dropout, Flatten, GlobalAveragePooling2D, Conv2D, MaxPool2D, ZeroPadding2D, BatchNormalization, Input, DepthwiseConv2D, Add, LeakyReLU, ReLU
from tensorflow.keras.optimizers import Adam, SGD

from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.models import load_model
from sklearn.metrics import accuracy_score

import wandb
from wandb.keras import WandbCallback
# Log in to Weights & Biases before any run is created.
wandb.login()

# Passed to wandb.init() in the last cell as `project`.
wandb_project = 'cp2'
# Passed to wandb.init() in the last cell as `entity`.
# NOTE(review): the variable is named "group" but is used as the W&B entity -
# confirm that '2hg' is the intended entity (user/team) name.
wandb_group = '2hg'

In [None]:
class Emotion_pred:
    """Feature extraction and CNN construction for speech-emotion classification.

    The class turns raw waveforms into fixed-size mel-spectrogram and MFCC
    feature maps of shape ``(size, pad_size)`` and builds a small residual CNN
    that consumes maps of shape ``(size, pad_size, 1)``.
    """

    def __init__(self, size, pad_size, repeat_size, sr, epochs, batch_size):
        """Store the hyperparameters and mirror them into ``wandb.config``.

        ``wandb.init()`` must have been called before an instance is created,
        because the values are written to ``wandb.config`` here.

        Args:
            size: Number of mel bands / MFCC coefficients (feature-map height).
            pad_size: Number of time frames every feature map is brought to.
            repeat_size: Number of randomly padded copies made per training clip.
            sr: Sampling rate handed to librosa.
            epochs: Number of training epochs (stored and logged only).
            batch_size: Training batch size (stored and logged only).
        """
        self.size = size
        self.pad_size = pad_size
        self.repeat_size = repeat_size
        self.sr = sr
        self.epochs = epochs
        self.batch_size = batch_size

        wandb.config.size = self.size
        wandb.config.pad_size = self.pad_size
        wandb.config.repeat_size = self.repeat_size
        wandb.config.sr = self.sr
        wandb.config.epochs = self.epochs
        wandb.config.batch_size = self.batch_size

    def load_audio(self, file_names, target, path):
        """Load every file in *file_names* from directory *path* at ``self.sr``.

        Args:
            file_names: Iterable of file names relative to *path*.
            target: NumPy array of labels; returned as a copy.
            path: Directory that contains the audio files.

        Returns:
            ``(audios, target)`` where *audios* is a 1-D object array holding
            one waveform per file and *target* is a copy of the input labels.
        """
        waveforms = [
            librosa.load(os.path.join(path, name), sr=self.sr)[0]
            for name in tqdm(file_names)
        ]

        # Clips differ in length, so np.array(list) would be ragged (an error
        # on NumPy >= 1.24). Fill an object array element by element so the
        # result is always a 1-D array of waveforms.
        audios = np.empty(len(waveforms), dtype=object)
        for i, waveform in enumerate(waveforms):
            audios[i] = waveform

        return audios, target.copy()

    @staticmethod
    def _min_max_scale(values):
        """Scale *values* to [0, 1]; a constant input maps to all zeros."""
        low, high = values.min(), values.max()
        span = high - low
        if span == 0:
            # Avoid 0/0 -> NaN for constant (e.g. silent) feature maps.
            return np.zeros_like(values, dtype=float)
        return (values - low) / span

    @staticmethod
    def random_pad(mels, pad_size, mfcc=True):
        """Bring a 2-D feature map to *pad_size* frames and min-max scale it.

        Shorter maps are zero-padded on both sides with a random left/right
        split. Longer maps are randomly cropped to *pad_size* frames. A single
        ``np.random.rand()`` draw decides the offset in both cases.

        Args:
            mels: 2-D array whose second axis is the one being padded.
            pad_size: Target length of the second axis.
            mfcc: If true, pad first and scale afterwards (the padding zeros
                take part in the min/max). If false, scale first and pad the
                scaled map with zeros.

        Returns:
            Array of shape ``(mels.shape[0], pad_size)``.
        """
        n_frames = mels.shape[1]
        rand = np.random.rand()

        if n_frames > pad_size:
            # Too long to pad: take a random window of pad_size frames.
            start = int((n_frames - pad_size) * rand)
            mels = mels[:, start:start + pad_size]
            left = right = 0
        else:
            pad_width = pad_size - n_frames
            left = int(pad_width * rand)
            right = pad_width - left

        padding = ((0, 0), (left, right))
        if mfcc:
            mels = np.pad(mels, pad_width=padding, mode='constant')
            return Emotion_pred._min_max_scale(mels)

        mels = Emotion_pred._min_max_scale(mels)
        return np.pad(mels, pad_width=padding, mode='constant')

    def _mels_mfcc(self, audio, repeats, label):
        """Compute mel-spectrogram (dB) and MFCC maps, *repeats* padded copies each.

        Copies of the same clip are stored consecutively, so row
        ``i * repeats + r`` belongs to clip ``i``.
        """
        audio_mels = []
        audio_mfcc = []

        for y in tqdm(audio):
            # The waveform is passed by keyword: librosa >= 0.10 rejects it
            # as a positional argument.
            mels = librosa.feature.melspectrogram(y=y, sr=self.sr, n_mels=self.size)
            mels = librosa.power_to_db(mels, ref=np.max)

            mfcc = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=self.size)

            for _ in range(repeats):
                audio_mels.append(self.random_pad(mels, pad_size=self.pad_size, mfcc=False))
                audio_mfcc.append(self.random_pad(mfcc, pad_size=self.pad_size, mfcc=True))

        audio_mels_array = np.array(audio_mels, np.float64)
        audio_mfcc_array = np.array(audio_mfcc, np.float64)

        print()
        print(f"{label} mels shape: ", audio_mels_array.shape)
        print(f"{label} mfcc shape: ", audio_mfcc_array.shape)
        print()

        return audio_mels_array, audio_mfcc_array

    def train_mels_mfcc(self, audio):
        """Return ``(mels, mfcc)`` arrays with ``repeat_size`` padded copies per clip."""
        return self._mels_mfcc(audio, self.repeat_size, "train")

    def test_mels_mfcc(self, audio):
        """Return ``(mels, mfcc)`` arrays with exactly one padded copy per clip."""
        return self._mels_mfcc(audio, 1, "test")

    def residual_block(self, x, filters_in, filters_out):
        """Apply a pre-activation bottleneck block (1x1 -> 3x3 -> 1x1) with a shortcut.

        Args:
            x: Input Keras tensor with channels on the last axis.
            filters_in: Filters of the first two convolutions.
            filters_out: Filters of the last convolution and of the block output.

        Returns:
            ``ReLU(main_path + shortcut)``. The shortcut is projected with a
            1x1 convolution only when its channel count differs from
            *filters_out*.
        """
        shortcut = x

        for filters, kernel in ((filters_in, (1, 1)),
                                (filters_in, (3, 3)),
                                (filters_out, (1, 1))):
            x = BatchNormalization()(x)
            x = ReLU()(x)
            x = Conv2D(filters, kernel_size=kernel, strides=(1, 1), padding="same",
                       kernel_initializer='he_normal')(x)

        # Compare the channel axis (last), not the batch axis (first, None):
        # the old batch-axis check was always true and projected every shortcut.
        if shortcut.shape[-1] != filters_out:
            shortcut = Conv2D(filters_out, kernel_size=(1, 1), strides=(1, 1), padding="same",
                              kernel_initializer='he_normal')(shortcut)

        x = Add()([x, shortcut])
        return ReLU()(x)

    def build_model(self, num_classes=5):
        """Build and compile the residual CNN classifier.

        Args:
            num_classes: Size of the softmax output layer.

        Returns:
            A compiled Keras ``Model`` taking ``(size, pad_size, 1)`` inputs,
            trained with Adam and sparse categorical cross-entropy.
        """
        inputs = tf.keras.layers.Input(shape=(self.size, self.pad_size, 1))

        outputs = Conv2D(16, (3, 3), activation=None, padding='same', kernel_initializer='he_normal')(inputs)
        outputs = BatchNormalization()(outputs)
        outputs = ReLU()(outputs)
        outputs = MaxPool2D((2, 2))(outputs)

        outputs = self.residual_block(outputs, 16, 32)
        outputs = MaxPool2D((2, 2))(outputs)
        outputs = self.residual_block(outputs, 32, 32)
        outputs = self.residual_block(outputs, 32, 32)
        outputs = MaxPool2D((2, 2))(outputs)
        # This block receives 32 channels and widens them to 64, so its
        # shortcut goes through the 1x1 projection.
        outputs = self.residual_block(outputs, 64, 64)
        outputs = self.residual_block(outputs, 64, 64)
        outputs = MaxPool2D((2, 2))(outputs)

        outputs = GlobalAveragePooling2D()(outputs)
        # Pooling output is already (batch, channels); Flatten leaves it as is.
        outputs = Flatten()(outputs)

        outputs = Dense(32, activation=None, kernel_initializer='he_normal')(outputs)
        outputs = BatchNormalization()(outputs)
        outputs = ReLU()(outputs)
        outputs = Dropout(0.5)(outputs)

        outputs = Dense(num_classes, activation='softmax')(outputs)
        model = Model(inputs=inputs, outputs=outputs)
        model.compile(optimizer='adam',
                      loss='sparse_categorical_crossentropy',
                      metrics=['accuracy'])

        return model

In [None]:
def _expand_repeat_indices(clip_index, repeat_size):
    """Map clip indices to the row indices of their consecutive repeated copies."""
    offsets = np.arange(repeat_size)
    return (np.asarray(clip_index)[:, None] * repeat_size + offsets).ravel()


def _fit_and_predict(builder, filepath, train_data, val_data, x_test, epochs, batch_size):
    """Train a fresh model, reload its best checkpoint, return (val, test) probabilities."""
    model = builder.build_model()
    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0,
                                           save_best_only=True, mode='min'),
        WandbCallback(),
    ]
    model.fit(train_data[0], train_data[1], batch_size=batch_size, epochs=epochs,
              validation_data=val_data, callbacks=callbacks, verbose=0)

    # Evaluate with the weights of the best epoch (lowest val_loss), not the last one.
    best_model = load_model(filepath)
    return best_model.predict(val_data[0]), best_model.predict(x_test)


def start(model_n, size, pad_size, repeat_size, sr, epochs, batch_size,
          base_path='/content/drive/MyDrive/CP2/', n_splits=5):
    """Run stratified k-fold training of mel-spectrogram and MFCC models.

    For every fold, one model is trained on mel-spectrogram features and one
    on MFCC features. Their validation probabilities are summed to form an
    ensemble prediction, and both models also predict the test set.

    Args:
        model_n: Name used for the checkpoint directory and file names.
        size: Number of mel bands / MFCC coefficients.
        pad_size: Number of time frames per feature map.
        repeat_size: Randomly padded copies made per training clip.
        sr: Audio sampling rate.
        epochs: Training epochs per model.
        batch_size: Training batch size.
        base_path: Project root containing ``Dir_New/`` (data) and ``DS/``
            (checkpoints).
        n_splits: Number of cross-validation folds.

    Returns:
        dict with ``acc_list`` (ensemble validation accuracy per fold),
        ``mean_acc`` (their mean), ``test_proba`` (test probabilities averaged
        over all trained models) and ``test_label`` (argmax of ``test_proba``).
    """
    data_dir = os.path.join(base_path, 'Dir_New')
    train_df = pd.read_csv(os.path.join(data_dir, 'train.csv'))
    test_df = pd.read_csv(os.path.join(data_dir, 'test.csv'))

    train_file_names = train_df['file_name'].to_numpy()
    test_file_names = test_df['file_name'].to_numpy()
    target = train_df['label'].to_numpy()

    model_name = Emotion_pred(size=size, pad_size=pad_size, repeat_size=repeat_size,
                              sr=sr, epochs=epochs, batch_size=batch_size)

    # Trailing separator kept so the directory also works with plain string
    # concatenation inside load_audio.
    audio_train, target_train = model_name.load_audio(
        train_file_names, target, path=os.path.join(data_dir, 'train', ''))
    audio_test, _ = model_name.load_audio(
        test_file_names, np.array([None]), path=os.path.join(data_dir, 'test', ''))

    audio_mels_array, audio_mfcc_array = model_name.train_mels_mfcc(audio_train)
    audio_mels_array_test, audio_mfcc_array_test = model_name.test_mels_mfcc(audio_test)
    repeated_target = np.repeat(target_train, repeat_size)

    feature_sets = (
        ('mels', audio_mels_array, audio_mels_array_test),
        ('mfcc', audio_mfcc_array, audio_mfcc_array_test),
    )

    # ModelCheckpoint does not create missing directories.
    checkpoint_dir = os.path.join(base_path, 'DS', model_n)
    os.makedirs(checkpoint_dir, exist_ok=True)

    acc_list = []
    pred_list = []
    skf = StratifiedKFold(n_splits=n_splits)

    # Split at clip level and then expand to the repeated rows, so all padded
    # copies of one clip land on the same side of the split (no leakage
    # between training and validation data).
    clip_placeholder = np.zeros(len(target_train))
    for fold, (train_clips, val_clips) in enumerate(skf.split(clip_placeholder, target_train)):

        print(f'\n********** {fold+1} fold **********')

        train_index = _expand_repeat_indices(train_clips, repeat_size)
        val_index = _expand_repeat_indices(val_clips, repeat_size)
        y_train, y_val = repeated_target[train_index], repeated_target[val_index]

        val_pred_result = None
        for tag, features, test_features in feature_sets:
            filepath = os.path.join(checkpoint_dir, f"{model_n}.res_test_0930_{tag}_{fold+1}.h5")
            preds_val, preds_test = _fit_and_predict(
                model_name, filepath,
                (features[train_index], y_train),
                (features[val_index], y_val),
                test_features, epochs, batch_size)

            pred_list.append(preds_test)
            preds_val_label = np.argmax(preds_val, axis=1)
            print(f'{tag}_model_acc : {accuracy_score(y_val,preds_val_label):.4f}')

            # Soft-voting ensemble: accumulate class probabilities.
            val_pred_result = preds_val.copy() if val_pred_result is None else val_pred_result + preds_val

        ### ensemble ###
        val_pred_label = np.argmax(val_pred_result, axis=1)
        en_acc = accuracy_score(y_val, val_pred_label)
        acc_list.append(en_acc)
        print(f'ensemble_model_acc : {en_acc:.4f}')

        # Running mean over the folds finished so far.
        print(f'\n\nmean_acc : {np.mean(acc_list):.4f}')

    test_proba = np.mean(pred_list, axis=0)
    return {
        'acc_list': acc_list,
        'mean_acc': float(np.mean(acc_list)),
        'test_proba': test_proba,
        'test_label': np.argmax(test_proba, axis=1),
    }

In [None]:
# Open the W&B run first: Emotion_pred.__init__ (called inside start) writes
# the hyperparameters to wandb.config.
wandb.init(project=wandb_project, entity=wandb_group)
# Run the cross-validated training (one mel-spectrogram and one MFCC model per
# fold); checkpoints are written under DS/model7/ in the project directory.
model7 = start(model_n='model7',size=120, pad_size=450, repeat_size=2, sr=16000, epochs=50, batch_size=64)