In [None]:
import os
import warnings
import shutil
warnings.filterwarnings(action='ignore')


import math
import glob
import numpy as np
import random
import time

from sklearn.utils import shuffle
from sklearn.metrics import roc_curve, confusion_matrix, ConfusionMatrixDisplay, roc_auc_score, precision_score, recall_score, f1_score, classification_report
from sklearn.dummy import DummyClassifier
from PIL import Image
from tqdm import tqdm,tnrange,tqdm_notebook
import tensorflow as tf
from tqdm.keras import TqdmCallback
from tensorflow.keras import backend as K 
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import ReduceLROnPlateau,EarlyStopping,ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator 
from tensorflow.keras import applications as app
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten,AveragePooling2D#, CenterCrop
from tensorflow.keras.layers.experimental.preprocessing import CenterCrop
from tensorflow.keras.layers import Dense,BatchNormalization,Dropout 
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.applications import EfficientNetB4, ResNet50, ResNet101, VGG16, MobileNet, InceptionV3, EfficientNetB2, densenet, ConvNeXtTiny
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import plot_model

%matplotlib inline
%matplotlib inline
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.io as pio
import plotly.graph_objects as go



In [None]:
# @title Helper functions (run me)

def set_device():
  """Report whether a CUDA GPU is visible to PyTorch and return the device name.

  PyTorch is not imported at notebook level, so it is imported lazily here;
  when it is not installed the function falls back to the CPU instead of
  raising a NameError.

  Returns:
    "cuda" if PyTorch is importable and reports a CUDA device, else "cpu".
  """
  try:
    import torch
    device = "cuda" if torch.cuda.is_available() else "cpu"
  except ImportError:
    # No PyTorch in this environment: nothing can run on CUDA through it.
    device = "cpu"

  if device != "cuda":
      print("WARNING: For this notebook to perform best, "
          "if possible, in the menu under `Runtime` -> "
          "`Change runtime type.`  select `GPU` ")
  else:
      print("GPU is enabled in this notebook.")

  return device

def set_device_tf():
    """Make only the first physical GPU visible to TensorFlow and print it.

    Prints "No GPUs found" when TensorFlow lists no GPU. A RuntimeError raised
    while restricting the visible devices is printed rather than propagated.
    """
    detected = tf.config.experimental.list_physical_devices('GPU')
    if not detected:
        print("No GPUs found")
        return

    try:
        # Restrict TensorFlow to the first GPU only.
        tf.config.experimental.set_visible_devices(detected[0], 'GPU')
        print(detected[0])
    except RuntimeError as err:
        # Visible devices must be set at program startup.
        print(err)

#  Plotting function.
def plot_accuracy_and_loss(history):
  """Plot training/validation accuracy and loss curves in two stacked panels.

  Args:
    history: object whose `.history` mapping holds the per-epoch series
      'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
  """
  series = history.history
  train_acc, valid_acc = series['accuracy'], series['val_accuracy']
  train_loss, valid_loss = series['loss'], series['val_loss']

  _, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

  # Top panel: accuracy, with the upper limit pinned just above 1.
  acc_ax.plot(train_acc, label='Training')
  acc_ax.plot(valid_acc, label='Validation')
  acc_ax.legend(loc='lower right')
  acc_ax.set_ylabel('Accuracy')
  acc_ax.set_ylim([min(acc_ax.get_ylim()), 1.05])
  acc_ax.set_title('Training and Validation Accuracy')

  # Bottom panel: loss, padded by 0.1 around the observed range.
  every_loss = np.concatenate((valid_loss, train_loss))
  loss_ax.plot(train_loss, label='Training')
  loss_ax.plot(valid_loss, label='Validation')
  loss_ax.legend(loc='upper right')
  loss_ax.set_ylabel('Cross Entropy')
  loss_ax.set_ylim([np.min(every_loss) - 0.1, np.max(every_loss) + 0.1])
  loss_ax.set_title('Training and Validation Loss')
  loss_ax.set_xlabel('epoch')

  plt.show()
  plt.close('all')

