In [1]:
import numpy as np
import random
import cv2
import os

from tensorflow import keras
from keras.layers import *
from keras.models import *
from keras.optimizers import *

from keras.utils import np_utils
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, roc_auc_score
import tensorflow as tf

### 讀取頻譜圖路徑

In [2]:
mfcc_path = "ImageData/frameLen_40ms/mfcc" #original augmentation
spect_path = "ImageData/frameLen_40ms/spectrogram" #original augmentation
model_save_dir = "Model/ms_mfcc_model"
model_name = "MS_and_MFCC_40" #AUG
HIS_SHOW = False #是否show出訓練的history loss
epochs = 20
fold_num = 5
batch_size = 4
mfcc_folder_names = os.listdir(mfcc_path)
spect_folder_names = os.listdir(spect_path)
print(mfcc_folder_names, spect_folder_names)

['Blue', 'Classical', 'Country', 'Disco', 'EDM', 'Hiphop', 'Jazz', 'Metal', 'Pop', 'Reggae'] ['Blue', 'Classical', 'Country', 'Disco', 'EDM', 'Hiphop', 'Jazz', 'Metal', 'Pop', 'Reggae']


In [3]:
folders=[] #儲存各類別資料夾中的檔案名稱
for folder in spect_folder_names:
    file_names=[]
    for file_name in os.listdir(spect_path+"/"+folder):
        file_names.append([spect_path+"/"+folder+"/"+file_name, mfcc_path+"/"+folder+"/"+file_name]) #加入mel 、mfcc路徑
    folders.append(file_names)

class_count_list = [len(files) for files in folders] #紀錄每個類別各有多少檔案(後面會用到)
print("class count: ", len(folders))
print("class file count: ", class_count_list)

class count:  10
class file count:  [20, 20, 20, 20, 20, 20, 20, 20, 20, 20]


In [4]:
Data_names = []
for folder in folders:
    for names in folder:
        Data_names.append(names)
print(len(Data_names))
print(Data_names[0])

200
['ImageData/frameLen_40ms/spectrogram/Blue/Blue000.png', 'ImageData/frameLen_40ms/mfcc/Blue/Blue000.png']


### 製作Label

In [5]:
Label = []
for l, class_count in enumerate(class_count_list):
    for _ in range(class_count):
        Label.append(np.ones(1)*l)
Label = np.array(Label)

### Data Generator定義

In [6]:
class DataGenerator:
    def __init__(self, data, label, batch_size=16):
        self.data = data
        self.label = label
        self.batch_size = batch_size
        self.steps = len(data)//self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1
    def __len__(self):
        return self.steps
    def __iter__(self):
        while True:
            #idxs = list(range(len(self.data)))
            #np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in range(len(self.data)):
                name = self.data[i]
                x1 = cv2.imread(name[0]) #從路徑讀入spectogram和mfcc圖片
                x2 = cv2.imread(name[1])
                x1 = cv2.resize(x1, (224, 224))
                x2 = cv2.resize(x2, (224, 224))
                y = self.label[i]
                X1.append(x1)
                X2.append(x2)
                Y.append(y)
                if len(X1) == self.batch_size or i == len(self.data)-1:
                    X1 = np.array(X1)
                    X2 = np.array(X2)
                    Y = np.array(Y)
                    yield [X1, X2], Y
                    X1, X2, Y = [], [], []

### 模型定義函數

In [7]:
def CONV(inputs):
    x = Conv2D(64, (7,3), activation='relu', padding = 'same')(inputs)
    x = Dropout(0.03)(x)
    x = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
    x = Conv2D(128, (3,3), activation='relu', padding = 'same')(x)
    x = Dropout(0.03)(x)
    x = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
    x = Conv2D(256, (3,3), activation='relu', padding = 'same')(x)
    x = Dropout(0.03)(x)
    x = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
    x = Conv2D(512, (3,3), activation='relu', padding = 'same')(x)
    x = Dropout(0.03)(x)
    x = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
    x = Flatten()(x)
    return x

In [8]:
def create_new_model():
    input_shape = (224, 224, 3)
    output_dim = len(folders)

    spect_inputs = Input(shape=input_shape)
    mfcc_inputs = Input(shape=input_shape)

    spect_outputs = CONV(spect_inputs)
    mfcc_outputs = CONV(mfcc_inputs)
    x = Concatenate()([spect_outputs, mfcc_outputs])
    
    x = Dense(256, activation='relu')(x)
    x = Dense(128, activation='relu')(x)
    outputs = Dense(output_dim, activation='softmax')(x)

    model=Model([spect_inputs, mfcc_inputs],outputs)
    return model

