In [1]:
from tensorflow.keras.layers import Layer, Lambda, Conv2D, Dropout,Dense,Activation,Input,GlobalAveragePooling1D, Concatenate, GlobalAveragePooling2D
from tensorflow.keras.layers import Reshape,Flatten,BatchNormalization,MaxPooling1D,AveragePooling2D,Reshape,Attention, ReLU
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras import regularizers
from sklearn.model_selection import KFold
import keras.backend as K
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tqdm.auto import tqdm
import os
from Config import Config
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import Mean, SparseCategoricalAccuracy
from sklearn.metrics import classification_report, confusion_matrix
from datetime import datetime
import warnings
warnings.filterwarnings('always')

In [2]:
# Dataset identifier: used to build the input path (dataset/<DATA_PATH>.npy)
# and the output folders (Models/<DATA_PATH>, Results/<DATA_PATH>).
DATA_PATH = 'EMODB'
# Class names; their count sets the classifier width and they label the reports.
CLASS_LABELS = Config.EMODB_LABELS
# Number of cross-validation folds (passed to KFold as n_splits).
k = Config.EMODB_K

# Tags that only affect the names of saved weight/report files.
model_name = 'LIGHT_SERNET'
feature_name = 'mfcc'

# Training-loop hyperparameters for the k-fold cell: epochs per fold and batch size.
EPOCHS = 300
BATCH = 32

In [3]:
class SERNET(tf.keras.Model):
  """CNN classifier with three parallel input paths followed by a conv stack.

  The input is fed to three convolutional paths that differ only in kernel
  shape -- (11, 1), (1, 9) and (3, 3) -- each followed by batch norm, ReLU and
  2x2 average pooling. The three outputs are concatenated along axis 1 and
  passed through five Conv-BN-ReLU blocks (`LFLBs`), global average pooling,
  dropout and a softmax Dense layer.

  NOTE(review): the layers use Keras' default data format, so the input is
  presumably (batch, height, width, channels) and axis 1 is the height axis --
  confirm against the feature extraction code.

  Args:
    num_classes: number of output classes (units of the softmax layer).
    L2: L2 regularization factor applied to the conv kernels of the `LFLBs`
      stack only (the three input paths are not regularized).
    DROPOUT: dropout rate applied before the classifier.
  """

  def __init__(self, num_classes, L2=1e-6, DROPOUT=0.3):
    super().__init__()

    # Three parallel feature extractors; only the kernel shape differs.
    self.path1 = Sequential([
        Conv2D(32, (11, 1), padding="same", strides=(1, 1)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=2, padding='same')
    ])
    self.path2 = Sequential([
        Conv2D(32, (1, 9), padding="same", strides=(1, 1)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=2, padding='same')
    ])
    self.path3 = Sequential([
        Conv2D(32, (3, 3), padding="same", strides=(1, 1)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=2, padding='same')
    ])

    # Created once here instead of on every forward pass inside `call`.
    # The layer has no weights, so saved weight files are unaffected.
    self.concat = Concatenate(axis=1)

    self.LFLBs = Sequential([
        Conv2D(64, (3, 3), strides=1, padding='same', use_bias=False,
               kernel_regularizer=regularizers.l2(L2)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=(2, 2), padding='same'),

        Conv2D(96, (3, 3), strides=1, padding='same', use_bias=False,
               kernel_regularizer=regularizers.l2(L2)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=(2, 2), padding='same'),

        # From here on pooling halves only the first spatial axis.
        Conv2D(128, (3, 3), strides=1, padding='same', use_bias=False,
               kernel_regularizer=regularizers.l2(L2)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=(2, 1), padding='same'),

        Conv2D(160, (3, 3), strides=1, padding='same', use_bias=False,
               kernel_regularizer=regularizers.l2(L2)),
        BatchNormalization(),
        ReLU(),
        AveragePooling2D(pool_size=(2, 1), padding='same'),

        # 1x1 conv widens the channels before global pooling collapses
        # the spatial axes into a single feature vector per sample.
        Conv2D(320, (1, 1), strides=1, padding='same', use_bias=False,
               kernel_regularizer=regularizers.l2(L2)),
        BatchNormalization(),
        ReLU(),
        GlobalAveragePooling2D(),
    ])

    self.drop = Dropout(DROPOUT)
    self.classifier = Dense(num_classes, activation="softmax")


  def call(self, inputs, training=None):
    """Run the forward pass and return per-class softmax probabilities.

    Args:
      inputs: batch of input features (see the class-level note on layout).
      training: forwarded to the BatchNormalization and Dropout layers;
        `None` lets Keras fall back to the surrounding call context.
    """
    branches = [
        self.path1(inputs, training=training),
        self.path2(inputs, training=training),
        self.path3(inputs, training=training),
    ]

    x = self.concat(branches)
    x = self.LFLBs(x, training=training)
    x = self.drop(x, training=training)

    return self.classifier(x)

