# Import libraries

In [2]:
import os
import warnings
import numpy as np
import logging
import random

import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.preprocessing import StandardScaler

# Fix randomness and hide warnings
RND = False
if not RND:
    seed = 76998669

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
if not RND:
    os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

if not RND:
    np.random.seed(seed)
    random.seed(seed)

tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
if not RND:
    tf.random.set_seed(seed)
    tf.compat.v1.set_random_seed(seed)
print(tf.__version__)

2.14.0


# Load data

In [None]:
# download clean dataset
!wget https://storage.googleapis.com/storage.barbiero.dev/public_data_clean.npz

In [3]:
# load dataset
dataset = np.load('public_data_clean.npz', allow_pickle=True)
keys = list(dataset.keys())
images = np.array(dataset[keys[0]])
labels = np.array(dataset[keys[1]])

labels_map = {0: "healthy", 1: "unhealthy"}
labels_rev_map = {"healthy": 0, "unhealthy": 1}
labels = np.array([labels_rev_map[label] for label in labels])

## Examine the class label imbalance

In [4]:
pos, neg = np.sum(labels), len(labels) - np.sum(labels)
total = len(labels)
print('Examples:\n    Total: {}\n    Positive: {} ({:.2f}% of total)\n'.format(total, pos, 100 * pos / total))

Examples:
    Total: 4850
    Positive: 1790 (36.91% of total)



## Split data

In [5]:
# Split the dataset into a combined training and validation set, and a separate test set
X_train_val, X_test, y_train_val, y_test = train_test_split(
    images,
    labels,
    test_size = int(0.15 * len(images)),
    **({"random_state":seed} if not RND else {}),
    stratify = labels
)

# Further split the combined training and validation set into a training set and a validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size = int(0.15 * len(images)),
    **({"random_state":seed} if not RND else {}),
    stratify = y_train_val
)

In [6]:
print(f'Average class probability in training set:   {y_train.mean():.4f}')
print(f'Average class probability in validation set: {y_val.mean():.4f}')
print(f'Average class probability in test set:       {y_test.mean():.4f}')

Average class probability in training set:   0.3693
Average class probability in validation set: 0.3686
Average class probability in test set:       0.3686


In [7]:
pos_features = X_train[y_train == 1]
neg_features = X_train[y_train == 0]

pos_labels = y_train[y_train == 1]
neg_labels = y_train[y_train == 0]

In [8]:
BUFFER_SIZE = 100000

def make_ds(features, labels):
  ds = tf.data.Dataset.from_tensor_slices((features, labels))#.cache()
  ds = ds.shuffle(BUFFER_SIZE).repeat()
  return ds

pos_ds = make_ds(pos_features, pos_labels)
neg_ds = make_ds(neg_features, neg_labels)

In [9]:
resampled_ds = tf.data.Dataset.sample_from_datasets([pos_ds, neg_ds], weights=[0.5, 0.5])
resampled_ds = resampled_ds.batch(batch_size).prefetch(2)

In [10]:
resampled_steps_per_epoch = np.ceil(2.0*neg/batch_size)
resampled_steps_per_epoch

96.0

In [11]:
for features, label in resampled_ds.take(1):
  print(label.numpy().mean())

0.53125


In [12]:
metrics = [
      tfk.metrics.BinaryCrossentropy(name='cross entropy'),  # same as model's loss
      tfk.metrics.MeanSquaredError(name='Brier score'),
      tfk.metrics.TruePositives(name='tp'),
      tfk.metrics.FalsePositives(name='fp'),
      tfk.metrics.TrueNegatives(name='tn'),
      tfk.metrics.FalseNegatives(name='fn'), 
      tfk.metrics.BinaryAccuracy(name='accuracy'),
      tfk.metrics.Precision(name='precision'),
      tfk.metrics.Recall(name='recall'),
      tfk.metrics.AUC(name='auc'),
      tfk.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
]

In [13]:
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
def plot_metrics(history):
  metrics = ['loss', 'prc', 'precision', 'recall']
  for n, metric in enumerate(metrics):
    name = metric.replace("_"," ").capitalize()
    plt.subplot(2,2,n+1)
    plt.plot(history.epoch, history.history[metric], color=colors[0], label='Train')
    plt.plot(history.epoch, history.history['val_'+metric],
             color=colors[0], linestyle="--", label='Val')
    plt.xlabel('Epoch')
    plt.ylabel(name)
    if metric == 'loss':
      plt.ylim([0, plt.ylim()[1]])
    elif metric == 'auc':
      plt.ylim([0.8,1])
    else:
      plt.ylim([0,1])

    plt.legend()

# Model