In [9]:
import matplotlib.pyplot as plt
def show_train_history(train_history, train, validation):
    plt.plot(train_history.history[train])
    plt.plot(train_history.history[validation])
    plt.title('Train History')
    plt.ylabel(train)
    plt.xlabel('Epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

### 資料切割(Train and Test)

In [10]:
seed =  random.randint(0, 1000**2)
X_train_names, X_test_names, y_train, y_test = train_test_split(Data_names, Label, test_size=0.3,
                                                    random_state=seed, stratify = Label)
y_test_onehot = np_utils.to_categorical(y_test)

### Stratified K-fold

In [11]:
VALIDATION_ACCURACY = []
VALIDATION_LOSS = []
TEST_ACCURACY=[]

skf = StratifiedKFold(n_splits=fold_num)
fold_var = 1

for train_index, valid_index in skf.split(X_train_names, y_train):
    
    #準備該次的train data(路徑)
    X_train_name_fold = []
    for idx in train_index:
        X_train_name_fold.append(X_train_names[idx])
    y_train_fold = y_train[train_index]
    y_train_fold_onehot = np_utils.to_categorical(y_train_fold)

    X_val_name_fold = []
    for idx in valid_index:
        X_val_name_fold.append(X_train_names[idx])
    y_val_fold = y_train[valid_index]
    y_val_fold_onehot = np_utils.to_categorical(y_val_fold)
    
    train_D = DataGenerator(X_train_name_fold, y_train_fold_onehot, batch_size)
    valid_D = DataGenerator(X_val_name_fold, y_val_fold_onehot, batch_size)
    
    model = create_new_model()
    opt = Adam(lr=0.0001)
    model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=['accuracy'])
    # CREATE CALLBACKS
    checkpoint = tf.keras.callbacks.ModelCheckpoint(model_save_dir+"/"+model_name+"_fold"+str(fold_var)+".h5",
                                    monitor='val_accuracy', 
                                    save_best_only=True)
    callbacks_list = [checkpoint]
    
    history = model.fit(
                    train_D.__iter__(),
                    steps_per_epoch = train_D.__len__(),
                    validation_data=valid_D.__iter__(),
                    validation_steps=valid_D.__len__(),
                    epochs=epochs,
                    callbacks=callbacks_list,
                    verbose=1) # 訓練紀錄顯示
    if HIS_SHOW:
        show_train_history(history, 'loss', 'val_loss')
    
    model.load_weights(model_save_dir+"/"+model_name+"_fold"+str(fold_var)+".h5")
    results = model.evaluate(valid_D.__iter__(), steps=valid_D.__len__(), verbose=1)
    results = dict(zip(model.metrics_names,results))
    
    VALIDATION_ACCURACY.append(results['accuracy'])
    VALIDATION_LOSS.append(results['loss'])
    
    test_D = DataGenerator(X_test_names, y_test_onehot, batch_size)
    scores = model.evaluate(test_D.__iter__(), steps=test_D.__len__(), verbose=0)
    print(str(fold_var)+" Fold: "+"Accuracy of testing data = {:2.2f}%".format(scores[1]*100.0))
    print()
    TEST_ACCURACY.append(scores[1]*100.0)
    
    tf.keras.backend.clear_session()
    fold_var+=1
    #break

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
1 Fold: Accuracy of testing data = 53.33%

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
2 Fold: Accuracy of testing data = 51.67%

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20


Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
3 Fold: Accuracy of testing data = 41.67%

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
4 Fold: Accuracy of testing data = 50.00%

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
5 Fold: Accuracy of testing data = 61.67%



In [12]:
print("validation accuracy: ",VALIDATION_ACCURACY)
print("validation loss: ", VALIDATION_LOSS)
print("test accuracy: ",TEST_ACCURACY)

validation accuracy:  [0.6785714030265808, 0.7142857313156128, 0.6071428656578064, 0.6071428656578064, 0.6428571343421936]
validation loss:  [1.7378708124160767, 1.0199177265167236, 1.5679441690444946, 2.11220645904541, 1.6025904417037964]
test accuracy:  [53.33333611488342, 51.66666507720947, 41.66666567325592, 50.0, 61.666667461395264]


In [13]:
print("average validation accuracy: ",sum(VALIDATION_ACCURACY)/len(VALIDATION_ACCURACY))
print("average test accuracy: ",sum(TEST_ACCURACY)/len(TEST_ACCURACY))

average validation accuracy:  0.65
average test accuracy:  51.666666865348816