In [4]:
# history = model.fit(x_train, y_train, epochs=10, validation_split=0.25)

def train_step(model, loss_fn, optimizer, mean_train_loss, train_accuracy, x, labels):
    """Run one optimization step on a single batch and update the train metrics.

    Args:
        model: Keras model to train.
        loss_fn: callable `loss_fn(labels, predictions)` returning the batch loss.
        optimizer: Keras optimizer used to apply the gradients.
        mean_train_loss: metric called with the batch data loss.
        train_accuracy: metric called with `(labels, predictions)`.
        x: batch of input features.
        labels: batch of targets matching `loss_fn`.
    """
    with tf.GradientTape() as tape:
        # training=True is required here: without it Dropout is inactive and
        # BatchNormalization uses (and never updates) its moving statistics.
        predictions = model(x, training=True)
        loss = loss_fn(labels, predictions)

        # Layer regularizers (e.g. kernel_regularizer) are only exposed through
        # model.losses; a custom loop must add them explicitly or they are ignored.
        total_loss = loss
        if model.losses:
            total_loss = loss + tf.add_n(model.losses)

    # Compute the gradients and update the network parameters.
    variables = model.trainable_variables
    grads = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(grads, variables))

    # Track the data term only, so the value stays comparable with test_step.
    mean_train_loss(loss)
    train_accuracy(labels, predictions)

def test_step(model, loss_fn, mean_test_loss, test_accuracy, x, labels):
    predictions = model(x)
    loss_t = loss_fn(labels, predictions)   
    
    mean_test_loss(loss_t)
    test_accuracy(labels, predictions)

In [5]:
# Read data: two arrays are read one after the other from the same file
# handle -- first `x` (model inputs), then `y` (targets), as they are used
# by the fit / k-fold cells below.
with open(f'dataset/{DATA_PATH}.npy', 'rb') as f:
    x = np.load(f)
    y = np.load(f)
    
# y = to_categorical(y, num_classes=len(CLASS_LABELS))

In [6]:
# Short 10-epoch run with the built-in Keras loop on the full data set.
# The k-fold cell further down builds a fresh SERNET per fold and rebinds
# `model`, so nothing trained here carries over.
model = SERNET(len(CLASS_LABELS))

# The sparse loss is used with `y` as loaded (the to_categorical call above
# is commented out), i.e. `y` is treated as class indices, not one-hot rows.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


# 25% of the samples are held out for validation via validation_split.
history = model.fit(x, y, epochs=10, validation_split=0.25, batch_size=32)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Smooth label operation
def smooth_labels(labels, factor=0.1):
    """Return a label-smoothed copy of a 2-D label matrix.

    Each entry becomes `value * (1 - factor) + factor / num_classes`, where
    `num_classes` is `labels.shape[1]`.

    The input is never modified. Floating-point inputs keep their dtype;
    any other dtype (e.g. integer one-hot rows) is converted to float64,
    since smoothed values cannot be represented as integers.

    Args:
        labels: array-like of shape (num_samples, num_classes).
        factor: smoothing strength in [0, 1]; 0 leaves the values unchanged.

    Returns:
        A new numpy array with the smoothed labels.

    Raises:
        IndexError: if `labels` has fewer than two dimensions.
    """
    labels = np.asarray(labels)
    # Read the class count first so a 1-D input fails before any work is done.
    num_classes = labels.shape[1]

    out_dtype = labels.dtype if np.issubdtype(labels.dtype, np.floating) else np.float64
    # astype(copy=True) guarantees the caller's array is left untouched.
    smoothed = labels.astype(out_dtype, copy=True)
    smoothed *= (1 - factor)
    smoothed += (factor / num_classes)
    return smoothed

