In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense
from tensorflow.keras.utils import to_categorical
from keras.layers import Dropout

In [None]:
import os
import pandas as pd

cwd = os.getcwd()
df = pd.read_csv(cwd + "\\..\\data_csv\\preprocessing_data.csv")

In [None]:
df.groupby('emotion').count()

In [None]:
# extract raw_images and labels

import cv2
import numpy as np

count = [0, 0, 0, 0, 0, 0, 0, 0]
raw_images =  []
labels = []
for i, row in df.iterrows():
    if count[row.emotion] > 3000:
        continue

    image_path = row.image
    full_image_path = os.path.dirname(cwd) + "\\cleaned_images\\" + image_path
    image = cv2.imread(full_image_path)
    pixels = image.flatten()
    raw_images.append(pixels)
    
    label = row.emotion
    labels.append(label)
    
    count[row.emotion] += 1
    if i > 0 and i % 1000 == 0: print('[INFO] processed {}/{}'.format(i, len(df)))
    
print(count)

In [None]:
# define hypoparameter

from hyperopt.pyll.base import scope 
from hyperopt import hp
#quniform returns float, some parameters require int; use this to force int
space = {
         'rate'       : hp.uniform('rate', 0.01, 0.5),
         'dropout'    : hp.uniform('dropout', 0.01, 0.5),
         'units1'      : scope.int(hp.quniform('units1', 10, 100, 5)),
         'units2'      : scope.int(hp.quniform('units2', 10, 100, 5)),
         'units3'      : scope.int(hp.quniform('units3', 10, 100, 5)),
         'units4'      : scope.int(hp.quniform('units4', 10, 100, 5)),
         'batch_size' : scope.int(hp.quniform('batch_size', 100, 250, 25)),
         'layers'     : scope.int(hp.quniform('layers', 3, 5, 1)),
         'optimizer'  : hp.choice('optimizer', ['adam', 'adadelta', 'sgd', 'RMSprop']),
         'epochs'     : scope.int(hp.quniform('epochs', 100, 500, 10)),
         'activation' : hp.choice('activation', ['relu', 'sigmoid', 'tanh', 'elu']),
        }

In [None]:
def f_nn(params):
    print("params", params)

    # Keras LSTM model
    model = Sequential()
    
    if params['layers'] == 1:
        model.add(Dense(params['units1'], activation=params['activation'], input_shape=(X_train.shape[1],)))
        #model.add(Dropout(rate=params['rate']))
    else:
        # First layer specifies input_shape and returns sequences
        model.add(Dense(params['units1'], activation=params['activation'], input_shape=(X_train.shape[1],)))
        #model.add(Dropout(rate=params['rate']))
        
        # Middle layers return sequences
        for i in range(params['layers']-2):
            model.add(Dense(params['units' + str(i + 2)], activation=params['activation']))
            #model.add(Dropout(rate=params['rate']))
        
        # Last layer doesn't return anything
        model.add(Dense(8, activation='softmax'))
        model.add(Dropout(rate=params['rate']))

    model.add(Dense(8, activation='softmax'))
    model.compile(optimizer=params['optimizer'], loss='mean_squared_error')
    
    es = EarlyStopping(monitor='val_loss',mode='min', verbose=1,patience=15)
    '''result = model.fit(X_train, y_train, 
                       verbose=0, 
                       validation_split=0.1,
                       batch_size=params['batch_size'],
                       epochs=200)'''
    result =  model.fit(X_train, y_train, validation_data=(X_val, y_val,), batch_size=params['batch_size'], epochs=params["epochs"], verbose=0)
    
    # Get the lowest validation loss of the training epochs
    validation_loss = np.amin(result.history['val_loss']) 
    print('Best validation loss of epoch:', validation_loss)
    
    return {'loss': validation_loss, 
            'status': STATUS_OK, 
            'model': model, 
            'params': params}

In [None]:
# split data for hypoparameter

from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
import numpy as np

X_train, X_test, y_train, y_test = train_test_split(np.array(raw_images), np.array(labels), test_size=0.1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1)

In [None]:
from sklearn import preprocessing

le = preprocessing.LabelEncoder()
le.fit(y_train)
y_train = le.transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
y_val = to_categorical(y_val)

