In [1]:
import numpy as np
import tensorflow.keras.backend as K
import numpy as np
import tensorflow as tf
from tensorflow import keras

In [2]:
dataset = np.load("../../KEMDy20_v1_1/Extract/Dataset_MFCC.npz", allow_pickle=True)

In [3]:
train_x = dataset["train_x"]
train_y = dataset["train_y"]
test_x = dataset["test_x"]
test_y = dataset["test_y"]
train_aug_x = dataset["train_aug_x"]
train_aug_y = dataset["train_aug_y"]

In [4]:
def Macro_f1(y_true, y_pred):
    y_true = K.round(y_true)
    y_pred = K.round(y_pred)
    
    tp = K.sum(y_true * y_pred, axis=0)
    fp = K.sum((1-y_true) * y_pred, axis=0)
    fn = K.sum(y_true * (1-y_pred), axis=0)
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())
    f1 = 2 * precision * recall / (precision + recall + K.epsilon())
    
    # Calculate macro F1 score
    macro_f1 = K.mean(f1)
    
    return macro_f1

In [5]:
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical

# 레이블 인코딩
le = LabelEncoder()
le.fit(train_y)
encoded_train_y = le.transform(train_y)

# 원핫 인코딩
onehot_train_y = to_categorical(encoded_train_y)

# train_y에서 사용된 LabelEncoder 객체를 그대로 사용하여 test_y 인코딩
encoded_test_y = le.transform(test_y)

# test_y를 원핫 인코딩
onehot_test_y = to_categorical(encoded_test_y)

# Split the train and test data into four parts
train_data_parts = np.split(train_x, 4, axis=1)
test_data_parts = np.split(test_x, 4, axis=1)

In [6]:
from keras.models import Model
from keras.layers import Input, Conv1D, MaxPooling1D, BatchNormalization, Flatten, Dense, Average, Dropout

# Define submodel architecture
def create_submodel(input_shape):
    submodel_input = Input(shape=input_shape)
    x = Conv1D(filters=16, kernel_size=3, activation='relu')(submodel_input)
    x = MaxPooling1D(pool_size=2)(x)
    x = BatchNormalization()(x)
    x = Conv1D(filters=32, kernel_size=3, activation='relu')(x)
    x = MaxPooling1D(pool_size=2)(x)
    x = BatchNormalization()(x)
    x = Conv1D(filters=64, kernel_size=3, activation='relu')(x)
    x = MaxPooling1D(pool_size=2)(x)
    x = BatchNormalization()(x)
    x = Flatten()(x)
    x = Dense(units=256, activation='relu')(x)
    x = Dropout(rate=0.5)(x)
    x = Dense(units=7, activation='softmax')(x)
    return submodel_input, x

# Define the list to store submodels and input shapes
submodels = []
input_shapes = []

# Create submodels
for i in range(4):
    input_shape = train_data_parts[i].shape[1:]
    submodel_input, submodel_output = create_submodel(input_shape)
    submodels.append(Model(inputs=submodel_input, outputs=submodel_output))
    input_shapes.append(input_shape)

# Define final model architecture
inputs = [Input(shape=input_shape) for input_shape in input_shapes]
outputs = [[submodel(inputs[i])] for i, submodel in enumerate(submodels)]
merged = Average()([output[0] for output in outputs])
#merged = Dense(units=7, activation='softmax')(Flatten()(merged))
final_model = Model(inputs=inputs, outputs=merged)

# Compile the final model
final_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=[Macro_f1])

In [8]:
from keras.utils import plot_model

# Visualize final model architecture
plot_model(final_model, to_file='../final_model.png', show_shapes=True)

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


In [None]:
from tensorflow.keras import callbacks

# Prepare the data for training
final_train_data = [train_data_parts[i].reshape(train_x.shape[0], -1, train_aug_x.shape[2]) for i in range(4)]
final_test_data = [test_data_parts[i].reshape(test_x.shape[0], -1, test_x.shape[2]) for i in range(4)]

weight_path="../Model/Model_weights_best.hdf5"
checkpoint = callbacks.ModelCheckpoint(
    weight_path,
    monitor='val_Macro_f1',
    verbose=1,
    save_weights_only=True,
    save_best_only=True,
    mode='max'
)

# Train the final model
final_model.fit(
    final_train_data,
    onehot_train_y,
    epochs=100,
    batch_size=32,
    validation_data=(final_test_data, onehot_test_y)
)

# Evaluate the final model on the test data
test_loss, test_macro_f1 = final_model.evaluate(final_test_data, onehot_test_y)
print(f'Test loss: {test_loss}, test macro_f1: {test_macro_f1}')

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100




In [None]:
model = load_weights(weight_path)
best_eva_list = model.evaluate(final_test_data, onehot_test_y)
loss = best_eva_list[0]
macro_f1 = best_eva_list[1]
print('Model evaluation: ', best_eva_list)
y_pred_best = self.model.predict(tx)
self.matrix.append(confusion_matrix(np.argmax(ty,axis=1),np.argmax(y_pred_best,axis=1)))
em = classification_report(np.argmax(ty,axis=1), np.argmax(y_pred_best,axis=1), target_names=self.class_label, output_dict=True)
self.eva_matrix.append(em)
print(classification_report(np.argmax(ty,axis=1), np.argmax(y_pred_best,axis=1), target_names=self.class_label))

In [7]:
#from sklearn.manifold import TSNE
#import matplotlib.pyplot as plt

#tsne = TSNE(n_components=2, random_state=0)
#digits_tsne = tsne.fit_transform(train_aug_x)

In [8]:
#plt.figure(figsize=(10, 8))
#plt.scatter(digits_tsne[:, 0], digits_tsne[:, 1])
#plt.colorbar()
#plt.show()