In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
import time
import sklearn
from tqdm import tqdm
from utils.train_preprocessing import my_processing

from tensorflow.keras.layers import TimeDistributed, LSTM, Dense, Conv2D, MaxPool2D, Flatten, Dropout, Input, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Model

In [None]:
# --- Load labelled segment images -------------------------------------------
# label_th and seg_time are only used here to build file names:
# the CSV is "label_{seg_time}s_{int(label_th*100)}.csv" and the images live
# under "Segments_{seg_time}s/".
# NOTE(review): presumably seg_time is a segment length in seconds and
# label_th a labelling threshold — confirm against the data-generation code.
label_th = 0.1
seg_time = 30
data_folder = '/NAS/Benson/Sleep_Apnea/Sleep_Codes/Code_with_data/CNN_LSTM/Model_Training/Data'
info_csv = pd.read_csv(f'{data_folder}/label_{seg_time}s_{int(label_th*100)}.csv')
train_data = []
train_label = []
# NOTE(review): `ids` is not used by any visible cell; kept in case later
# (unseen) code relies on it.
ids = ["00000711-100839", "00000781-100816", "00001096-100779", "00000782-100816"]

# Walk the two CSV columns in lockstep so each image stays paired with its label.
for filename, label in zip(info_csv["filename"], info_csv["label"]):
    image_path = f'{data_folder}/Segments_{seg_time}s/{filename}.png'
    image = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than inside cvtColor.
    if image is None:
        raise FileNotFoundError(f'Image is missing or unreadable: {image_path}')
    # OpenCV decodes to BGR channel order; convert to RGB before storing.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    train_data.append(image)
    train_label.append(label)

print(len(train_data))
print(len(train_label))
# The label table is no longer needed once both lists are filled.
del info_csv

In [None]:
# Run the loaded images through the project preprocessing helper.
# NOTE(review): my_processing comes from utils.train_preprocessing and is opaque
# here. outsize=448 equals the img_height/img_width later passed to my_model;
# pre_type="his" presumably selects a histogram-based step — confirm.
# NOTE(review): train_data is rebound to the helper's result, so re-running this
# cell feeds already-processed data back into my_processing.
train_data = my_processing(
    pre_type="his",
    outsize=448,
    dataset=train_data,
)
#train_data, train_label = sklearn.utils.shuffle(train_data, train_label)

In [None]:
# Preview the first ten samples in a 2x5 grid, each titled with its label.
plt.figure(figsize=(20, 10))
for i in range(10):
    ax = plt.subplot(2, 5, i + 1)
    ax.set_title(f"class : {train_label[i]!s}", fontsize=18)
    ax.set_xticks([])  # hide x ticks only; y ticks stay visible
    ax.imshow(train_data[i])

In [None]:
def my_model(timesteps, img_height, img_width, img_color_channels, num_classes):
    """Build an uncompiled CNN + LSTM Keras classifier for image sequences.

    Each frame of the sequence goes through the same stack of convolutional
    stages (via ``TimeDistributed``); the flattened per-frame features are then
    fed to an LSTM followed by a small dense head.

    Args:
        timesteps: Number of frames per input sequence.
        img_height: Frame height.
        img_width: Frame width.
        img_color_channels: Number of channels per frame.
        num_classes: Width of the final softmax layer.

    Returns:
        A ``Model`` mapping inputs of shape
        ``(timesteps, img_height, img_width, img_color_channels)`` (batch
        dimension excluded) to ``num_classes`` softmax outputs. The model is
        not compiled here.
    """
    input_batch = Input(shape=(timesteps, img_height, img_width, img_color_channels))

    # Four identical-shaped stages: conv -> conv -> batch norm -> max pool,
    # differing only in the number of filters.
    # NOTE(review): pooling uses a 2x2 window with a 4x4 stride, so the windows
    # do not cover every position of the feature map — confirm this is intended.
    features = input_batch
    for n_filters in (4, 8, 8, 16):
        for _ in range(2):
            features = TimeDistributed(
                Conv2D(n_filters, (3, 3), padding='same', activation='relu')
            )(features)
        features = TimeDistributed(BatchNormalization())(features)
        features = TimeDistributed(
            MaxPool2D(pool_size=(2, 2), padding='same', strides=(4, 4))
        )(features)

    # One flat feature vector per timestep for the recurrent layer.
    features = TimeDistributed(Flatten())(features)

    # return_sequences=False: only the last LSTM output feeds the dense head.
    head = LSTM(12, return_sequences=False, dropout=0.05, recurrent_dropout=0.05)(features)
    for units in (4, 2):
        head = Dense(units, activation='relu')(head)
        head = Dropout(0.1)(head)
    outputs = Dense(num_classes, activation='softmax')(head)

    return Model(input_batch, outputs)

In [None]:
# Instantiate the network: sequences of 10 RGB frames at 448x448, two classes.
model = my_model(
    timesteps=10,
    img_height=448,
    img_width=448,
    img_color_channels=3,
    num_classes=2,
)
#model.summary()

In [None]:
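## training v1 (disabled; kept for reference — X_train/Y_train/X_valid/Y_valid are not defined in the visible cells)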
# from tensorflow.keras.callbacks import EarlyStopping

# model.compile(optimizer=Adam(learning_rate=1e-5),
#               loss='categorical_crossentropy', 
#               metrics=['accuracy'])


# batch_sizes = 10
# num_epoch = 1200
# start_time = time.perf_counter()
# model_history = model.fit(x = X_train,
#                     y = Y_train, 
#                     batch_size = batch_sizes,
#                     epochs = num_epoch,
#                     validation_data = (X_valid, Y_valid),
#                     validation_batch_size = batch_sizes,
#                     callbacks = [EarlyStopping(monitor = 'val_loss',
#                                                patience = 60,
#                                                verbose = 1)]
#                    )
# print(f'Total time: {time.perf_counter() - start_time} sec')

In [None]:
##training v2
def timestep_id_generator():
    """Placeholder for the "training v2" pipeline; not implemented yet.

    Takes no arguments and currently does nothing, returning ``None``.
    """

def train():
    """Placeholder for the "training v2" training routine; not implemented yet.

    Takes no arguments and currently does nothing, returning ``None``.
    """

In [None]:
def num():
    """Yield the integers 0 through 9 in ascending order."""
    yield from range(10)
# Scratch check: slice a list at start offsets pulled from a generator.
test = num()  # dedicated generator instance; supplies the slice starts below

# Fill the list from a second, independent num() generator.
num_list = []
for test_num in num():
    num_list += [test_num]
print(num_list[:6])

# Each pass pulls the next start `n` from `test` and keeps the two-element
# window num_list[n:n+2]; four passes consume the first four values of `test`.
new_list = [num_list[slice((n := next(test)), n + 2)] for _ in range(4)]
print(new_list)

In [None]:
# Learning curve: training vs. validation loss per epoch.
# NOTE(review): `model_history` is only assigned inside the commented-out
# "training v1" cell, so this cell fails on a fresh kernel unless training is
# run some other way.
training_loss, val_loss = (
    model_history.history[key] for key in ('loss', 'val_loss')
)

plt.plot(training_loss, label="training_loss")
plt.plot(val_loss, label="validation_loss")
plt.title("Learning Curve")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend(loc='best')
plt.show()

In [None]:
# Accuracy per epoch: training curve first, validation second (the legend
# labels below are matched to the curves by this plotting order).
# NOTE(review): `model_history` is only assigned inside the commented-out
# "training v1" cell, so this cell fails on a fresh kernel unless training is
# run some other way.
for history_key in ('accuracy', 'val_accuracy'):
    plt.plot(model_history.history[history_key])
plt.title('model accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
# Evaluate on the held-out set in batches of 10.
# NOTE(review): X_test / Y_test are not defined in any visible cell, and the
# only visible model.compile(...) call is commented out — confirm both are
# provided before this cell runs.
test_pred = model.evaluate(x=X_test, y=Y_test, batch_size=10)
print("test loss, test acc:", test_pred)

In [None]:
# Per-sample model outputs for the test set, printed next to the ground truth.
# NOTE(review): X_test / Y_test are not defined in any visible cell.
pred = model.predict(X_test, batch_size=10)
print("predictions: ", pred, "answer : ", Y_test, sep="\n")

In [None]:
# Import the metric functions explicitly: a bare `import sklearn` is not
# guaranteed to load the `sklearn.metrics` submodule, in which case
# `sklearn.metrics.<name>` raises AttributeError.
from sklearn.metrics import roc_auc_score, roc_curve

# Column 1 of the targets and of the predictions is used as the positive class.
# NOTE(review): this assumes Y_test is one-hot encoded with two columns — confirm.
y_true_pos = Y_test[:, 1]
y_score_pos = pred[:, 1]

print(f'ROC_AUC_Score: {roc_auc_score(y_true_pos, y_score_pos)}')
print()
fpr, tpr, _ = roc_curve(y_true_pos, y_score_pos)  # thresholds are not needed
plt.plot(fpr, tpr)
plt.title('ROC_Curve')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

In [None]:
# Import the metric functions explicitly: a bare `import sklearn` is not
# guaranteed to load the `sklearn.metrics` submodule, in which case
# `sklearn.metrics.<name>` raises AttributeError.
from sklearn.metrics import cohen_kappa_score, confusion_matrix

# Collapse the last axis to class indices once and reuse for both metrics.
y_true_cls = Y_test.argmax(-1)
y_pred_cls = pred.argmax(-1)

print('Confusion Matrix: ')
print(confusion_matrix(y_true_cls, y_pred_cls))
print()
print(f'Cohen_Kappa_Score: {cohen_kappa_score(y_true_cls, y_pred_cls)}')

In [None]:
# model.save("lstm.h5(1)")