# 1. 필요한 패키지

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf

from tensorflow import keras
from keras.applications import ResNet50, VGG16, EfficientNetB0
from keras.layers import LSTM, RepeatVector, TimeDistributed, Dense, Input
from keras.models import Model
from keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
from datetime import datetime

# 2. Image

- 소리데이터를 이미지데이터를 변환한 한 것으로 이미지데이터 기반 데이터 이상탐지 테스트 단계

In [None]:
def evaluate_data(Y_test, Y_pred, hyperparameter_file_name):

    TP = 0
    FP = 0
    FN = 0
    TN = 0

    for i in range(len(Y_pred)):
        if Y_test[i] == 0 and Y_pred[i] == 0:
            TN = TN + 1
        elif Y_test[i] == 0 and Y_pred[i] == 1:
            FP = FP + 1
        elif Y_test[i] == 1 and Y_pred[i] == 0:
            FN = FN + 1
        elif Y_test[i] == 1 and Y_pred[i] == 1:
            TP = TP + 1

    row_data = {"Accuracy" : [accuracy_score(Y_test, Y_pred)],
                "F1_Score" : [f1_score(Y_test, Y_pred)],
                "Recall" : [recall_score(Y_test, Y_pred)],
                "Precision" : [precision_score(Y_test, Y_pred)],
                "TN" : [TN],
                "FP" : [FP],
                "FN" : [FN],
                "TP" : [TP]}
    df = pd.DataFrame(row_data)
    df.to_csv("/BTS/result/Test_result/" + hyperparameter_file_name + ".csv")

In [None]:
def define_image_model(hyperparameter):
    if hyperparameter["color"] == "rgb":
        input_tensor = Input(shape = (hyperparameter["size_width"], hyperparameter["size_height"], 3))
    else:
        input_tensor = Input(shape = (hyperparameter["size_width"], hyperparameter["size_height"], 1))

    if hyperparameter["model"] == "CNN" and hyperparameter["color"] == "rgb":
        input_CNN_tensor = (hyperparameter["size_width"], hyperparameter["size_height"], 3)
    else :
        input_CNN_tensor = (hyperparameter["size_width"], hyperparameter["size_height"], 1)

    if hyperparameter["model"] == "CNN":
        training_model = tf.keras.models.Sequential([
            tf.keras.layers.Conv2D(16, (3, 3), activation='relu', input_shape=input_CNN_tensor),
            tf.keras.layers.MaxPool2D(2, 2),
            tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
            tf.keras.layers.MaxPool2D(2, 2),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPool2D(2, 2),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPool2D(2, 2),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPool2D(2, 2),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')])

    elif hyperparameter["model"] == "VGG16":
        training_model = VGG16(weights = None, include_top = False, input_tensor = input_tensor)

        layer_dict = dict([(layer.name, layer) for layer in training_model.layers])

        x = layer_dict['block5_pool'].output
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(4096, activation = 'relu')(x)
        x = tf.keras.layers.Dense(4096, activation = 'relu')(x)
        x = tf.keras.layers.Dense(1024, activation = 'relu')(x)
        x = tf.keras.layers.Dense(1, activation = 'sigmoid')(x)

        training_model = Model(inputs = training_model.input, outputs = x)
    
    elif hyperparameter["model"] == "ResNet50":
        training_model = ResNet50(weights = None, include_top = False, input_tensor = input_tensor)

        layer_dict = dict([(layer.name, layer) for layer in training_model.layers])
        x = layer_dict['conv5_block3_out'].output
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(1024, activation = 'relu')(x)
        x = tf.keras.layers.Dense(512, activation = 'relu')(x)
        x = tf.keras.layers.Dense(1, activation = 'sigmoid')(x)

        training_model = Model(inputs = training_model.input, outputs = x)
    
    elif hyperparameter["model"] == "EfficientNetB0":
        training_model = EfficientNetB0(weights = None, include_top = False, input_tensor = input_tensor)
        
        layer_dict = dict([(layer.name, layer) for layer in training_model.layers])
        
        x = layer_dict['top_activation'].output
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(1280, activation = 'relu')(x)
        x = tf.keras.layers.Dense(1, activation = 'sigmoid')(x)

        training_model = Model(inputs = training_model.input, outputs = x)

    return training_model

def image_training(training_model, hyperparameter, hyperparameter_file_name, X_train, X_val, y_train, y_val):

    class CustomCallback(keras.callbacks.Callback):
            def on_train_begin(self, logs = None):
                raw_data = {'epoch' : [],
                            'train_loss' : [],
                            'train_accuracy' : [],
                            'validation_loss' : [],
                            'validation_accuracy': [],
                            'timestamp' : []}
                df = pd.DataFrame(raw_data)
                df.to_csv("/BTS/process_log/" + hyperparameter_file_name + ".csv", index = False)
            def on_epoch_end(self, epoch, logs=None):
                now = datetime.now()
                df = pd.read_csv("/BTS/process_log/" + hyperparameter_file_name + ".csv")
                df.loc[-1]=[epoch, logs["loss"], logs["binary_accuracy"], logs["val_loss"], logs["val_binary_accuracy"], now.timestamp()]
                df.to_csv("/BTS/process_log/" + hyperparameter_file_name + ".csv", index = False)
            def on_train_end(self, epoch, logs=None):
                df = pd.read_csv("/BTS/process_log/" + hyperparameter_file_name + ".csv")
                df.loc[-1]=[hyperparameter["epochs"], 0, 0, 0, 0, 0]
                df.to_csv("/BTS/process_log/" + hyperparameter_file_name + ".csv", index = False)
    
    filename = ('/BTS/training_model/' + "training_model(" + hyperparameter_file_name + ").h5")
    checkpoint = ModelCheckpoint(filename, monitor = 'val_loss', verbose = 1, save_best_only = True, mode = 'auto')
    earlystopping = EarlyStopping(monitor = 'val_loss', patience = 30)

    training_model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = hyperparameter["learning_rate"]),
    loss = tf.keras.losses.BinaryCrossentropy(),
    metrics = tf.keras.metrics.BinaryAccuracy())

    training_model.fit(X_train, y_train, validation_data = (X_val, y_val), batch_size = hyperparameter["batch_size"], epochs = hyperparameter["epochs"], callbacks = [checkpoint, earlystopping, CustomCallback()])

def image_test(hyperparameter_file_name, X_test, y_test):
    model_path = "/BTS/training_model/training_model(" + hyperparameter_file_name + ").h5"
    model = keras.models.load_model(model_path)
    y_pred = model.predict(X_test)
    temp = []
    for i in range(len(y_pred)):
        if y_pred[i] > 0.5:
            temp.append(1)
        else:
            temp.append(0)
    y_pred = temp
    row_data = {"y_test" : y_test, "y_pred" : y_pred}
    score = pd.DataFrame(row_data)
    return score