<a href="https://colab.research.google.com/github/Magwindiri/africatransformation/blob/main/MobileNetClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow

In [None]:
import tensorflow as tf
import pickle
import os
import cv2
import numpy as np
from sklearn.metrics import recall_score, precision_score, accuracy_score, f1_score
print(tf.__version__)

2.13.0


In [None]:
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"]="true"

In [None]:
! export TF_FORCE_GPU_ALLOW_GROWTH=true

In [None]:
IMAGE_SIZE = (224,224,3)

The is configuring the GPU memory growth for TensorFlow to allocate memory on-demand, rather than allocating all available GPU memory upfront

In [None]:
gpus_devices = tf.config.experimental.list_physical_devices('GPU')

if gpus_devices:
  for gpu in gpus_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Use all available GPUs with MirroredStrategy
strategyMirror = tf.distribute.MirroredStrategy()

# Print the number of synchronized devices (usually the number of GPUs)
print(f'Number of devices: {strategyMirror.num_replicas_in_sync}')

Number of devices: 1


In [None]:
# defines a convolutional neural network (CNN)

def create_cnnnet(input_shape):
    model = tf.keras.Sequential()

    model.add(tf.keras.layers.Conv2D(64, (3, 3), input_shape=input_shape, padding='same', activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.9))
    model.add(tf.keras.layers.MaxPooling2D())

    model.add(tf.keras.layers.Conv2D(128, (3, 3), padding='same', activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.9))
    model.add(tf.keras.layers.MaxPooling2D())

    model.add(tf.keras.layers.Conv2D(256, (3, 3), padding='same', activation='relu'))
    model.add(tf.keras.layers.BatchNormalization(momentum=0.9))

    # Flatten the output
    model.add(tf.keras.layers.GlobalMaxPooling2D())
    return model