In [14]:
# Define key model parameters
input_shape = X_train.shape[1:]  # Input shape for the model
output_shape = 1 #y_train.shape[1]  # Output shape for the model
batch_size = 64                # Batch size for training
epochs = 200                     # Number of training epochs

# Print the defined parameters
print("Epochs:", epochs)
print("Batch Size:", batch_size)
print("Input Shape:", input_shape)
print("Output Shape:", output_shape)

Epochs: 200
Batch Size: 64
Input Shape: (96, 96, 3)
Output Shape: 1


## Build model

In [15]:
# Model Function
def apple_elixir_model(input_shape, output_shape):
  
  # Build the neural network layer by layer
  input_layer = tfkl.Input(shape=input_shape, name='Input')

  x = tfkl.Conv2D(filters=32,kernel_size=3,padding='same',activation='relu',name='conv00')(input_layer)
  x = tfkl.Conv2D(filters=32,kernel_size=3,padding='same',activation='relu',name='conv01')(x)
  x = tfkl.MaxPooling2D(name='mp0')(x)

  x = tfkl.Conv2D(filters=64,kernel_size=3,padding='same',activation='relu',name='conv10')(x)
  x = tfkl.Conv2D(filters=64,kernel_size=3,padding='same',activation='relu',name='conv11')(x)
  x = tfkl.MaxPooling2D(name='mp1')(x)

  x = tfkl.Conv2D(filters=128,kernel_size=3,padding='same',activation='relu',name='conv20')(x)
  x = tfkl.Conv2D(filters=128,kernel_size=3,padding='same',activation='relu',name='conv21')(x)
  x = tfkl.MaxPooling2D(name='mp2')(x)

  x = tfkl.Conv2D(filters=256,kernel_size=3,padding='same',activation='relu',name='conv30')(x)
  x = tfkl.Conv2D(filters=256,kernel_size=3,padding='same',activation='relu',name='conv31')(x)
  x = tfkl.GlobalAveragePooling2D(name='gap')(x)


  x = tfkl.Dense(units = 128, activation='relu')(x)


  output_layer = tfkl.Dense(units=output_shape ,activation='sigmoid',name='Output')(x)

  # Connect input and output through the Model class
  model = tfk.Model(inputs=input_layer, outputs=output_layer, name='Convnet')

  # Compile the model
  model.compile(loss=tfk.losses.BinaryCrossentropy(), optimizer=tfk.optimizers.Adam(weight_decay=5e-4), metrics=metrics)

  return model

In [16]:
model = apple_elixir_model(input_shape, output_shape)

In [None]:
# Print the model summary and plot the model architecture
model.summary()
tfk.utils.plot_model(model, expand_nested=True, show_shapes=True)

## Train model

In [17]:
# Define callbacks
early_stopping = tfk.callbacks.EarlyStopping(monitor='val_prc', patience=10, mode='max', restore_best_weights=True)

callbacks = [early_stopping]

val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)).cache()
val_ds = val_ds.batch(batch_size).prefetch(2) 

# Train the model and save its history
history = model.fit(
    resampled_ds,
    #batch_size=batch_size,
    steps_per_epoch=resampled_steps_per_epoch,
    epochs=epochs,
    validation_data=val_ds,
    callbacks=callbacks
).history

# Save the trained model
#model.save('CHANGE_THIS_NAME')

Epoch 1/200
19/96 [====>.........................] - ETA: 1:46 - loss: 1.8240 - cross entropy: 1.8240 - Brier score: 0.2744 - tp: 196.0000 - fp: 188.0000 - tn: 438.0000 - fn: 394.0000 - accuracy: 0.5214 - precision: 0.5104 - recall: 0.3322 - auc: 0.5139 - prc: 0.4982

KeyboardInterrupt: 

In [None]:
plot_metrics(history)

In [None]:
# Find the epoch with the highest validation accuracy
best_epoch = np.argmax(history['val_accuracy'])

# Plot training and validation performance metrics
plt.figure(figsize=(20, 5))

# Plot training and validation loss
plt.plot(history['loss'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_loss'], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.legend(loc='upper left')
plt.title('Binary Crossentropy')
plt.grid(alpha=0.3)

plt.figure(figsize=(20, 5))

# Plot training and validation accuracy, highlighting the best epoch
plt.plot(history['accuracy'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_accuracy'], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.plot(best_epoch, history['val_accuracy'][best_epoch], marker='*', alpha=0.8, markersize=10, color='#4D61E2')
plt.legend(loc='upper left')
plt.title('Accuracy')
plt.grid(alpha=0.3)

plt.show()

## Visualizing Intermediate Representations

## Make inference