In [None]:
from hyperopt import hp, fmin, tpe, STATUS_OK, Trials
from keras.callbacks import EarlyStopping
trials = Trials()
best = fmin(f_nn, 
            space, 
            algo=tpe.suggest,
            max_evals=50,
            trials=trials)

In [None]:
print(best)

In [None]:
best_model = trials.results[np.argmin([r['loss'] for r in trials.results])]['model']
best_params = trials.results[np.argmin([r['loss'] for r in trials.results])]['params']
worst_model = trials.results[np.argmax([r['loss'] for r in trials.results])]['model']
worst_params = trials.results[np.argmax([r['loss'] for r in trials.results])]['params']

In [None]:
print(best_model)
print(best_params)

In [None]:
# defind model for plot roc curve and test model

import tensorflow as tf

def classification_model_multi():
    model = Sequential()
    model.add(Dense(512, activation='relu', input_shape=(X_train.shape[1],)))
    model.add(Dropout(0.2))
    model.add(Dense(256, activation='relu'))
    model.add(Dense(128, activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(16, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(len(lb.classes_), activation='softmax'))
    opt = tf.keras.optimizers.Adam(learning_rate=0.01)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# no need to use
def classification_model_bin():
    model = Sequential()
    model.add(Dense(128, activation='relu', input_shape=(X_train.shape[1],)))
    # model.add(Dropout(0.2))
    model.add(Dense(64, activation='relu'))
    # model.add(Dropout(0.2))
    model.add(Dense(32, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(2, activation='softmax'))
    opt = tf.keras.optimizers.Adam(learning_rate=0.01)
    model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# split data for plot roc curve

from sklearn.model_selection import train_test_split
import numpy as np

X_train, X_test, y_train, y_test = train_test_split(np.array(raw_images), np.array(labels), test_size=0.1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1)

In [None]:
# no need to use but keep learning to_categorical()

from sklearn import preprocessing

le = preprocessing.LabelEncoder()
le.fit(y_train)
y_train = le.transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
y_val = to_categorical(y_val)

In [None]:
# plot roc curve

import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import RocCurveDisplay
from sklearn.metrics import auc

dictionary = ['ANGER', 'CONTEMPT', 'DISGUST', 'FEAR', 'HAPPINESS',  'NEUTRAL', 'SADNESS', 'SURPRISE']

for emo in range(8):
    print(dictionary[emo])

    emo_feature = []
    emo_target = []

    for i in range(len(df['emotion'])):
        if df['emotion'][i] == emo:
            emo_target.append(1)
        else:
            emo_target.append(0)
        
        image_path = row.image
        full_image_path = os.path.dirname(cwd) + "\\cleaned_images\\" + image_path
        image = cv2.imread(full_image_path)
        pixels = image.flatten()
        emo_feature.append(pixels)

    emo_feature = np.array(emo_feature)
    emo_target = np.array(emo_target)

    # import data
    X = emo_feature
    y = emo_target

    # train test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

    history = []
    tprs = []
    aucs = []
    mean_fpr = np.linspace(0, 1, 100)
    fig, ax = plt.subplots()

    # Run classifier with cross-validation and plot ROC curves
    cv = StratifiedKFold(n_splits=5, shuffle=True)
    for i, (train, val) in enumerate(cv.split(X_train, y_train)):
        X_train, X_val = X[train], X[val]
        y_train, y_val = y[train], y[val]

        le = preprocessing.LabelEncoder()
        y_train = to_categorical(y_train)
        y_val = to_categorical(y_val)

        # create and fit model
        model = classification_model_multi()
        model.fit(X_train, y_train, validation_data=(X_val, y_val,), epochs=100, verbose=2)

        # predict
        y_pred = model.predict(X_val).ravel()
        y_val = y_val.ravel()
        
        print(f'====================Fold {i}====================', '\n')

        # plot ROC curve
        viz = RocCurveDisplay.from_predictions(y_val, y_pred, ax=ax, name="ROC fold {}".format(i), alpha=0.3, lw=1,)
        interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
        interp_tpr[0] = 0.0
        tprs.append(interp_tpr)
        aucs.append(viz.roc_auc)
        
    # middle line
    ax.plot([0, 1], [0, 1], 'k--')

    # mean line
    mean_tpr = np.mean(tprs, axis=0)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    std_auc = np.std(aucs)
    ax.plot(
        mean_fpr,
        mean_tpr,
        color="b",
        label=r"Mean ROC (AUC = %0.2f $\pm$ %0.2f)" % (mean_auc, std_auc),
        lw=2,
        alpha=0.8,
    )

    # std
    std_tpr = np.std(tprs, axis=0)
    tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
    tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
    ax.fill_between(
        mean_fpr,
        tprs_lower,
        tprs_upper,
        color="grey",
        alpha=0.2,
        label=r"$\pm$ 1 std. dev.",
    )

    ax.set(xlim=[-0.05, 1.05],
            ylim=[-0.05, 1.05],
            title="Receiver operating characteristic")
    ax.legend(loc="lower right")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.savefig(cwd + '/../graph/' + dictionary[emo] + '/ann.jpg')
    plt.show()

In [None]:
# split data for test 

from sklearn.model_selection import train_test_split
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix
import numpy as np

X_train, X_test, y_train, y_test = train_test_split(np.array(raw_images), np.array(labels), test_size=0.1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1)

In [None]:
from sklearn import preprocessing

le = preprocessing.LabelEncoder()
le.fit(y_train)
y_train = le.transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
y_val = to_categorical(y_val)

In [None]:
model = classification_model_multi()
print(model.summary())
model.fit(X_train, y_train, validation_data=(X_val, y_val,), epochs=100, verbose=2)

In [None]:
#PLOT CONFUSION MATRIX
y_test = np.argmax(y_test, axis=1)
y_pred = np.argmax(model.predict(X_test), axis=1)

cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=dictionary)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)
plt.show()

In [None]:
# save the model to disk

import pickle

filename = 'model.sav'
pickle.dump(model, open(filename, 'wb'))

In [None]:
# test

value = model.predict(X_test)
y_pred = np.argmax(value, axis=1)
y_true = np.argmax(y_test, axis=1)

In [None]:
from sklearn.metrics import classification_report

print(classification_report(y_true,y_pred))

In [None]:
value = model.predict(X_test)
y_pred = np.argmax(value,axis=1)
y_true = np.argmax(y_test,axis=1)

In [None]:
y_pred_arr = list(y_pred)
print("len : ", len(y_pred_arr))
score = 0
for i in range(len(y_pred_arr)):
    if(list(y_pred)[i] == list(y_true)[i]): score += 1
max_score = len(list(y_pred))
print(f"Final Score : {score}/{max_score}")
print("Accuracy : ", 100 * score / max_score)

In [None]:
# Hyperparameter Process #

In [None]:
from scipy.integrate import odeint
# Parameters and time for FOPDT model
ns = 10000
t = np.linspace(0,ns-1,ns)
u = np.zeros(ns)
# Additional FOPDT parameters
yp0 = 0.0
u0 = u[0]
Km = 0.67
taum = 160.0
def fopdt(y,t,um,Km,taum):
    # arguments
    #  y      = output
    #  t      = time
    #  uf     = input linear function (for time shift)
    #  Km     = model gain
    #  taum   = model time constant
    # calculate derivative
    dydt = (-(y-yp0) + Km * (um-u0))/taum
    return dydt

def sim_model(Km,taum):
    # array for model values
    ym = np.zeros(ns)
    # initial condition
    ym[0] = yp0
    # loop through time steps    
    for i in range(0,ns-1):
        ts = [t[i],t[i+1]]
        y1 = odeint(fopdt,ym[i],ts,args=(u[i],Km,taum))
        ym[i+1] = y1[-1]
    return ym

In [None]:
import random
end = 60 # leave 1st minute of u as 0
while end <= ns:
    start = end
    end += random.randint(300,900) # keep new Q1s value for anywhere from 5 to 15 minutes
    u[start:end] = random.randint(0,100)

In [None]:
# Simulate FOPDT model
y = sim_model(Km,taum)

In [None]:
# Add Gaussian noise
noise = np.random.normal(0,0.2,ns)
y += noise

In [None]:
from sklearn.preprocessing import MinMaxScaler
# Scale data
data = np.vstack((u,y)).T
s = MinMaxScaler(feature_range=(0,1))
data_s = s.fit_transform(data)