In [None]:
# Restrict TensorFlow to the first GPU (if any) before any model is built.
set_device_tf()

In [None]:
# Data loading.
def load_data(train_dir, test_dir, batch_size=32, img_size=(224, 224),
              validation_split=0.2, seed=123):
  """Create batched, prefetched image datasets from class-labelled directories.

  Args:
    train_dir: directory holding one sub-directory per class, or None to skip
      the training/validation datasets.
    test_dir: directory holding one sub-directory per class, or None to skip
      the test dataset.
    batch_size: images per batch for every dataset (64 can fill up GPU memory).
    img_size: (height, width) every image is resized to.
    validation_split: fraction of `train_dir` held out for validation.
    seed: seed shared by the training and validation subsets so that the two
      splits are consistent with each other.

  Returns:
    (train_dataset, validation_dataset, test_dataset); the entries whose
    source directory was None are None.
  """
  AUTOTUNE = tf.data.AUTOTUNE
  from_directory = tf.keras.preprocessing.image_dataset_from_directory

  train_dataset = None
  validation_dataset = None
  if train_dir is not None:
    # Both subsets must use the same seed/split to be consistent.
    split_kwargs = dict(shuffle=True,
                        validation_split=validation_split,
                        seed=seed,
                        batch_size=batch_size,
                        image_size=img_size)
    train_dataset = from_directory(train_dir, subset='training', **split_kwargs)
    validation_dataset = from_directory(train_dir, subset='validation', **split_kwargs)

    train_dataset = train_dataset.prefetch(buffer_size=AUTOTUNE)
    validation_dataset = validation_dataset.prefetch(buffer_size=AUTOTUNE)
    print('Number of train batches: %d' % tf.data.experimental.cardinality(train_dataset))

  test_dataset = None
  if test_dir is not None:
    # shuffle=False keeps the file order stable, so several models can be run
    # over the same dataset and their outputs compared clip by clip.
    test_dataset = from_directory(test_dir,
                                  shuffle=False,
                                  batch_size=batch_size,
                                  image_size=img_size)
    test_dataset = test_dataset.prefetch(buffer_size=AUTOTUNE)

  return train_dataset, validation_dataset, test_dataset


In [None]:
def _count_pass_fail(dataset):
  """Return (num_passes, num_fails): counts of non-zero and zero labels."""
  num_passes = 0
  num_fails = 0
  for _, labels in dataset:
    labels_np = labels.numpy()
    passes = int(np.count_nonzero(labels_np))
    num_passes += passes
    num_fails += len(labels_np) - passes
  return num_passes, num_fails


def compute_weights(train_dataset, validation_dataset):
  """Compute balanced class weights from the training labels.

  Each class weight is `total / (2 * class_count)`, so both classes contribute
  equally to the loss. Class counts of both datasets are printed as a summary.

  Args:
    train_dataset: iterable of (images, labels) batches; `labels` must expose
      `.numpy()`. Label 0 is counted as Fail, any non-zero label as Pass.
    validation_dataset: iterable of the same form, used only for the printed
      summary; may be None or empty.

  Returns:
    dict {0: weight_for_fail, 1: weight_for_pass}.

  Raises:
    ValueError: if the training data lacks examples of either class, in which
      case a balanced weight is undefined.
  """
  num_passes, num_fails = _count_pass_fail(train_dataset)
  total = num_fails + num_passes
  if num_fails == 0 or num_passes == 0:
    raise ValueError(
        'Cannot compute class weights: training data has {} pass and {} fail '
        'examples; both classes are required.'.format(num_passes, num_fails))

  print('Train Examples:\n    Total: {}\n, Passes: {}, Fails: {} ({:.2f}% of total)\n'.format(
      total, num_passes, num_fails, 100 * num_fails / total))

  # Class weights
  weight_for_fail = (1 / num_fails) * (total / 2.0)
  weight_for_passes = (1 / num_passes) * (total / 2.0)

  class_weights = {0: weight_for_fail, 1: weight_for_passes}

  print('Weight for class 0 (Fail): {:.2f}'.format(weight_for_fail))
  print('Weight for class 1 (Pass): {:.2f}'.format(weight_for_passes))

  # Validation class counts (summary only; an empty set must not crash).
  if validation_dataset is not None:
    num_passes, num_fails = _count_pass_fail(validation_dataset)
    total = num_fails + num_passes
    fail_percent = 100 * num_fails / total if total else 0.0
    print('Validation Examples: Total: {}\n, Passes: {}, Fails: {} ({:.2f}% of total)\n'.format(
      total, num_passes, num_fails, fail_percent))

  return class_weights