In [None]:
def save_model(model, dataset_name, model_name, feature_name, fold, now_time):
    """Store the model's weights under ``Models/<dataset_name>/``.

    The directory is created when missing. The file is named
    ``<model_name>_<feature_name>_<fold>-fold_<now_time>.h5`` and is written
    through ``model.save_weights``.
    """
    target_dir = os.path.join('Models', dataset_name)
    os.makedirs(target_dir, exist_ok=True)

    weights_file = f'{model_name}_{feature_name}_{fold}-fold_{now_time}.h5'
    model.save_weights(os.path.join(target_dir, weights_file))
    
    # json_path = f'{naming}.json'
    # with open(os.path.join(save_path, json_path), "w") as json_file:
    #     json_file.write(model.to_json())

In [None]:
# Exponent of the decay: every decay multiplies the rate by exp(-0.15).
LEARNING_RATE_DECAY_PARAMETERS = -0.15
# First epoch index at which decay may happen ("STRATPOINT" spelling kept
# so any other reference to this name keeps working).
LEARNING_RATE_DECAY_STRATPOINT = 50
# Decay is applied only on epoch indices that are multiples of this value.
LEARNING_RATE_DECAY_STEP = 20


def scheduler(epoch, lr):
    """Return the learning rate to use after `epoch`.

    The rate is unchanged unless `epoch` is at least
    LEARNING_RATE_DECAY_STRATPOINT and a multiple of LEARNING_RATE_DECAY_STEP;
    in that case it is multiplied by exp(LEARNING_RATE_DECAY_PARAMETERS).
    With the values above the decays fall on epoch indices 60, 80, 100, ...
    (index 50 itself is not a multiple of 20).

    Args:
        epoch: zero-based epoch index.
        lr: current learning rate (any scalar convertible with `float`).

    Returns:
        The new learning rate, always as a Python float.
    """
    lr = float(lr)
    decay_due = (epoch >= LEARNING_RATE_DECAY_STRATPOINT
                 and epoch % LEARNING_RATE_DECAY_STEP == 0)
    if decay_due:
        # numpy keeps this a plain scalar computation; no tensor is created.
        return lr * float(np.exp(LEARNING_RATE_DECAY_PARAMETERS))
    return lr

In [None]:
# Per-fold results, filled by the k-fold cell and consumed by the
# report / Excel cells below. Re-run this cell before re-running the
# k-fold cell, otherwise results from the previous run are kept.
conf_matrix_list = []   # one confusion matrix per fold
eva_matrix = []         # one classification_report dict per fold

# Ground-truth and predicted class indices pooled over all folds.
emotions_groundtruth_list = np.array([])
predicted_emotions_list = np.array([])

# Sum of the per-fold best test accuracies (divided later to get the mean).
avg_acc = 0.0

In [None]:
# k-fold cross-validation: train a fresh SERNET per fold with the custom
# train_step/test_step loop, keep the epoch with the lowest test loss, and
# collect that epoch's predictions for the reports.
#
# NOTE(review): the "best" epoch is selected on the same fold that is reported
# as test data, so the reported accuracy is optimistic -- confirm this is the
# intended protocol.
# NOTE(review): KFold is not shuffled or stratified; if the samples are stored
# grouped by class, some folds may contain few or no samples of a class.
kfold = KFold(n_splits=k, shuffle=False, random_state=None)
num_classes = len(CLASS_LABELS)
class_ids = np.arange(num_classes)

