# Import Libraries

In [None]:
# Mount Google Drive at /content/drive (Google Colab only); the dataset path
# used below lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')
import random
random.seed(530)

from glob import glob
from tqdm.auto import tqdm

from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

import tensorflow as tf
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dense

# Data Load

In [None]:
# Location of the combined dataset on the mounted Google Drive.
data_path = "/content/drive/MyDrive/DKU/Ubiosis/total_data.csv"

# Read the whole CSV into a single DataFrame.
data_df = pd.read_csv(filepath_or_buffer=data_path)

In [None]:
# Remove rows that contain missing values.
data_df.dropna(axis=0, inplace=True)
# Rebuild a contiguous 0..n-1 index. Frames created later with a fresh
# RangeIndex (the one-hot frame) are concatenated column-wise with slices of
# this frame; without the reset, any dropped row would misalign the rows and
# reintroduce NaNs.
data_df.reset_index(drop=True, inplace=True)
data_df.tail(3)

In [None]:
# Split the frame by column position into one frame per kind of data:
# column 0 -> radius, 1..6000 -> cis1, 6001..12000 -> cis2, the rest -> shear.
radius_df, cis1_df, cis2_df, shear_df = (
    data_df.iloc[:, start:stop]
    for start, stop in ((0, 1), (1, 6001), (6001, 12001), (12001, None))
)

In [None]:
# One-hot encode the RADIUS column.
# Candidate categories: 19 values, 3.10 through 3.28 in steps of 0.01.
ohe_cols = [(310 + i) / 100 for i in range(19)]

# OneHotEncoder expects 2-D input, hence the single-column reshape.
ohe_target = np.array(ohe_cols).reshape(-1, 1)
ohe_value = np.array(radius_df["RADIUS"]).reshape(-1, 1)

# Fit on the full candidate list so every category gets a column even if it
# does not occur in the data.
ohe = OneHotEncoder()
ohe.fit(ohe_target)

ohe_labels = ohe.transform(ohe_value)
ohe_targets = ohe_labels.toarray()

# Reuse the source frame's index: rows were dropped from data_df earlier, so a
# default RangeIndex here would not line up with the other frames when they
# are concatenated column-wise.
ohe_df = pd.DataFrame(columns=ohe_cols, data=ohe_targets, index=radius_df.index)
ohe_df.head(3)

In [None]:
# Shear-rate scaling: each shear column is divided by its own factor
# (the first seven by 10, then 15 and 20). The same list is used later to
# undo the scaling on predictions.
scale_list = [10] * 7 + [15, 20]
shear_df = shear_df.div(scale_list, axis="columns")
shear_df.head(3)

In [None]:
# Join the feature groups and the scaled targets side by side; the shear
# columns end up last.
feature_and_target_frames = [ohe_df, cis1_df, cis2_df, shear_df]
data = pd.concat(feature_and_target_frames, axis="columns")
data.head(3)

# Dataset Split
- 학습용, 검증용 및 시험용 데이터셋으로 분할

In [None]:
# Hold out 20% for testing, then 20% of the remainder for validation.
train, test = train_test_split(data, random_state=530, test_size=0.2)
train, valid = train_test_split(train, random_state=530, test_size=0.2)

# For every partition: all but the last nine columns are inputs, the last nine
# columns are targets. Indices are reset so each frame starts at 0.
(X_train, y_train), (X_valid, y_valid), (X_test, y_test) = (
    (
        part.iloc[:, :-9].reset_index(drop=True),
        part.iloc[:, -9:].reset_index(drop=True),
    )
    for part in (train, valid, test)
)

In [None]:
# Drop the names of the full frames now that the train/valid/test splits exist.
# NOTE(review): other frames sliced from data_df are still referenced, so how
# much memory this actually releases is unclear — confirm if memory matters.
del data, data_df

# Model Define