In [None]:
# creates and returns a neural network model
def second_model():
    input_shape = (90, IMAGE_SIZE[0], IMAGE_SIZE[1], IMAGE_SIZE[2])
    print('Input data shape: ', input_shape)

    convnet = create_cnnnet(input_shape[1:])

    model = tf.keras.Sequential()
    model.add(tf.keras.layers.TimeDistributed(convnet, input_shape=input_shape))
    model.add(tf.keras.layers.LSTM(64, return_sequences=False))

    model.add(tf.keras.layers.Dense(256, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(64, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.Dense(16, activation='relu'))

    model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

    return model

In [None]:
def first_model(IMAGE_SIZE):
    # Define the input layer
    input_shape = (None, IMAGE_SIZE[0], IMAGE_SIZE[1], IMAGE_SIZE[2])
    #inp = tf.keras.layers.Input(shape=input_shape)
    input = tf.keras.layers.Input(shape=input_shape)

    # Load MobileNetV2 model with modifications
    baseline_models = tf.keras.applications.MobileNetV2(
        include_top=False,
        weights='imagenet',
        pooling='max',
        classes=2,
        input_shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3),
    )

    # Freeze the layers of the pre-trained model
    for layer in baseline_models.layers:
        layer.trainable = False

    # Wrap the MobileNetV2 with TimeDistributed layer for sequence input
    x = tf.keras.layers.TimeDistributed(baseline_models)(input)

    # Add an LSTM layer for sequence modeling
    x = tf.keras.layers.LSTM(64, return_sequences=False)(x)

    # Apply dropout to prevent overfitting
    x = tf.keras.layers.Dropout(0.3)(x)

    # Create the output layer for binary classification
    out = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    # Create the model
    model = tf.keras.Model(inputs=input, outputs=out)

    # Compile the model
    model.compile(
        loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
        metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )

    return model

# Example usage:
IMAGE_SIZE = (224, 224, 3)  # Define your IMAGE_SIZE
model = first_model(IMAGE_SIZE)

In [None]:
all_accs = []
all_pres = []
all_recs = []
all_f1s = []

In [None]:
STARTING_FOLD_INDEX = 319

In [None]:
import sklearn
from scipy import interpolate
import pickle, numpy as np
from sklearn.metrics import roc_curve, roc_auc_score
base_falsepositiverate = np.linspace(0, 1, 101)

ROC curve analysis calculating the mean of true positive rates (calculate_mean_tpr) and the mean of area under the ROC curve (mean_auroc)

In [None]:
def calculate_mean_tpr(TPRS, aurocs):

    calculate_mean_tpr = np.mean(TPRS, axis=0)

    # plot the auroc curves
    calculate_mean_auroc = sum(aurocs) / len(aurocs)

    return calculate_mean_tpr, calculate_mean_auroc

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pickle
import os

from google.colab import drive
drive.mount('/content/drive')

FOLD_TPRS = [] # all the saved TPRS
FOLD_AUROCS = [] # all the saved AUROCs
META_RESULT_MATRIX = [] # all the saved results

#with open ("file_names_folds.pkl", 'rb') as f:
#SEEDS, FOLD_FILES = pickle.load(f)
with open ("/content/drive/MyDrive/CPUT/PostGradDip/capstone/file_names_folds.pkl", 'rb') as f:
  SEEDS, FOLD_FILES = pickle.load(f)

STARTING_FOLD_INDEX = 0

index_to_start_at = STARTING_FOLD_INDEX

TPRS, FPRS, local_aurocs = [],[], []
for fold in FOLD_FILES[index_to_start_at:]:

    print('FOLD::: ', fold)

    train_files = [a.strip('_') for a in fold['train']]
    test_files = [a.strip('_') for a in fold['test']]

    X_train = []
    X_test = []

    y_train = []
    y_test = []

    for filename in train_files:
        filename_int = int(filename.split('.mp4')[0])

        if filename_int <= 115:
            curr_y = 1
            subdir_name = 'armflapping'
        else:
            curr_y = 0
            subdir_name = 'control'

        curr_x = []
        for frame in os.listdir('behavior_data/' + subdir_name + '/' + filename):

            frame_num = int(frame.split('.')[0])
            if frame_num > 90:
                continue

            image = cv2.imread('behavior_data/' + subdir_name + '/' + filename + '/' + frame)
            try:
                image = image.reshape((image.shape[0], image.shape[1], image.shape[2]))
            except:
                continue

            image = cv2.resize(image, (224, 224))
            curr_x.append(image)

        len_data = len(os.listdir('behavior_data/' + subdir_name + '/' + filename))
        if len_data < 90:
            for abc in range(len_data, 90):
                curr_x.append(np.zeros((224, 224, 3)))

        curr_x = np.array(curr_x)

        X_train.append(curr_x)
        y_train.append(curr_y)

    for filename in test_files:
        filename_int = int(filename.split('.mp4')[0])

        if filename_int <= 115:
            curr_y = 1
            subdir_name = 'armflapping'
        else:
            curr_y = 0
            subdir_name = 'control'

        curr_x = []
        for frame in os.listdir('behavior_data/' + subdir_name + '/' + filename):

            frame_num = int(frame.split('.')[0])
            if frame_num > 90:
                continue

            image = cv2.imread('behavior_data/' + subdir_name + '/' + filename + '/' + frame)
            try:
                image = image.reshape((image.shape[0], image.shape[1], image.shape[2]))
            except:
                continue

            image = cv2.resize(image, (224, 224))
            curr_x.append(image)

        len_data = len(os.listdir('behavior_data/' + subdir_name + '/' + filename))
        if len_data < 90:
            for abc in range(len_data, 90):
                curr_x.append(np.zeros((224, 224, 3)))

        curr_x = np.array(curr_x)

        X_test.append(curr_x)
        y_test.append(curr_y)

    X_train = np.array(X_train)
    X_test = np.array(X_test)
    y_train = np.array(y_train)
    y_test = np.array(y_test)

    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

    model = first_model()

    model.compile(loss = tf.keras.losses.BinaryCrossentropy(),
                    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001),
                    metrics = [['accuracy', tf.keras.metrics.Precision(name="precision"), tf.keras.metrics.Recall(name="recall")]])

    history = model.fit(X_train,
                        y_train,
                        #validation_data = (X_test, y_test),
                        batch_size = 16,
                        epochs = 60)


    ###
    ### EVALUATE HERE!!!!!
    ###

    predictions = []
    trues = []
    for i in range(X_test.shape[0]):
        X_to_predict = np.array([X_test[i]])
        pred = model(X_to_predict)
        print(pred)
        true = y_test[i]
        if pred < 0.5:
            predictions.append(0)
        else:
            predictions.append(1)
        trues.append(true)

    acc = accuracy_score(trues, predictions)
    pre = precision_score(trues, predictions)
    rec = recall_score(trues, predictions)
    f1 = f1_score(trues, predictions)

    print('\n\n\n\n\n\n\n  ', acc, pre, rec, f1, ' \n\n\n\n\n\n\n')

    all_accs.append(acc)
    all_pres.append(pre)
    all_recs.append(rec)
    all_f1s.append(f1)

    # get the training scores
    training_accuracy = history.history['accuracy'][-1]
    training_precision = history.history['precision'][-1]
    training_recall = history.history['recall'][-1]
    training_f1 = 2 * training_precision * training_recall / (training_precision + training_recall + tf.keras.backend.epsilon())

    with open('performances/' + str(STARTING_FOLD_INDEX) + '_results.txt', 'w') as f:
        f.write('Training Accuracy: ' + str(training_accuracy) + '\n')
        f.write('Training Precision: ' + str(training_precision) + '\n')
        f.write('Training Recall: ' + str(training_recall) + '\n')
        f.write('Training F1: ' + str(training_f1) + '\n')
        f.write('Validation Accuracy: ' + str(acc) + '\n')
        f.write('Validation Precision: ' + str(pre) + '\n')
        f.write('Validation Recall: ' + str(rec) + '\n')
        f.write('Validation F1: ' + str(f1) + '\n')
        f.write('AUROC: ' + str(roc_auc_score(trues, predictions)) + '\n')
        f.close()

    STARTING_FOLD_INDEX += 1

    fpr, tpr, _ = roc_curve(trues, predictions)
    tpr = interp(base_fpr, fpr, tpr)
    tpr[0] = 0.0

    with open("tprs/" + str(STARTING_FOLD_INDEX) + "_tprs.pickle", "wb") as f:
        pickle.dump(tpr, f)