In [None]:
# Mount Google Drive at /content/drive (Colab-only API); the dataset pickle
# read later in this notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


VGG16 pretrained on ImageNet - not fine-tuned (multi-channel input)

In [None]:
import tensorflow as tf
from keras.layers import Conv3D, Dense, MaxPooling3D, Flatten, Dropout, BatchNormalization, LeakyReLU
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, MaxPooling2D, Conv2D
from keras.models import Model
from tensorflow.keras.optimizers import Adam
from keras.losses import binary_crossentropy, categorical_crossentropy
from keras.applications.vgg16 import VGG16

from keras.applications.vgg16 import preprocess_input

import numpy as np

from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, f1_score, recall_score, confusion_matrix
from sklearn.metrics import roc_curve, auc

import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from matplotlib import pyplot as plt

import csv

In [None]:
# Training hyper-parameters shared by the cells below.
lr = 1e-4        # Adam learning rate (the Keras default would be 1e-3)
ep = 1           # epochs trained per fold
batch_size = 1   # samples per gradient step
n_splits = 5     # number of cross-validation folds

In [None]:
# not performing fine tuning during training
def get_model(input_shape, num_classes):
    """Build and compile a classifier on top of a frozen, ImageNet-pretrained VGG16.

    Args:
        input_shape: shape of one input sample, forwarded to VGG16 (e.g. (224, 224, 3)).
        num_classes: width of the final softmax layer.

    Returns:
        A Keras model compiled with Adam (learning rate `lr`), categorical
        cross-entropy loss and the accuracy metric.
    """
    backbone = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)

    # Freeze the whole convolutional base: only the new head below is trained.
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False

    # Classification head stacked on the backbone's output feature map.
    features = Flatten()(backbone.output)
    hidden = Dense(512, activation='relu')(features)
    hidden = Dropout(0.5)(hidden)
    class_probs = Dense(num_classes, activation='softmax')(hidden)

    model = Model(inputs=backbone.input, outputs=class_probs)
    model.compile(
        optimizer=Adam(learning_rate=lr),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    print("Output shape of the model:", model.output_shape)

    return model

In [None]:
def compile_fit(model, X_train, y_train, X_val, y_val):
    """Fit an already-compiled model on the training split.

    Despite the name, nothing is compiled here; the model is only fitted, using
    the notebook-level `ep` (epochs) and `batch_size` settings, with the
    validation split monitored after every epoch.

    Returns:
        tuple: (the same model object, the object returned by `model.fit`).
    """
    fit_options = dict(
        validation_data=(X_val, y_val),
        epochs=ep,
        batch_size=batch_size,
    )
    training_log = model.fit(X_train, y_train, **fit_options)

    return model, training_log

In [None]:
def plot_history(hist, n):
    """Plot training/validation accuracy and loss curves for one fold.

    Args:
        hist: object whose `history` attribute is a dict holding the per-epoch
            lists 'accuracy', 'val_accuracy', 'loss' and 'val_loss'
            (the value returned by `model.fit`).
        n: fold number, shown in both subplot titles.
    """
    history = hist.history
    n_epochs = len(history['loss'])
    epoch_axis = range(1, n_epochs + 1)  # 1-based epoch numbers on the x axis

    fig, ax = plt.subplots(2)
    panels = (
        (ax[0], 'accuracy', "Accuracy", "Accuracy Trend"),
        (ax[1], 'loss', "Loss", "Loss Trend"),
    )
    for axis, key, ylabel, title in panels:
        # Markers keep a single-epoch run visible (a one-point line draws nothing).
        axis.plot(epoch_axis, history[key], color="blue", marker="o")
        axis.plot(epoch_axis, history['val_' + key], color="green", marker="o")
        axis.set(xlabel="epochs", ylabel=ylabel)
        axis.set_xlim((0, n_epochs + 1))
        axis.legend(['training', 'testing'])
        axis.set_title(f"{title} (fold {n})")

    ax[0].set_ylim((0, 1))
    # Cross-entropy loss can exceed 1, so only the lower bound is pinned.
    ax[1].set_ylim(bottom=0)

    fig.tight_layout(pad=2.0)
    plt.show()
    # Release the figure so one figure per fold does not pile up in memory.
    plt.close(fig)

In [None]:
def eval_model(num_classes, model, X_val, y_val, y_cols):
  """Evaluate a trained classifier on one validation fold.

  Args:
    num_classes: number of label columns; 2 and 3 are supported.
    model: trained model; `model.evaluate` must return `(loss, accuracy)` and
      `model.predict` one score per class for every sample.
    X_val: validation inputs.
    y_val: validation labels with one column per class (the class index is
      taken as the argmax over axis 1).
    y_cols: class names in label-column order, used in the 3-class report.

  Returns:
    dict: 'acc', 'loss', 'conf_mat', 'sens (recall)', 'f1' and 'prec'; the
    binary case additionally has 'tn', 'tp', 'fn' and 'fp'. For three classes
    the recall/f1/precision values are unweighted means over the classes.
    An empty dict is returned for any other `num_classes`.
  """
  # Imported locally: the notebook's import cell never defines the `mt` alias
  # that the 3-class branch used to call, which raised NameError.
  from sklearn.metrics import classification_report

  # Compute loss and accuracy using model.evaluate()
  loss, acc = model.evaluate(X_val, y_val)

  y_pred = model.predict(X_val)
  print('y_pred', y_pred)
  print('y_val', y_val)

  if num_classes not in (2, 3):
    return {}

  # Collapse one-hot labels and per-class scores to class indices.
  y_val = np.argmax(y_val, axis=1)
  y_pred = np.argmax(y_pred, axis=1)

  print('y_pred', y_pred)
  print('y_val', y_val)

  print("Type of y_pred using model.predict:", type(y_pred))
  print("shape of the y_pred using model.predict:", y_pred.shape)

  # Pin the label set so the confusion matrix and the report keep one row per
  # class even when a small fold is missing a class entirely.
  labels = list(range(num_classes))
  conf_mat = confusion_matrix(y_val, y_pred, labels=labels)

  if num_classes == 3:
    print("confusion matrix ", conf_mat)

    target_names = y_cols

    print("classification report", classification_report(y_val, y_pred, labels=labels, target_names=target_names, digits = 3))

    # Same report as a dict so it can be tabulated
    report = classification_report(y_val, y_pred, labels=labels, target_names=target_names, output_dict=True)
    report_df = pd.DataFrame(report).T

    print("classification report in dataframe - match accuracy with model.evaluate ")
    print(report_df)

    # The first `num_classes` rows are the per-class rows; the rest are averages.
    per_class_df = report_df.head(num_classes)

    avg_precision = per_class_df['precision'].mean()
    avg_recall = per_class_df['recall'].mean()
    avg_f1_score = per_class_df['f1-score'].mean()

    print(f"Average Precision (first 3 classes): {avg_precision:.3f}")
    print(f"Average Recall (first 3 classes): {avg_recall:.3f}")
    print(f"Average F1-Score (first 3 classes): {avg_f1_score:.3f}")

    metrics = {
        'acc': acc,
        'loss': loss,
        'conf_mat': conf_mat,
        'sens (recall)': avg_recall,
        'f1': avg_f1_score,
        'prec': avg_precision
    }

  else:
    # With labels pinned to [0, 1] the matrix is always 2x2, so this unpacking
    # cannot fail on a single-class fold.
    tn, fp, fn, tp = conf_mat.ravel()
    precision = precision_score(y_val, y_pred)
    recall = recall_score(y_val, y_pred)
    f1 = f1_score(y_val, y_pred)

    metrics = {
        'acc': acc,
        'loss': loss,
        'conf_mat': conf_mat,
        'sens (recall)': recall,
        'f1': f1,
        'prec': precision,
        'tn': tn,
        'tp': tp,
        'fn': fn,
        'fp': fp
    }

  return metrics

In [None]:
# Disabled helper: kept as a bare string literal, so `nan_to_0` is never defined.
# NOTE(review): if revived, the body returns `df`, which is not defined inside
# the function (the cleaned copy is `df1`), and its inner comment says "to 1"
# although NaNs are replaced with 0.0.
'''
# Replacing nan values to 1
def nan_to_0(data):

    df1 = data.copy()

    for idx, row in df1.iterrows():
      arr = row['fcmap']
      matrix = np.nan_to_num(arr, copy = True, nan = 0.0)
      df1.at[idx, 'fcmap'] = matrix

    print(df1)

    return df
'''

"\n# Replacing nan values to 1\ndef nan_to_0(data):\n\n    df1 = data.copy()\n\n    for idx, row in df1.iterrows():\n      arr = row['fcmap']\n      matrix = np.nan_to_num(arr, copy = True, nan = 0.0)\n      df1.at[idx, 'fcmap'] = matrix\n\n    print(df1)\n\n    return df\n"

In [None]:
# Disabled helper: kept as a bare string literal, so `computeMinMax` is never
# defined. Its body takes the min and max of X along axis 0.
'''
def computeMinMax(X):
  min_matrix = X.min(axis = 0)
  max_matrix = X.max(axis = 0)
  return (min_matrix, max_matrix)
'''

'\ndef computeMinMax(X):\n  min_matrix = X.min(axis = 0)\n  max_matrix = X.max(axis = 0)\n  return (min_matrix, max_matrix)\n'

In [None]:
# Disabled helper: kept as a bare string literal, so `normalize_instance` is
# never defined. The body min-max scales X element by element.
# NOTE(review): if revived, the `return` is indented inside the `for` loop, so
# only the first element would be normalised before returning.
'''
def normalize_instance(X, minn, maxx):
  normalised_X = np.zeros(shape=(X.shape[0], X.shape[1]))

  for idx, x in np.ndenumerate(X):
    if minn[idx] == maxx[idx]:
      normalised_X[idx] = x
    else:
      normalised_X[idx] = (x - minn[idx])/(maxx[idx] - minn[idx])
    return normalised_X
'''

'\ndef normalize_instance(X, minn, maxx):\n  normalised_X = np.zeros(shape=(X.shape[0], X.shape[1]))\n\n  for idx, x in np.ndenumerate(X):\n    if minn[idx] == maxx[idx]:\n      normalised_X[idx] = x\n    else:\n      normalised_X[idx] = (x - minn[idx])/(maxx[idx] - minn[idx])\n    return normalised_X\n'

In [None]:
# Disabled helper: kept as a bare string literal, so `normalize` is never
# defined. The body computes min/max from the training split only and applies
# them to both splits; it depends on the (also disabled) computeMinMax and
# normalize_instance helpers.
'''
def normalize(X_train, X_val):
    # Assuming X_train is your DataFrame with matrices in a single column
    matrices = X_train  # Get the values from the 'matrices' column
    # Convert the matrices to a 2D NumPy array
    X_train_2d = np.stack(matrices)

    # Assuming X_train is your DataFrame with matrices in a single column
    matrices = X_val  # Get the values from the 'matrices' column
    # Convert the matrices to a 2D NumPy array
    X_val_2d = np.stack(matrices)

    min_matrix, max_matrix = computeMinMax(X_train_2d)

    print("shape of min matrix", min_matrix.shape)
    print("shape of max matrix", max_matrix.shape)

    normalized_instances = []
    for instance in X_train_2d:
        normalized_instance = normalize_instance(instance, min_matrix, max_matrix)
        normalized_instances.append(normalized_instance)

    # Convert the list of normalized instances to a NumPy array
    X_normalized_trained_2d = np.array(normalized_instances)

    normalized_instances = []
    for instance in X_val_2d:
        normalized_instance = normalize_instance(instance, min_matrix, max_matrix)
        normalized_instances.append(normalized_instance)

    # Convert the list of normalized instances to a NumPy array
    X_normalized_val_2d = np.array(normalized_instances)

    return (X_normalized_trained_2d, X_normalized_val_2d)
'''

'\ndef normalize(X_train, X_val):\n    # Assuming X_train is your DataFrame with matrices in a single column\n    matrices = X_train  # Get the values from the \'matrices\' column\n    # Convert the matrices to a 2D NumPy array\n    X_train_2d = np.stack(matrices)\n\n    # Assuming X_train is your DataFrame with matrices in a single column\n    matrices = X_val  # Get the values from the \'matrices\' column\n    # Convert the matrices to a 2D NumPy array\n    X_val_2d = np.stack(matrices)\n\n    min_matrix, max_matrix = computeMinMax(X_train_2d)\n\n    print("shape of min matrix", min_matrix.shape)\n    print("shape of max matrix", max_matrix.shape)\n\n    normalized_instances = []\n    for instance in X_train_2d:\n        normalized_instance = normalize_instance(instance, min_matrix, max_matrix)\n        normalized_instances.append(normalized_instance)\n\n    # Convert the list of normalized instances to a NumPy array\n    X_normalized_trained_2d = np.array(normalized_instances)\n\n  

In [None]:
# Padding Matrices
# def padding_data(data):
#     def pad_matrix(row):
#         arr = row['fcmap']
#         pad_top = (224 - arr.shape[0]) // 2
#         pad_bottom = 224 - arr.shape[0] - pad_top
#         pad_left = (224 - arr.shape[1]) // 2
#         pad_right = 224 - arr.shape[1] - pad_left
#         padded_matrix = np.pad(arr, ((pad_top, pad_bottom), (pad_left, pad_right)), mode='constant')
#         return padded_matrix

#     df_pad = data.copy()

#     df_pad['fcmap'] = df_pad.apply(lambda row: pad_matrix(row), axis=1)

#     return df_pad

def padding_data(X, target_shape=(224, 224)):
    """
    Zero-pad each matrix in X, centred, to the target height and width.

    Only the first two axes are padded; any further axes (e.g. channels) are
    left untouched, so both 2-D and multi-channel matrices are accepted.

    Args:
        X (iterable of array-like): Matrices with at least two dimensions.
        target_shape (tuple): (height, width) every matrix is padded to.

    Returns:
        numpy.ndarray: The padded matrices stacked along a new leading axis.

    Raises:
        ValueError: If a matrix has fewer than two dimensions or is larger
            than `target_shape` (padding cannot shrink it).
    """
    target_height, target_width = target_shape[0], target_shape[1]
    padded_matrices = []

    for matrix in X:
        matrix = np.asarray(matrix)
        if matrix.ndim < 2:
            raise ValueError(
                f"expected matrices with at least 2 dimensions, got shape {matrix.shape}"
            )

        height, width = matrix.shape[:2]
        if height > target_height or width > target_width:
            raise ValueError(
                f"matrix of shape {matrix.shape} does not fit into target shape "
                f"{(target_height, target_width)}"
            )

        # Split the padding evenly; an odd remainder goes to the bottom/right.
        pad_top = (target_height - height) // 2
        pad_bottom = target_height - height - pad_top
        pad_left = (target_width - width) // 2
        pad_right = target_width - width - pad_left

        # No padding on any axis beyond height and width.
        pad_width = [(pad_top, pad_bottom), (pad_left, pad_right)]
        pad_width += [(0, 0)] * (matrix.ndim - 2)

        padded_matrices.append(np.pad(matrix, pad_width, mode='constant'))

    return np.array(padded_matrices)

In [None]:
# Disabled helper: kept as a bare string literal, so `increase_channels` is
# never defined. The body reshapes to a hard-coded 132x132 and appends a
# single channel axis; the 3-channel repeat is itself commented out.
'''
def increase_channels(X):
    # preparing data for vgg pre-trained
    X = np.stack(X).reshape(-1, 132, 132)
    print("stacking all matrices together", X.shape)
    X = np.expand_dims(X, axis=-1)
    print("adding channel dimension", X.shape)
    # X = np.repeat(X, 3, axis=-1)
    # print("increasing channels for vgg", X.shape)

    return X
'''

'\ndef increase_channels(X):\n    # preparing data for vgg pre-trained\n    X = np.stack(X).reshape(-1, 132, 132)\n    print("stacking all matrices together", X.shape)\n    X = np.expand_dims(X, axis=-1)\n    print("adding channel dimension", X.shape)\n    # X = np.repeat(X, 3, axis=-1)\n    # print("increasing channels for vgg", X.shape)\n\n    return X\n'

In [None]:
def make_dataset(choice, data_path='/content/drive/MyDrive/Colab Notebooks/ROIxTimeseries/fcmap+psi+pearson_data.pkl'):
    """Load the pickled dataframe and keep the rows for one classification task.

    Args:
        choice: task selector (case-sensitive):
            'A' adhd vs autism, 'B' autism vs healthy,
            'C' adhd vs healthy, 'D' all three classes (no row filtering).
        data_path: location of the pickled dataframe; defaults to the
            notebook's Google Drive path.

    Returns:
        tuple: (filtered dataframe, list of label column names). An unknown
        choice prints a message and returns (empty DataFrame, []) without
        touching the pickle file.
    """
    label_columns = {
        'A': ['adhd', 'autism'],
        'B': ['autism', 'healthy'],
        'C': ['adhd', 'healthy'],
        'D': ['adhd', 'autism', 'healthy'],
    }

    # Validate first so a typo does not cost a dataset load (or fail on it).
    if choice not in label_columns:
        print("Invalid choice. Please enter 'A', 'B', 'C', or 'D'.")
        return pd.DataFrame(), []

    y_cols = label_columns[choice]

    # SECURITY: unpickling can execute arbitrary code; only load trusted files.
    data = pd.read_pickle(data_path)

    if choice != 'D':
        # Binary tasks keep only subjects flagged with one of the two labels.
        first_label, second_label = y_cols
        data = data[(data[first_label] == 1) | (data[second_label] == 1)]

    print(data)

    # df1 = nan_to_0(data)

    print(y_cols)
    return data, y_cols

In [None]:
def driver(choice):
    """Cross-validate the frozen-VGG16 classifier for one classification task.

    Loads the rows for `choice`, pads every 'combined_matrix' entry with
    `padding_data`, then for each KFold split (`n_splits` folds, shuffled,
    random_state=42) builds a fresh model, fits it, plots its history and
    evaluates it on the held-out fold.

    Args:
        choice: task selector understood by `make_dataset` ('A'-'D',
            case-insensitive here).

    Returns:
        pandas.DataFrame: one row of metrics per fold, with a 'fold' column.

    Raises:
        ValueError: if `choice` is not a supported task.
    """
    # choice = input("Enter your choice (A, B, C, or D): ").upper()

    choice = choice.upper()

    df, y_cols = make_dataset(choice)
    if not y_cols:
        # make_dataset signals an unknown choice with an empty frame and label
        # list; fail here instead of with an opaque KeyError below.
        raise ValueError(f"Unsupported choice {choice!r}; expected 'A', 'B', 'C' or 'D'.")

    X = df['combined_matrix']
    print(X.shape)
    print("type of matrices", type(X))

    X = padding_data(X)
    print("after padding",X.shape)
    print("type of matrices after padding", type(X))

    y = df[y_cols].values
    print("type of label columns", type(y))

    # Get the number of classes
    num_classes = y.shape[1]
    print("No. of classes", num_classes)

    input_shape = X[0].shape
    print("Input_shape:", input_shape)

    # Column order of the returned frame; binary tasks also report the
    # individual confusion-matrix cells.
    result_columns = ['fold', 'acc', 'loss', 'conf_mat', 'sens (recall)', 'f1', 'prec']
    if num_classes == 2:
        result_columns += ['tn', 'tp', 'fn', 'fp']

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

    # Collected per fold and turned into a DataFrame once at the end, instead
    # of concatenating onto an initially empty frame every iteration.
    fold_records = []

    tf.keras.backend.clear_session()
    for i, (train_index, val_index) in enumerate(kf.split(X, y)):

        print("FOLD : ", i+1)

        # X and y are NumPy arrays here, so each split converts in one call.
        X_train_tensors = tf.convert_to_tensor(X[train_index].astype(np.float32))
        X_val_tensors = tf.convert_to_tensor(X[val_index].astype(np.float32))
        y_train_tensors = tf.convert_to_tensor(y[train_index].astype(np.float32))
        y_val_tensors = tf.convert_to_tensor(y[val_index].astype(np.float32))

        # A new model per fold, so no weights carry over between folds.
        compiled_m = get_model(input_shape,num_classes)

        trained_m, history = compile_fit(compiled_m, X_train_tensors, y_train_tensors, X_val_tensors, y_val_tensors)
        plot_history(history, i+1)

        scores = eval_model(num_classes, trained_m, X_val_tensors, y_val_tensors, y_cols)
        scores['fold']=i+1
        print("Scores", scores)
        fold_records.append(scores)
        tf.keras.backend.clear_session()

    return pd.DataFrame(fold_records, columns=result_columns)

In [None]:
# Tasks to run; the full set would be ['A', 'B', 'C', 'D'].
choices = ['A']

# Cross-validation results keyed by task letter.
result_dfs = {}

for choice in choices:
    # Run the full pipeline for this task and file its per-fold metrics.
    result_df = result_dfs[choice] = driver(choice)

        subject                                    combined_matrix autism  \
0      subject1  [[[0.0, 1.0, 0.0], [0.9769444491181637, 0.8016...      1   
1      subject2  [[[0.0, 1.0, 0.0], [0.9935199866799433, 0.9101...      1   
2      subject3  [[[0.0, 1.0, 0.0], [0.992569280979119, 0.75301...      1   
3      subject4  [[[0.0, 1.0, 0.0], [0.9984916334274369, 0.8178...      1   
4      subject5  [[[0.0, 1.0, 0.0], [0.9920728128274142, 0.7477...      1   
..          ...                                                ...    ...   
105  subject106  [[[0.0, 1.0, 0.0], [0.7935194794747894, 0.7190...      0   
106  subject107  [[[0.0, 1.0, 0.0], [0.8799582823263669, 0.8923...      0   
107  subject108  [[[0.0, 1.0, 0.0], [0.9508848131818459, 0.9383...      0   
108  subject109  [[[0.0, 1.0, 0.0], [0.9414804729782632, 0.9300...      0   
109  subject110  [[[0.0, 1.0, 0.0], [0.8507568146625619, 0.7946...      0   

    adhd healthy  
0      0       0  
1      0       0  
2      0       0  

In [None]:
# Metrics frame returned by driver('A') (adhd vs autism task).
print(result_dfs['A'])

In [None]:
# driver() only runs for the entries of `choices`, so 'B' may be absent.
if 'B' in result_dfs:
    print(result_dfs['B'])
else:
    print("No results for choice 'B'; add it to `choices` and re-run the driver cell.")

In [None]:
# driver() only runs for the entries of `choices`, so 'C' may be absent.
if 'C' in result_dfs:
    print(result_dfs['C'])
else:
    print("No results for choice 'C'; add it to `choices` and re-run the driver cell.")

In [None]:
# driver() only runs for the entries of `choices`, so 'D' may be absent.
if 'D' in result_dfs:
    print(result_dfs['D'])
else:
    print("No results for choice 'D'; add it to `choices` and re-run the driver cell.")