In [None]:
import os
import numpy as np
from itertools import combinations
from sklearn.model_selection import train_test_split

from keras.models import Model
from keras.layers import Input, Flatten, Dense, Dropout, Lambda, LSTM
from keras.optimizers import Adam
import keras.backend as K

import random

In [None]:
def load_data(directory, max_files_per_class=16):

    data_pairs = []
    labels = []
    all_class_vectors = []
    
    cnt_1=0
    cnt_0=0

    for class_folder in os.listdir(directory):
        if class_folder == ".ipynb_checkpoints":
            continue
        class_folder_path = os.path.join(directory, class_folder)

        class_vectors = []
        for filename in sorted(os.listdir(class_folder_path))[:max_files_per_class]:
            if filename == ".ipynb_checkpoints":
                continue
            file_path = os.path.join(class_folder_path, filename)
            mfcc_vector = np.load(file_path)
            class_vectors.append(mfcc_vector)

        if len(class_vectors) > 0:
            all_class_vectors.append(class_vectors)

    for class_vectors in all_class_vectors:
        for pair in combinations(class_vectors, 2):
            data_pairs.append(pair)
            labels.append(1)
            cnt_1 += 1

    for i, class_vectors in enumerate(all_class_vectors):
        for j in range(i + 1, len(all_class_vectors)):
            for pair in combinations(zip(class_vectors, all_class_vectors[j]), 2):
                data_pairs.append((pair[0][0], pair[1][1]))
                labels.append(0)
                cnt_0 += 1
    
    print(cnt_1)
    print(cnt_0)
    
    return data_pairs, labels

In [None]:
mfcc_dataset_dir = "E:\\UGRP\\npy\\mfcc"

data_pairs, labels = load_data(mfcc_dataset_dir)

In [None]:
data_pairs[0][0].shape

In [None]:
def initialize_base_network(input_shape):
    input = Input(shape=input_shape)
    x = LSTM(128, return_sequences=True)(input)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    return Model(input, x)

def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)

def contrastive_loss_with_margin(margin):
    def contrastive_loss(y_true, y_pred):
        y_true = K.cast(y_true, 'float32')
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)
    return contrastive_loss


input_shape = (13, 2498)

base_network = initialize_base_network(input_shape)

input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)


processed_a = base_network(input_a)
processed_b = base_network(input_b)

distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b])

model = Model([input_a, input_b], distance)

adam = Adam(learning_rate=0.001)
model.compile(loss=contrastive_loss_with_margin(margin=1), optimizer=adam)

model.summary()

In [None]:
len(data_pairs)

In [None]:
new_data_pairs = []
new_labels = []

for i in range(len(data_pairs)):
    if (data_pairs[i][0].shape == (13, 2498) and data_pairs[i][1].shape == (13, 2498)):
        new_data_pairs.append(data_pairs[i])
        new_labels.append(labels[i])

In [None]:
data_pairs = new_data_pairs
labels = new_labels
len(data_pairs)

In [None]:
slice_num = 100000
data_num = 10000  #한번에 학습시킬 데이터 개수

k = len(data_pairs)/slice_num
t = int(k)

k

In [None]:
import random
from tensorflow.python.keras.callbacks import EarlyStopping
random.seed(42)

all_data_pairs = []
all_labels = []
for i in range(t):
    if (i == t-1):
        sample_data_pairs = data_pairs[i*slice_num:]
        sample_labels = labels[i*slice_num:]
        if (len(sample_data_pairs) > data_num):
            sample_data_pairs = random.sample(sample_data_pairs, k=data_num)
            sample_labels = random.sample(sample_labels, k=data_num)   
    else:
        sample_data_pairs = data_pairs[i*slice_num:(i+1)*slice_num]
        sample_labels = labels[i*slice_num:(i+1)*slice_num]
        sample_data_pairs = random.sample(sample_data_pairs, k=data_num)
        sample_labels = random.sample(sample_labels, k=data_num)
    all_data_pairs.append(sample_data_pairs)
    all_labels.append(sample_labels)

In [None]:
train_loss = []
val_loss = []

for i in range(len(all_data_pairs)):
    
    print(i+1, "training...")
    pairs = np.array(all_data_pairs[i])
    labels = np.array(all_labels[i])
    
    pairs_train, pairs_test, labels_train, labels_test = train_test_split(pairs, labels, test_size=0.2, random_state=42)
    pairs_train, pairs_val, labels_train, labels_val = train_test_split(pairs_train, labels_train, test_size=0.2, random_state=42)

    early_stopping = EarlyStopping()

    hist = model.fit([pairs_train[:, 0], pairs_train[:, 1]], labels_train,
          validation_data=([pairs_val[:, 0], pairs_val[:, 1]], labels_val),
          batch_size=128,
          epochs=10,
          callbacks=[early_stopping])
    train_loss.append(hist.history['loss'])
    val_loss.append(hist.history['val_loss'])

In [None]:
train_loss_result = []
for i in range(len(train_loss)):
    for j in range(len(train_loss[i])):
        train_loss_result.append(train_loss[i][j])

val_loss_result = []
for i in range(len(val_loss)):
    for j in range(len(val_loss[i])):
        val_loss_result.append(val_loss[i][j])

In [None]:
import matplotlib.pyplot as plt

plt.plot(train_loss_result, label="Training loss")
plt.plot(val_loss_result, label="Val loss")


plt.legend()

In [None]:
model.evaluate([pairs_test[:, 0], pairs_test[:, 1]], labels_test)

In [None]:
model.save('C:\\Users\\new_folder\\models\\MFCC_model.h5')