In [None]:
def get_model(n_inputs, n_outputs):
    """Build and compile the fully connected regression network.

    The network is a stack of ReLU Dense layers
    (5096 -> 2048 -> 1024 -> 512 -> 256 -> 128 -> 64) followed by a linear
    output layer, compiled with MSE loss and the Adam optimizer.

    :param n_inputs: number of input features (width of the first layer's input)
    :param n_outputs: number of regression targets (width of the output layer)
    :return: the compiled Sequential model
    """
    model = Sequential()
    # Only the first layer declares the input width; every later layer infers
    # its input size from the layer before it.
    model.add(Dense(5096, input_dim=n_inputs, kernel_initializer='he_uniform', activation='relu'))
    model.add(Dense(2048, kernel_initializer='he_uniform', activation='relu'))
    model.add(Dense(1024, activation='relu'))
    model.add(Dense(512, kernel_initializer='he_uniform', activation='relu'))
    model.add(Dense(256, activation='relu'))
    model.add(Dense(128, kernel_initializer='he_uniform', activation='relu'))
    model.add(Dense(64, activation='relu'))
    # Linear output: unbounded regression values.
    model.add(Dense(n_outputs, activation="linear"))

    # `learning_rate` replaces the deprecated `lr` argument. The former
    # `decay=0.0` and `epsilon=None` meant "no decay" and "backend default
    # epsilon", which are the optimizer's defaults, so they are omitted.
    opt = optimizers.Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, amsgrad=False)
    model.compile(loss='mse', optimizer=opt,
                  metrics=[tf.keras.metrics.MeanAbsoluteError(),
                           tf.keras.metrics.MeanAbsolutePercentageError(),
                           tf.keras.metrics.MeanSquaredError(),
                           tf.keras.metrics.RootMeanSquaredError()])

    return model

# Model Train

In [None]:
# Layer widths come straight from the training matrices: one input unit per
# feature column and one output unit per target column.
n_inputs = X_train.shape[1]
n_outputs = y_train.shape[1]
model = get_model(n_inputs, n_outputs)

In [None]:
# Stop training once val_loss has not improved for 20 consecutive epochs.
# restore_best_weights rolls the model back to the epoch with the lowest
# val_loss; without it the model would keep the weights from the final epoch,
# i.e. up to 20 epochs past the best validation score.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=20, mode='min', restore_best_weights=True)
model.fit(X_train, y_train,
          validation_data=(X_valid, y_valid),
          validation_batch_size=32,
          verbose=1, epochs=100, batch_size=32, callbacks=[early_stopping])

# Model Test

In [None]:
# Run the trained model on the held-out test inputs. The outputs are still in
# the scaled target space (targets were divided by scale_list before training).
y_pred = model.predict(X_test)

In [None]:
y_pred[:10]

In [None]:
# Sanity check: inputs, targets and predictions should all have the same
# number of rows.
print(f"X Test Length : {len(X_test)}")
print(f"Y Test Length : {len(y_test)}")
print(f"Y Pred Length : {len(y_pred)}")

In [None]:
def unscale_values(scaled_list, scales=None):
    """Undo the per-column scaling by multiplying each column by its factor.

    The input is never modified: the values are copied first, so the caller's
    array (for example model predictions or ``DataFrame.values``) keeps its
    scaled contents.

    :param scaled_list: 2-D array-like of scaled rows (one row per sample)
    :param scales: per-column multipliers; defaults to the module-level
        ``scale_list`` used when the targets were scaled
    :return: list of 1-D NumPy arrays, one unscaled row per sample
    """
    if scales is None:
        scales = scale_list
    factors = np.asarray(scales)

    # np.array() copies, which keeps the caller's data untouched.
    values = np.array(scaled_list)
    if values.size == 0:
        return []
    if not np.issubdtype(values.dtype, np.floating):
        # Integer input would truncate non-integer products on assignment.
        values = values.astype(float)

    # Only the leading len(scales) columns carry a factor; any extra columns
    # are passed through unchanged.
    width = factors.size
    values[:, :width] = values[:, :width] * factors
    return list(values)

