In [1]:
import math
import h5py
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import GlobalAveragePooling1D, Conv1D, GlobalMaxPooling1D, BatchNormalization
from keras.layers import Dense
from keras.regularizers import L1L2
from tensorflow.keras.models import Model
from tensorflow.keras import layers, losses, regularizers
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_val_score, KFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_curve, confusion_matrix, auc
from sklearn.metrics import roc_auc_score, precision_recall_curve
import pandas as pd
import time
import os

In [2]:
def normalize(data):
  """Min-max scale ``data`` to the [0, 1] range along axis 0.

  The minimum and maximum are taken over axis 0, so every position along the
  remaining axes is scaled independently.

  Args:
    data: array-like accepted by ``np.min``/``np.max`` with ``axis=0``.

  Returns:
    ``(data - min) / (max - min)``. Positions whose minimum equals their
    maximum (zero range) are mapped to 0 instead of the NaN a 0/0 division
    would produce.
  """
  lo = np.min(data, axis=0)
  span = np.max(data, axis=0) - lo
  # A constant feature has zero range; divide by 1 there so it scales to 0
  # rather than yielding NaN that would propagate into training.
  safe_span = np.where(span == 0, 1, span)
  return (data - lo) / safe_span
# Subtask identifiers to process; the loop below runs once per entry.
subtasks = ['ST2','ST3','ST4','ST5','ST6','ST7','ST8','ST9','ST10','ST11','ST12','ST13']
# Tags of the two conditions being compared; used in file names and plot labels.
Dayx = 'D1'
Dayy = 'Retention'
seed = 128 # Fix random seed for reproducibility
np.random.seed(seed)
# NumPy's seed alone does not cover TensorFlow: weight initialisation and
# dropout draw from TensorFlow's own generator, so seed that as well.
tf.random.set_seed(seed)
for subtask in subtasks:
  dir1 =  # directory of connectivities data set1
  dir2 =  # directory of connectivities data set2
  SaveDirectory = # directory to save the outputs
  HypothesisName = subtask+Dayx+'vs'+Dayy+'.xlsx'
  # print(HypothesisName)
  filename1 = Dayx+'_LC_ST'+subtask+'.csv'
  filename2 = 'Reten'+subtask+'.csv'
  data1 = pd.read_csv(os.path.join(dir1,filename1)).values
  data1[:,-1] = 0  # convert the class lable
  data2 = pd.read_csv(os.path.join(dir2,filename2)).values
  data2[:,-1] = 1  # convert the class lable
  data = np.concatenate((data1, data2))  # join two datasets
  data = np.delete(data,[0,6,12,18,24],1)  # delete self-causal columns from the datasets.

  m,n = data.shape
  X0 = normalize(data[:,0:n-1])
  # print(X0.shape)
  X = X0.reshape(X0.shape[0],X0.shape[1],1)
  X = normalize(X)
  # print(X.shape)
  y = data[:,n-1]
  # print(y.shape)

  best_model = {'accuracy': 0, 'sensitivity': 0, 'specificity': 0, 'MCC': 0, 'model': None}
  # classifier
  folds = 5
  batch_sz = 16
  dropout = 0.6
  kernel_szL1 = [14,13,12,11,10,9,8,7,6,5,4,3]
  kernel_szL2 = [3,4,5,6,7,8,9,10,11,12,13,14]
  filters = [32,64,128]
  ## Function to create model, required for KerasClassifier
  start_time = time.time()
  accuracy_temp = 0
  for kernel_sz1 in kernel_szL1:
    for kernel_sz2 in kernel_szL2:
      if 21-kernel_sz1 > kernel_sz2:
        for filter_sz in filters:
          #print(f'working on Kernel_sz1_{kernel_sz1}, Kernel_sz2_{kernel_sz2}, filter_size{filter_sz}')
          def create_model():
              """Build and compile the 1-D CNN binary classifier for the current grid point.

              Reads ``filter_sz``, ``kernel_sz1``, ``kernel_sz2`` and ``dropout`` from
              the enclosing scope at call time.

              Returns:
                  A compiled ``Sequential`` model: two Conv1D layers (L1, L2) with
                  dropout in between, a 1x1 Conv1D (L3), global average pooling,
                  dropout, two dense layers and a single sigmoid output.
              """
              model = Sequential()
              model.add(Conv1D(filters=filter_sz, kernel_size=kernel_sz1, strides=1, activation='relu', input_shape=[None,1], name = 'L1'))
              model.add(tf.keras.layers.Dropout(dropout))  # rate taken from `dropout`
              model.add(Conv1D(filters=filter_sz, kernel_size=kernel_sz2, strides=1, activation='relu', name = 'L2'))
              model.add(Conv1D(filters=32, kernel_size=1, strides=1, activation='relu', name = 'L3'))
              model.add(GlobalAveragePooling1D())
              model.add(tf.keras.layers.Dropout(dropout))  # rate taken from `dropout`
              model.add(Dense(64, activation='relu'))
              model.add(Dense(32, activation='relu'))
              model.add(Dense(1, activation='sigmoid', name='classification'))
              # Compile model. Pass the optimizer *instance*: the string 'adam'
              # would build a default Adam and drop the learning rate set here.
              adam = tf.keras.optimizers.Adam(learning_rate=1.e-04)
              model.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])
              return model
          # Labels, predicted probabilities and sample indices of the held-out
          # folds, pooled over all folds of this configuration.
          y_true = []
          y_pred = []
          itest = []
          # A fixed random_state gives every hyper-parameter combination the same
          # folds, so their pooled scores are comparable and reruns reproduce.
          kf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)
          for train, test in kf.split(X,y):
              x_train, x_test, y_train, y_test = X[train], X[test], y[train], y[test]
              classifier = create_model()  # fresh, untrained model for each fold
              es = EarlyStopping(monitor='val_loss', mode='min', verbose=0, patience=20, restore_best_weights = True)
              # NOTE(review): the held-out fold doubles as the early-stopping
              # validation set, so the pooled metrics are optimistically biased;
              # consider carving a validation split out of the training fold.
              classifier.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=2000, batch_size=batch_sz, shuffle=False, verbose=0, callbacks=[es])
              # Predicting the Test set results (verbose=0 keeps progress bars
              # from flooding the cell output).
              k_pred = classifier.predict(x_test, verbose=0)
              # np.append flattens its inputs, so everything is stored 1-D.
              y_true = np.append(y_true, y_test)
              y_pred = np.append(y_pred, k_pred)
              itest = np.append(itest, test)

          # ROC and AUC over the pooled held-out predictions
          fpr, tpr, thresholds = roc_curve(y_true, y_pred)
          area = auc(fpr, tpr)
          # Operating point: the threshold maximising sqrt(TPR * (1 - FPR)).
          gmeans = np.sqrt(tpr * (1-fpr))
          ix = np.argmax(gmeans)
          # Making the Confusion Matrix [tn, fp, fn, tp]
          tn, fp, fn, tp = confusion_matrix(y_true, (y_pred >= thresholds[ix])).ravel()
          N = tn+fp+fn+tp
          S = (tp+fn)/N
          P = (tp+fp)/N
          # The denominator is 0 when every sample is predicted (or labelled) as
          # one class; report MCC as 0 there instead of dividing by zero.
          mcc_denom = np.sqrt(P*S*(1.-S)*(1.-P))
          MCC = ((tp/N)-S*P)/mcc_denom if mcc_denom > 0 else 0.0
          accuracy = (tp+tn)/N
          sensitivity = (tp)/(tp+fn)
          specificity = (tn)/(tn+fp)
          # NOTE(review): the threshold and the winning configuration are both
          # picked on these same predictions, so the kept score is optimistic.
          if accuracy > accuracy_temp:
            accuracy_temp = accuracy
            best_model.update({
                'accuracy': accuracy,
                'sensitivity': sensitivity,
                'specificity': specificity,
                'MCC': MCC,
                # `classifier` is the network fitted on the last fold only.
                'model': classifier,
                'y_true': y_true,
                'y_pred': y_pred,
                'itest': itest,
                'fpr': fpr,
                'tpr': tpr,
                'thresholds': thresholds,
                'ix': ix,
                'area': area,
                'tn': tn,
                'fp': fp,
                'fn': fn,
                'tp': tp,
            })
  end_time = time.time()

  # One row per layer of the retained model, taken from each layer's config dict.
  layers_config = best_model['model'].get_config()['layers']
  layers_df = pd.DataFrame([entry['config'] for entry in layers_config])

  file_path = os.path.join(SaveDirectory, HypothesisName)

  # Summary metrics: (label shown in the sheet, key in best_model).
  metric_keys = [
      ('Accuracy', 'accuracy'), ('Sensitivity', 'sensitivity'), ('Specificity', 'specificity'),
      ('MCC', 'MCC'), ('AUC', 'area'), ('TP', 'tp'), ('FP', 'fp'), ('TN', 'tn'), ('FN', 'fn'),
  ]
  metrics_df = pd.DataFrame({
      'Metric': [label for label, _ in metric_keys],
      'Value': [best_model[key] for _, key in metric_keys],
  })

  # ROC curve points of the best configuration.
  roc_columns = (('FPR', 'fpr'), ('TPR', 'tpr'), ('Thresholds', 'thresholds'))
  roc_df = pd.DataFrame({column: best_model[key] for column, key in roc_columns})

  # Pooled held-out labels, predicted probabilities and sample indices.
  predictions_df = pd.DataFrame({key: best_model[key] for key in ('y_true', 'y_pred', 'itest')})

  # Write every table to its own sheet of the per-subtask workbook.
  sheets = [
      (layers_df, 'Model Configuration'),
      (metrics_df, 'Metrics'),
      (roc_df, 'ROC Curve'),
      (predictions_df, 'Predictions'),
  ]
  with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
      for frame, sheet_name in sheets:
          frame.to_excel(writer, sheet_name=sheet_name, index=False)

  # ROC and AUC of the best configuration
  y_true = best_model['y_true']
  y_pred = best_model['y_pred']

  fpr = best_model['fpr']
  tpr = best_model['tpr']
  thresholds = best_model['thresholds']

  # Area under the ROC curve and the index of the largest g-mean
  area = auc(fpr, tpr)
  gmeans = np.sqrt(tpr * (1-fpr))
  ix = np.argmax(gmeans)

  # plot the roc curve for the model (dashed diagonal = chance level)
  plt.figure()
  plt.plot([0, 1], [0, 1], 'k--')
  plt.plot(fpr, tpr, label='AUC = {:.3f}'.format(area))
  plt.xlabel('False positive rate')
  plt.ylabel('True positive rate')
  plt.title('ROC curve')
  plt.legend(loc='best')
  plt.savefig(os.path.join(SaveDirectory,subtask+Dayx+'vs'+Dayy+'_ROC.png'))
  plt.close()

  # Precision-recall curve; from here on `thresholds`, `area` and `ix` refer
  # to the PR curve rather than the ROC curve.
  precision, recall, thresholds = precision_recall_curve(y_true, y_pred)
  area = auc(recall, precision)
  # Precision and recall are both 0 when the top-scored samples are all
  # negatives; define the F-score as 0 there, because a 0/0 NaN would be
  # picked by argmax as the "best" point.
  pr_sum = precision + recall
  fscore = np.divide(2 * precision * recall, pr_sum, out=np.zeros_like(pr_sum), where=pr_sum > 0)
  ix = np.argmax(fscore)

  # plot the precision-recall curve for the model
  plt.figure()
  # Baseline precision of a classifier with no skill: the positive-class share.
  no_skill = np.mean(y_true == 1)
  plt.plot([0,1], [no_skill,no_skill], linestyle='--')
  plt.plot(recall, precision, marker='.', label='AUC = {:.3f}'.format(area))
  plt.xlabel('Recall')
  plt.ylabel('Precision')
  plt.title('Precision-recall curve')
  plt.legend()
  plt.savefig(os.path.join(SaveDirectory,subtask+Dayx+'vs'+Dayy+'_PR.png'))
  plt.close()

  # Making the Confusion Matrix [tn, fp, fn, tp] for the best configuration
  tn = best_model['tn']
  fp = best_model['fp']
  fn = best_model['fn']
  tp = best_model['tp']

  # Leave the best configuration's summary metrics in scope for inspection.
  MCC = best_model['MCC']
  accuracy = best_model['accuracy']
  sensitivity = best_model['sensitivity']
  specificity = best_model['specificity']

  from sklearn.metrics import ConfusionMatrixDisplay
  conf_matrix = np.array([[tn, fp],
                [fn, tp]])
  labels = [Dayx, Dayy]
  disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix,display_labels=labels)
  # Scope the larger font to this figure only. Setting rcParams globally here
  # leaked into the ROC/PR figures of every later subtask, so the first
  # subtask's figures were styled differently from the rest.
  with plt.rc_context({'font.size': 14}):
    disp.plot()
    plt.savefig(os.path.join(SaveDirectory,subtask+Dayx+'vs'+Dayy+'_CM.png'))
  plt.close()