for i, (train, test) in tqdm(enumerate(kfold.split(x, y)), total=k, desc=f'Training {k}-Fold.....'):
    now_time = datetime.now().strftime("%m-%d-%H%M%S")

    x_train, y_train = x[train], y[train]
    x_test, y_test = x[test], y[test]

    # The labels are used with sparse losses/metrics throughout, i.e. they are
    # class indices rather than one-hot rows, so they are flattened here
    # instead of being passed through argmax.
    y_true = np.asarray(y_test).reshape(-1).astype(int)

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = Adam(learning_rate=1e-3)

    model = SERNET(num_classes)

    # metrics
    mean_train_loss = Mean(name='train_loss')
    train_accuracy = SparseCategoricalAccuracy(name='train_accuracy')
    mean_test_loss = Mean(name='test_loss')
    test_accuracy = SparseCategoricalAccuracy(name='test_accuracy')
    epoch_metrics = (mean_train_loss, train_accuracy, mean_test_loss, test_accuracy)

    best_test_loss = float('inf')
    best_test_acc = -1.0
    y_pred_best = None

    batch_train = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(2022).batch(BATCH)
    batch_test = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(BATCH)
    for epoch in tqdm(range(EPOCHS), desc=f'Fold-{i+1}'):
        # Metrics accumulate until reset; without this every value below is a
        # running mean over all epochs so far, not the current epoch's value.
        # (reset_states is the spelling matching the Keras API generation this
        # notebook already relies on, e.g. optimizer.lr / K.set_value.)
        for metric in epoch_metrics:
            metric.reset_states()

        for features, labels in batch_train:
            train_step(model, loss_fn, optimizer, mean_train_loss, train_accuracy, features, labels)
        for features, labels in batch_test:
            test_step(model, loss_fn, mean_test_loss, test_accuracy, features, labels)

        train_loss = float(mean_train_loss.result())
        train_acc = float(train_accuracy.result())
        test_loss = float(mean_test_loss.result())
        test_acc = float(test_accuracy.result())

        cur_lr = K.eval(optimizer.lr)
        print(f'{epoch+1}/{EPOCHS} lr={cur_lr:.5f} - loss:{train_loss:.3f}, acc:{train_acc:.3f}, test_loss:{test_loss:.3f}, test_acc:{test_acc:.3f}')

        # Learning rate for the next epoch.
        K.set_value(optimizer.learning_rate, scheduler(epoch, cur_lr))

        if test_loss < best_test_loss:
            best_test_loss = test_loss
            best_test_acc = test_acc
            y_pred_best = model.predict(x_test)

            # model save (the file name carries the zero-based fold index)
            save_model(model, DATA_PATH, model_name, feature_name, i, now_time)

    if y_pred_best is None:
        # Happens only if the test loss was never below +inf (e.g. NaN loss);
        # fail loudly instead of silently reusing a previous fold's predictions.
        raise RuntimeError(f'Fold {i+1}: test loss never improved; no predictions to evaluate')

    print(f'[*] Done - loss:{best_test_loss:.3f}, acc:{best_test_acc:.3f}')

    avg_acc += best_test_acc

    y_pred = np.argmax(y_pred_best, axis=1)

    # labels=class_ids keeps every matrix/report at the full class count even
    # when a fold happens to miss a class, which the Excel export relies on.
    conf_matrix = confusion_matrix(y_true, y_pred, labels=class_ids)
    conf_matrix_list.append(conf_matrix)

    em = classification_report(y_true, y_pred, labels=class_ids, target_names=CLASS_LABELS, output_dict=True)
    eva_matrix.append(em)

    emotions_groundtruth_list = np.append(emotions_groundtruth_list, y_true)
    predicted_emotions_list = np.append(predicted_emotions_list, y_pred)

In [None]:
# Classification report over the predictions pooled from all folds.
# The pooled arrays are float (they grow from an empty float array via
# np.append), so they are cast back to integer class indices and mapped to
# the class names; otherwise the rows are labelled "0.0", "1.0", ...
Report = classification_report(
    emotions_groundtruth_list.astype(int),
    predicted_emotions_list.astype(int),
    labels=np.arange(len(CLASS_LABELS)),
    target_names=CLASS_LABELS,
)

os.makedirs(f'Results/{DATA_PATH}', exist_ok=True)
report_path = f'Results/{DATA_PATH}/{model_name}_{feature_name}_{k}-fold_nomalize.txt'

with open(report_path, "w") as f:
    f.write(Report)

In [None]:
import pandas as pd

# Export one confusion-matrix sheet and one metrics sheet per fold.
# The mean of the per-fold best accuracies is embedded in the file name;
# it is averaged over the actual number of folds `k`.
os.makedirs(f'Results/{DATA_PATH}', exist_ok=True)
naming = f'Results/{DATA_PATH}/{model_name}_{feature_name}_{k}-fold'
naming = f'{naming}_{float(avg_acc)/k:.3f}.xlsx'

# The context manager saves and closes the workbook on exit, replacing the
# explicit writer.save() call that newer pandas versions no longer provide.
with pd.ExcelWriter(naming) as writer:
    for i, item in enumerate(conf_matrix_list):
        # Column CLASS_LABELS[j] receives row j of the confusion matrix, so the
        # sheet is laid out transposed relative to the matrix itself.
        temp = {" ": CLASS_LABELS}
        for j, row in enumerate(item):
            temp[CLASS_LABELS[j]] = row
        data1 = pd.DataFrame(temp)
        data1.to_excel(writer, sheet_name=str(i))

        df = pd.DataFrame(eva_matrix[i]).transpose()
        df.to_excel(writer, sheet_name=str(i) + "_evaluate")