# Multiply predictions and ground-truth targets by the scale_list factors so
# both are back in the original (pre-scaling) target space.
un_y_pred = unscale_values(y_pred)
un_y_test = unscale_values(y_test.values)

In [None]:
un_y_pred[:3]

In [None]:
un_y_test[:3]

In [None]:
# Labels for the nine target columns, in the same order as the model outputs
# (presumably the shear rates they correspond to — confirm against the data).
col_list = ["1000", "300", "150", "100", "50", "10", "5", "2", "1"]

# Ground-truth targets as a labelled frame.
y_real_df = pd.DataFrame(data=un_y_test, columns=col_list)

In [None]:
y_real_df.head(3)

In [None]:
# Unscaled predictions as a labelled frame, using the same column labels as
# the ground-truth frame.
y_pred_df = pd.DataFrame(data=un_y_pred, columns=col_list)

In [None]:
y_pred_df.head(3)

### Model Evaluation

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score

def pearson_correlation_coefficient(X, Y):
    """
    Compute the Pearson correlation coefficient between two variables.

    :param X: values of the first variable (1-D array-like: list or NumPy array)
    :param Y: values of the second variable (same shape as X)
    :return: the Pearson correlation coefficient; NaN when the inputs are
             empty or when either variable has zero variance
    :raises ValueError: if X and Y do not have the same shape
    """
    # Accept plain lists as well as arrays, and compute in float64.
    x = np.asarray(X, dtype=float)
    y = np.asarray(Y, dtype=float)
    if x.shape != y.shape:
        # Guard against silent broadcasting of mismatched inputs.
        raise ValueError(
            f"X and Y must have the same shape, got {x.shape} and {y.shape}"
        )
    if x.size == 0:
        return np.nan

    # Deviation of every sample from its variable's mean.
    deviation_x = x - x.mean()
    deviation_y = y - y.mean()

    # Numerator: sum of the products of paired deviations.
    numerator = np.sum(deviation_x * deviation_y)

    # Denominator: square root of the product of the summed squared deviations.
    denominator = np.sqrt(np.sum(deviation_x ** 2) * np.sum(deviation_y ** 2))
    if denominator == 0:
        # A constant variable has no defined correlation.
        return np.nan

    return numerator / denominator

In [None]:
# Report every metric separately for each target column.
for column in col_list:
    print(f"{column}")
    real_v = list(y_real_df[column].values)
    pred_v = list(y_pred_df[column].values)

    # Error metrics (RMSE is derived from MSE).
    mae = mean_absolute_error(real_v, pred_v)
    mse = mean_squared_error(real_v, pred_v)
    rmse = np.sqrt(mse)
    mape = mean_absolute_percentage_error(real_v, pred_v)

    # Goodness-of-fit metrics.
    r2_scores = r2_score(real_v, pred_v)
    pearson_scores = pearson_correlation_coefficient(real_v, pred_v)

    print(f"MAE : {mae}")
    print(f"MSE : {mse}")
    print(f"RMSE : {rmse}")
    print(f"MAPE : {mape}")
    print(f"R2 : {r2_scores}")
    print(f"Pearson : {pearson_scores}")
    print()

# Save Model

In [None]:
import tf2onnx

# model.save('DNN_method1.h5')  # save the Keras model
# re_model = tf.keras.models.load_model('DNN_method1.h5')

# Describe the model's input for the converter. The network is a
# single-input Sequential model fed 2-D batches of shape (batch, n_inputs),
# so the signature has exactly one spec with that shape.
input_signature = [
    tf.TensorSpec(shape=(None, n_inputs), dtype=tf.float32),
]

# Convert the Keras model to an ONNX model with tf2onnx.
onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature=input_signature)

# Write the serialized ONNX model to disk.
with open("DNN_method1.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())