In [None]:
# Create the base model from the pre-trained model MobileNet V2
def get_model(training=False, base_learning_rate = 0.0001, IMG_SIZE = (224, 224), model_type = ''):
    """Build and compile a binary image classifier with a single sigmoid output.

    For every `model_type` except "custom", an ImageNet-pretrained backbone is
    wrapped as: preprocess_input -> backbone -> global average pooling ->
    Dropout(0.8) -> Dense(1, sigmoid). "custom" builds a small CNN instead.

    Args:
        training: passed as the `training` call argument of the Dropout layer
            in the transfer-learning head, so that dropout is explicitly
            switched on (True) or off (False) for the built model. Not used by
            the "custom" model.
        base_learning_rate: Adam learning rate. It is divided by 10 when part
            of a pretrained backbone is fine-tuned.
        IMG_SIZE: (height, width) of the input images.
        model_type: one of "mobilenet", "resnet", "densenet",
            "densenet with dropout", "vgg16", "convnexttiny" or "custom".

    Returns:
        A compiled tf.keras model (binary cross-entropy loss, accuracy metric).

    Raises:
        ValueError: if `model_type` is not one of the supported names.
    """
    IMG_SHAPE = tuple(IMG_SIZE) + (3,)

    # Switch between fine-tuning the top of the backbone and freezing it.
    base_model_trainable = True

    if model_type == "custom":
        model = tf.keras.Sequential([
          tf.keras.layers.Rescaling(1./255),
          tf.keras.layers.Conv2D(12, 3, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
          tf.keras.layers.MaxPooling2D(),
          tf.keras.layers.Conv2D(24, 3, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
          tf.keras.layers.MaxPooling2D(),
          tf.keras.layers.Conv2D(48, 3, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
          tf.keras.layers.MaxPooling2D(),
          tf.keras.layers.Flatten(),
          tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
          tf.keras.layers.Dropout(0.6),
          tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    else:
        # Index of the first backbone layer to fine-tune. None means that no
        # boundary is configured for this backbone, in which case it is kept
        # fully frozen (previously this situation raised UnboundLocalError).
        fine_tune_at = None

        if model_type == "mobilenet":
            ## MOBILENETV2
            base_model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE,
                                                           include_top=False,
                                                           weights='imagenet')
            preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
        elif model_type == "resnet":
            ## RESNET50
            base_model = tf.keras.applications.resnet50.ResNet50(input_shape=IMG_SHAPE,
                                                                 include_top=False,
                                                                 weights='imagenet')
            preprocess_input = tf.keras.applications.resnet50.preprocess_input
            fine_tune_at = 175 - 19
        elif model_type == "densenet":
            ## Densenet
            base_model = tf.keras.applications.densenet.DenseNet121(input_shape=IMG_SHAPE,
                                                                    include_top=False,
                                                                    weights='imagenet')
            preprocess_input = tf.keras.applications.densenet.preprocess_input
            fine_tune_at = 427 - 20
        elif model_type == "densenet with dropout":
            ## Densenet with dropout
            # NOTE(review): DenseNetWithDropout is not defined in the visible
            # part of this notebook; it must exist before this branch is used.
            base_model = DenseNetWithDropout(input_shape=IMG_SHAPE, include_top=False, weights='imagenet')
            preprocess_input = tf.keras.applications.densenet.preprocess_input
            fine_tune_at = 300
        elif model_type == "vgg16":
            ## Vgg16
            base_model = tf.keras.applications.vgg16.VGG16(input_shape=IMG_SHAPE,
                                                           include_top=False,
                                                           weights='imagenet')
            preprocess_input = tf.keras.applications.vgg16.preprocess_input
        elif model_type == "convnexttiny":
            print("CONVNEXT SELECTED")
            base_model = ConvNeXtTiny(input_shape=IMG_SHAPE,
                                      include_top=False,
                                      weights='imagenet')
            preprocess_input = tf.keras.applications.convnext.preprocess_input
            fine_tune_at = 132
        else:
            raise ValueError(
                "Unknown model_type {!r}; expected one of 'mobilenet', 'resnet', "
                "'densenet', 'densenet with dropout', 'vgg16', 'convnexttiny', "
                "'custom'.".format(model_type))

        print("Number of layers = " + str(len(base_model.layers)))

        if base_model_trainable and fine_tune_at is not None:
            base_model.trainable = True
            # Fine-tuning pretrained weights uses a 10x smaller learning rate.
            base_learning_rate = base_learning_rate/10

            # Freeze all the layers before the `fine_tune_at` layer
            for layer in base_model.layers[:fine_tune_at]:
                layer.trainable = False
        else:
            base_model.trainable = False

        inputs = tf.keras.Input(shape=IMG_SHAPE)
        x = preprocess_input(inputs)
        # training=False keeps the backbone (e.g. its batch-norm layers) in
        # inference mode even while the unfrozen layers are being trained.
        x = base_model(x, training=False)
        x = tf.keras.layers.GlobalAveragePooling2D()(x)
        x = tf.keras.layers.Dropout(0.8)(x, training=training)
        outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)
        model = tf.keras.Model(inputs, outputs)

    # Both architectures end in a sigmoid, so the model emits probabilities and
    # the loss must not treat them as logits.
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
                  loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    return model

In [None]:
def get_test_model(model_type):
  """Build a `model_type` model with `training=False` and restore its weights.

  The weights are read from "Weights/<model_type>.h5" relative to the current
  working directory.
  """
  restored_model = get_model(training=False, model_type=model_type)
  weights_file = "Weights/" + model_type + ".h5"
  restored_model.load_weights(weights_file)
  return restored_model

def get_performance(model, test_dataset):
  """Evaluate the saved model of a given type on `test_dataset`.

  Args:
    model: despite its name, the model *type* string forwarded to
      `get_test_model` (e.g. 'densenet'), not a model instance.
    test_dataset: dataset passed to `evaluate`.

  Returns:
    The accuracy reported by `evaluate`; it is also printed.
  """
  _, accuracy = get_test_model(model).evaluate(test_dataset)
  print('Test accuracy :', accuracy)
  return accuracy

def predict_on_dataset(model, dataset):
    """Run `model.predict` over every batch and gather labels and outputs.

    Args:
        model: object exposing `predict(images)`.
        dataset: iterable of (images, labels) batches; `labels` must expose
            `.numpy()`.

    Returns:
        (labels, predictions) as two flattened 1-D numpy arrays, in dataset
        order.
    """
    gathered_labels, gathered_outputs = [], []

    for batch_images, batch_labels in dataset:
        gathered_outputs += list(model.predict(batch_images))
        gathered_labels += list(batch_labels.numpy())

    flat_labels = np.asarray(gathered_labels).ravel()
    flat_outputs = np.asarray(gathered_outputs).ravel()
    return flat_labels, flat_outputs

In [None]:
#Plotting functions
def plot_image_classifications(validation_dataset, model=None, max_images=10):
  """Show the first images of one batch with their true and predicted class.

  Args:
    validation_dataset: dataset exposing `as_numpy_iterator()` that yields
      (images, labels) batches with integer 0/1 labels.
    model: model exposing `predict_on_batch` and returning one probability per
      image. When omitted, the notebook-level variable `model` is used, which
      is what this function relied on before the parameter existed.
    max_images: maximum number of images to display.

  Raises:
    ValueError: if no model is passed and no notebook-level `model` exists.
  """
  class_names = ["Fail", "Pass"]

  if model is None:
    model = globals().get("model")
    if model is None:
      raise ValueError("No model available: pass `model=` or define a notebook-level `model`.")

  image_batch, label_batch = next(validation_dataset.as_numpy_iterator())
  probabilities = np.asarray(model.predict_on_batch(image_batch)).flatten()

  # Models built by get_model end in a sigmoid layer, so the outputs already
  # are probabilities; applying another sigmoid would push every value to
  # >= 0.5 and label every image "Pass".
  predictions = np.where(probabilities < 0.5, 0, 1)

  print('Predictions:\n', predictions)
  print('Labels:\n', label_batch)
  print(predictions - label_batch)

  # Never index past the end of a short (e.g. final) batch.
  n_shown = min(max_images, len(image_batch))
  n_cols = 5
  n_rows = max(4, -(-n_shown // n_cols))  # at least the original 4x5 grid

  plt.figure(figsize=(10, 10))
  for i in range(n_shown):
    plt.subplot(n_rows, n_cols, i + 1)
    plt.imshow(image_batch[i].astype("uint8"))
    plt.title("True: " + class_names[int(label_batch[i])] + '\n' + "Pred: " + class_names[int(predictions[i])])
    plt.axis("off")


def calculate_metrics(labels, predicted_classes, class_of_interest=0):
    """Print and return sensitivity, specificity, precision and recall.

    The metrics treat `class_of_interest` as the positive class of a binary
    0/1 problem.

    Args:
        labels: true 0/1 labels (array-like; float 0.0/1.0 is accepted).
        predicted_classes: predicted 0/1 classes (array-like or tensor).
        class_of_interest: 0 or 1, the class regarded as "positive".

    Returns:
        (sensitivity, specificity, precision, recall). Sensitivity and
        specificity are NaN when their denominator is zero; precision and
        recall are 0 in that case.

    Raises:
        ValueError: if `class_of_interest` is neither 0 nor 1.
    """
    if class_of_interest not in (0, 1):
        raise ValueError("class_of_interest must be 0 or 1, got {!r}".format(class_of_interest))

    def _ratio(numerator, denominator):
        # Undefined ratios are reported as NaN instead of raising or warning.
        return float(numerator) / float(denominator) if denominator else float("nan")

    y_true = np.asarray(labels).astype(int).ravel()
    y_pred = np.asarray(predicted_classes).astype(int).ravel()

    # Fixing the label set keeps the matrix 2x2 even when one class is absent
    # from the data, so the indexing below always addresses the right cells.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # Get values for class of interest
    TP = cm[class_of_interest, class_of_interest]
    FP = np.sum(cm[:, class_of_interest]) - TP
    FN = np.sum(cm[class_of_interest, :]) - TP
    TN = np.sum(cm) - TP - FP - FN

    # Calculate metrics
    sensitivity = _ratio(TP, TP + FN)
    specificity = _ratio(TN, TN + FP)
    precision = precision_score(y_true, y_pred, pos_label=class_of_interest, zero_division=0)
    recall = recall_score(y_true, y_pred, pos_label=class_of_interest, zero_division=0)

    print("Sensitivity = " + str(sensitivity))
    print("Specificity = " + str(specificity))
    print("Precision = " + str(precision))
    print("Recall = " + str(recall))

    return sensitivity, specificity, precision, recall

def plot_confusion_matrix_plotly(label_batch, predictions, class_names, save_path):
    """Draw a row-normalised confusion matrix with plotly and save it as an image.

    Args:
        label_batch: true integer labels, where label k corresponds to
            class_names[k].
        predictions: scores; values below 0.5 become class 0, all others
            class 1.
        class_names: display names, ordered by label index.
        save_path: path of the image file to write.
    """
    true_classes = np.asarray(label_batch).astype(int).ravel()
    scores = np.asarray(predictions, dtype=np.float32).ravel()
    predicted_classes = np.where(scores < 0.5, 0, 1)

    # Fix the label set so the matrix always has one row/column per class name,
    # even when a class does not occur in the data.
    cm = confusion_matrix(true_classes, predicted_classes,
                          labels=list(range(len(class_names))), normalize='true')
    cell_values = cm.flatten()

    # Percentage text per cell.
    annotations = np.array([f'{value:.1%}' for value in cell_values])

    # White text on dark cells, black on light ones, decided per cell so the
    # numbers stay readable whatever the matrix looks like.
    text_colors = np.where(cell_values > 0.5, 'white', 'black')

    # Create heatmap
    heatmap = go.Heatmap(z=cm,
                         x=class_names,
                         y=class_names,
                         colorscale='Blues',
                         zmin=0,
                         zmax=1)

    # Text positions follow the row-major order of `cell_values`: the predicted
    # class (x) varies fastest, the true class (y) slowest.
    text_annotations = go.Scatter(x=np.tile(class_names, len(class_names)),
                                  y=np.repeat(class_names, len(class_names)),
                                  text=annotations,
                                  mode='text',
                                  textposition='middle center',
                                  textfont_color=text_colors)

    # Create figure
    fig = go.Figure(data=[heatmap, text_annotations])
    fig.update_layout(title='Confusion Matrix',
                      xaxis_title='Predicted Label',
                      yaxis_title='True Label',
                      yaxis_autorange='reversed',
                      width=600,
                      height=600,
                      margin=dict(t=50, b=50, l=50, r=50))  # centre the plot

    fig.show()
    pio.write_image(fig, save_path, scale=2)  # scale increases the resolution

def plot_confusion_matrix_plotly_no_normalize(label_batch, predictions, class_names, save_path):
    """Draw a confusion matrix of raw counts with plotly and save it as an image.

    Args:
        label_batch: true integer labels, where label k corresponds to
            class_names[k].
        predictions: scores; values below 0.5 become class 0, all others
            class 1.
        class_names: display names, ordered by label index.
        save_path: path of the image file to write.
    """
    true_classes = np.asarray(label_batch).astype(int).ravel()
    scores = np.asarray(predictions, dtype=np.float32).ravel()
    predicted_classes = np.where(scores < 0.5, 0, 1)

    # Fix the label set so the matrix always has one row/column per class name,
    # even when a class does not occur in the data.
    cm = confusion_matrix(true_classes, predicted_classes,
                          labels=list(range(len(class_names))))
    cell_counts = cm.flatten()

    # Count text per cell.
    annotations = cell_counts.astype(str)

    # White text on the darker half of the colour scale, black on the lighter
    # half, decided per cell so the numbers stay readable.
    text_colors = np.where(cell_counts > cm.max() / 2, 'white', 'black')

    # Create heatmap
    heatmap = go.Heatmap(z=cm,
                         x=class_names,
                         y=class_names,
                         colorscale='Blues')

    # Text positions follow the row-major order of `cell_counts`: the predicted
    # class (x) varies fastest, the true class (y) slowest.
    text_annotations = go.Scatter(x=np.tile(class_names, len(class_names)),
                                  y=np.repeat(class_names, len(class_names)),
                                  text=annotations,
                                  mode='text',
                                  textposition='middle center',
                                  textfont_color=text_colors)

    # Create figure
    fig = go.Figure(data=[heatmap, text_annotations])
    fig.update_layout(title='Confusion Matrix',
                      xaxis_title='Predicted',
                      yaxis_title='Actual',
                      yaxis_autorange='reversed',
                      width=600,
                      height=600,
                      margin=dict(t=50, b=50, l=50, r=50))  # centre the plot

    fig.show()
    pio.write_image(fig, save_path, scale=2)  # scale increases the resolution

    
def plot_confusion_matrix(label_batch, predictions, class_names, save_path):
  """Plot a row-normalised confusion matrix with matplotlib and save it.

  Scores below 0.5 are mapped to class 0 and all others to class 1 before the
  matrix is computed. The figure is written to `save_path`, shown, and all
  open figures are closed.
  """
  score_tensor = tf.convert_to_tensor(predictions, dtype=tf.float32)
  hard_classes = tf.where(score_tensor < 0.5, 0, 1).numpy()

  normalised_cm = confusion_matrix(label_batch, hard_classes, normalize='true')
  ConfusionMatrixDisplay(confusion_matrix=normalised_cm,
                         display_labels=class_names).plot()

  plt.savefig(save_path, dpi=300, facecolor='white', transparent=False)
  plt.show()
  plt.close('all')

def plot_roc(label_batch, predictions, save_path):
  """Plot the ROC curve of `predictions` against `label_batch`, in percent.

  `save_path` is accepted but currently unused: the figure is only shown, not
  written to disk.
  """
  false_pos_rate, true_pos_rate, _ = roc_curve(label_batch, predictions)

  axes = plt.gca()
  axes.plot(100 * false_pos_rate, 100 * true_pos_rate, label="ROC", linewidth=2)
  axes.set_xlabel('False positives [%]')
  axes.set_ylabel('True positives [%]')
  axes.set_xlim([-0.5, 100])
  axes.set_ylim([0, 100.5])
  axes.grid(True)
  axes.set_aspect('equal')

  plt.show()
  plt.close('all')

def get_performance_metrics(labels, predictions):
  """Print AUC, class-0 precision/recall/F1 and a classification report.

  The AUC is computed on the raw scores; all other metrics use the scores
  thresholded at 0.5 (below 0.5 -> class 0, otherwise class 1) with class 0 as
  the positive label.
  """
  auc = roc_auc_score(labels, predictions)
  hard_predictions = tf.where(predictions < 0.5, 0, 1)
  print('Auc: %.3f' % auc)

  class0 = dict(pos_label=0, average='binary')
  precision = precision_score(labels, hard_predictions, **class0)
  recall = recall_score(labels, hard_predictions, **class0)
  f1 = f1_score(labels, hard_predictions, **class0)
  print('Precision: %.3f, Recall: %.3f, F-Score: %.3f,' % (precision, recall, f1))

  print(classification_report(labels, hard_predictions))

In [None]:
# Rebuild both networks with get_test_model, which restores their weights from
# Weights/densenet.h5 and Weights/convnexttiny.h5.
densenet_model = get_test_model('densenet')
convnexttiny_model = get_test_model('convnexttiny') 

In [None]:
# Evaluate the DenseNet + ConvNeXt ensemble on every participant folder of the
# test fold, collecting clip-level and participant-level results.
top_dir = "../Audio Data/Data/"
rootdir = top_dir + "Fold 0/Test/"

# Participant-level accumulators: one entry per sub-directory of `rootdir`.
results = []                    # clip accuracy within each participant
participant_labels = []         # true label of each participant
participant_predictions = []    # predicted class of each participant
concat_predict_score_mean = []  # mean ensemble score of each participant
prob_avg = []                   # same mean score (name used by later cells)
output_avg = []                 # fraction of clips predicted as class 1
fold_labels = []

# Clip-level accumulators: one entry per image over all participants.
concat_labels = []
concat_predictions = []

tic = time.time()

# Only the immediate sub-directories of `rootdir` are participants; sorting
# makes the participant order reproducible between runs.
participant_dirs = sorted(next(os.walk(rootdir), (rootdir, [], []))[1])
if not participant_dirs:
    print("No participant directories found under " + rootdir)

for participant_dir in participant_dirs:
    participant_path = rootdir + participant_dir
    print(participant_path)
    _, _, test_dataset = load_data(None, participant_path)

    # load_data builds the test dataset with shuffle=False, so both models see
    # the clips in the same order and their scores can be averaged per clip.
    clip_labels, densenet_scores = predict_on_dataset(densenet_model, test_dataset)
    _, convnexttiny_scores = predict_on_dataset(convnexttiny_model, test_dataset)

    # Ensemble: arithmetic mean of the two models' scores for each clip.
    clip_scores = np.mean([densenet_scores, convnexttiny_scores], axis=0)
    clip_classes = np.where(clip_scores < 0.5, 0, 1)

    concat_labels = np.concatenate((concat_labels, clip_labels), axis=0)
    concat_predictions = np.concatenate((concat_predictions, clip_scores), axis=0)

    predict_on_performance = float(np.mean(clip_classes == clip_labels))
    predict_score_mean = np.mean(clip_scores)

    print("Labels " + str(clip_labels))
    print("Predictions = " + str(clip_classes))
    print("Label length = " + str(len(clip_labels)))
    print("Predictions length = " + str(len(clip_scores)))
    print("Model performance (predict_on) " + str(predict_on_performance))
    print("Prediction cumulative score = " + str(predict_score_mean))

    results.append(predict_on_performance)
    concat_predict_score_mean.append(predict_score_mean)
    prob_avg.append(predict_score_mean)
    output_avg.append(np.mean(clip_classes))

    # NOTE(review): the first clip's label is taken as the participant label;
    # this assumes every clip in a participant folder shares one label.
    participant_labels.append(clip_labels[0])
    # The participant's predicted class comes from the mean score. It was
    # previously derived from the clip accuracy, which is not a class.
    participant_predictions.append(0 if predict_score_mean < 0.5 else 1)

    print("Elapsed time = " + str(time.time() - tic))

## Single Clip Performance

In [None]:
#### %matplotlib inline
%matplotlib inline
print(results)
result_dict = []
class_names = ["Fail", "Pass"]
i = 0

# NOTE: this is the mean of the per-participant clip accuracies, so every
# participant counts equally regardless of how many clips they have.
print("Overall accuracy is " + str(np.mean(results)))
print(np.shape(concat_predictions))
print(np.shape(concat_labels))
plot_confusion_matrix(concat_labels, concat_predictions, class_names, "Result Images/clip_confusion.png")
plot_confusion_matrix_plotly(concat_labels, concat_predictions, class_names, "Result Images/clip_confusion_plotly.png")
# The raw-count matrix gets its own file; it used to overwrite the normalised
# plotly figure saved on the line above.
plot_confusion_matrix_plotly_no_normalize(concat_labels, concat_predictions, class_names, "Result Images/clip_confusion_plotly_counts.png")
plot_roc(concat_labels, concat_predictions,  "Result Images/clip_roc.png")
get_performance_metrics(concat_labels, concat_predictions)
print(concat_predictions)
predicted_labels = tf.where(concat_predictions < 0.5, 0, 1)

calculate_metrics(concat_labels, predicted_labels, class_of_interest=0)

# Participant Level Performance

In [None]:
# Participant-level decision: threshold each participant's mean ensemble score
# with the same rule used everywhere else (score < 0.5 -> 0, otherwise 1).
print(prob_avg)
participant_scores = np.array(prob_avg)  # can also use output_avg
participant_predictions = [0 if score < 0.5 else 1 for score in prob_avg]

plot_confusion_matrix(np.array(participant_labels), np.array(participant_predictions), class_names, "Result Images/participant_confusion.png")
plot_confusion_matrix_plotly(np.array(participant_labels), np.array(participant_predictions), class_names, "Result Images/participant_confusion_plotly.png")

print(concat_predict_score_mean)
print(participant_labels)
print(participant_predictions)
# The ROC curve needs continuous scores; hard 0/1 predictions would collapse it
# to a single operating point.
plot_roc(participant_labels, participant_scores,  "Result Images/participant_roc.png")
get_performance_metrics(participant_labels, np.array(concat_predict_score_mean))
predicted_participant_labels = tf.where( np.array(concat_predict_score_mean) < 0.5, 0, 1)
calculate_metrics(participant_labels, predicted_participant_labels, class_of_interest=0)

# **Dummy Classifier to compare performance**


In [None]:
# Baseline: a classifier that always answers class 1, scored with the same
# clip-level metrics as the ensemble.
n_clips = len(concat_labels)
dummy_clf = DummyClassifier(strategy="constant", constant=1).fit(None, concat_labels)
dummy_predictions = dummy_clf.predict(np.ones((n_clips, 1)))
get_performance_metrics(concat_labels, dummy_predictions)