# MNIST Standard Biological Images: Feed Forward Neural Network

In [None]:
import time
from datetime import datetime
import re
import numpy as np
import pandas as pd
#import seaborn as sns  # for nicer plots
#sns.set(style="darkgrid")  # default style
import tensorflow as tf
from tensorflow import keras
from keras import metrics
import numpy as np
from tensorflow import keras
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import layers, models, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Activation
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
tf.get_logger().setLevel('INFO')

# Get date/time for local timezone
def get_localdate_str():
    """Return the current local date and time as text.

    The text is whatever ``str()`` produces for ``datetime.now()``.
    """
    return str(datetime.now())

def get_time_with_minutes(date_str):
    """Reduce a ``YYYY-MM-DD HH:MM:SS[.ffffff]`` string to ``HH-MM``.

    Args:
        date_str: Date/time text such as the value returned by
            ``get_localdate_str()``.

    Returns:
        The hour and minute joined by ``-`` (colons are replaced so the
        result can be used in a file name).
    """
    # Drop the leading date and the separating space.
    tm_str = re.sub(r'^....-..-.. ', "", date_str)
    # Drop the seconds. The fractional part is optional because
    # str(datetime) omits it when the microsecond field is zero.
    tm_str = re.sub(r':..(?:\.\d+)?$', "", tm_str)
    return tm_str.replace(":", "-")

def get_date_with_seconds(date_str):
    """Reduce ``YYYY-MM-DD HH:MM:SS[.ffffff]`` to ``YYYY-MM-DD_HH:MM:SS``.

    Args:
        date_str: Date/time text such as the value returned by
            ``get_localdate_str()``.

    Returns:
        The input without its fractional seconds and with every space
        replaced by an underscore.
    """
    # Strip the fractional seconds first, then work on that result.
    # (Previously the second substitution restarted from ``date_str`` and
    # silently threw the stripped value away.)
    tm_str = re.sub(r'\.\d+$', "", date_str)
    return tm_str.replace(" ", "_")

In [None]:
def display_confusion_matrix(y_tst, y_prd, ttl=None, sz=5, lbl=None):
    """Compute the confusion matrix for a set of predictions and plot it.

    Args:
        y_tst: True labels, passed to ``confusion_matrix`` as the reference.
        y_prd: Predicted labels, passed to ``confusion_matrix``.
        ttl: Plot title (optional).
        sz: Width and height of the (square) figure.
        lbl: Optional sequence of class names used as tick labels. When it
            is omitted the matrix is drawn with ``ConfusionMatrixDisplay``
            and its default tick labels.
    """
    cm = confusion_matrix(y_tst, y_prd)

    fig, ax = plt.subplots(figsize=(sz, sz))

    # Identity test: ``lbl != None`` is an elementwise comparison when lbl is
    # a NumPy array, which cannot be used as an ``if`` condition.
    if lbl is not None:
        im = ax.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        ax.figure.colorbar(im, ax=ax)

        # Axis labels
        ax.set_xlabel('Predicted labels')
        ax.set_ylabel('True labels')

        # One tick per class name
        ax.set_xticks(np.arange(len(lbl)))
        ax.set_yticks(np.arange(len(lbl)))
        ax.set_xticklabels(lbl)
        ax.set_yticklabels(lbl)

        # Rotate x-tick labels so long class names do not overlap
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

        # Write the count into every cell; use white text on dark cells
        # (count above half of the largest count) so it stays readable.
        threshold = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > threshold else "black")
    else:
        disp = ConfusionMatrixDisplay(confusion_matrix=cm)
        disp.plot(cmap=plt.cm.Blues, ax=ax)

    plt.title(ttl)
    plt.show()

In [None]:
def display_eval_graphs(ttl, epochs, trn_acc, val_acc, trn_loss, val_loss, xlab, ylab_acc, ylab_loss):
    """Plot training/validation accuracy and loss curves side by side.

    Args:
        ttl: Title shown above both panels.
        epochs: X values shared by every curve.
        trn_acc, val_acc: Training and validation accuracy series.
        trn_loss, val_loss: Training and validation loss series.
        xlab: X-axis label for both panels.
        ylab_acc: Title and y-axis label of the accuracy panel.
        ylab_loss: Title and y-axis label of the loss panel.
    """
    fig, (acc_axis, loss_axis) = plt.subplots(1, 2, figsize=(8, 3))
    fig.suptitle(ttl)

    # (axis, panel label, curves drawn on that axis)
    panels = (
        (acc_axis, ylab_acc, ((trn_acc, 'training accuracy'),
                              (val_acc, 'validation accuracy'))),
        (loss_axis, ylab_loss, ((trn_loss, 'training loss'),
                                (val_loss, 'validation loss'))),
    )
    for axis, panel_label, curves in panels:
        for series, legend_text in curves:
            axis.plot(epochs, series, label=legend_text)
        axis.set_title(panel_label)
        axis.set_xlabel(xlab)
        axis.set_ylabel(panel_label)
        axis.legend(loc='best')

    # Leave room between the panels for the right-hand y-axis label.
    fig.subplots_adjust(wspace=0.5)

    plt.show()

In [None]:
def get_data_set_dimensions(contained_array, data):
    """Print a one-line description of an image array and return its rank.

    Arrays with 4 dimensions are reported as "RGB images" when the last
    dimension is 3 and as "3D images" otherwise; arrays with 3 dimensions
    are reported as "2D images". Arrays of any other rank print nothing.

    Args:
        contained_array: Name of the array, used only in the printed line.
        data: Object exposing ``.shape``; the first dimension is reported as
            the number of images.

    Returns:
        The number of dimensions of ``data``.
    """
    x_shape = data.shape
    end_of_range = len(x_shape)

    dim = ""
    if end_of_range == 4:
        # Every label ends in a space so the sizes never run into the colon.
        kind = "RGB images: " if x_shape[3] == 3 else "3D images: "
        dim = kind + "x".join(str(extent) for extent in x_shape[1:4])
    elif end_of_range == 3:
        dim = "2D images: " + "x".join(str(extent) for extent in x_shape[1:3])

    if dim:
        print(f"  {contained_array}: {x_shape[0]} {dim}")

    return end_of_range

In [None]:
def flatten_data(contained_array, data, end_of_range):
    """Flatten an array of samples or an array of labels.

    Args:
        contained_array: Name of the array (not used by the computation).
        data: Array to flatten.
        end_of_range: Number of dimensions to take into account.

    Returns:
        When ``end_of_range`` is greater than 2, ``data`` reshaped to two
        dimensions with dimensions 1 .. end_of_range-1 merged into one.
        Otherwise ``data`` flattened to one dimension in C order.
    """
    dims = data.shape

    # Label-style arrays: collapse everything into a single dimension.
    if end_of_range <= 2:
        return np.ravel(data, order='C')

    # Image-style arrays: merge all per-sample dimensions into one.
    per_sample = int(np.prod([dims[axis] for axis in range(1, end_of_range)],
                             dtype=np.int64))
    return data.reshape(-1, per_sample)

## Load data, then flatten the labels; images are flattened later by the FFNN model's Flatten layer

In [None]:
def load_and_flatten(fn):
    """Load an ``.npz`` archive and split it into image and label arrays.

    Every array whose name contains ``"labels"`` is flattened with
    ``flatten_data``; every other array is kept in its original shape (the
    model flattens images itself). A description of each array is printed
    by ``get_data_set_dimensions`` along the way.

    Args:
        fn: Path of the ``.npz`` file.

    Returns:
        A tuple ``(npz_file, orig_arr, label_arr)``: the open archive (the
        caller is responsible for closing it), a dict of unflattened
        non-label arrays and a dict of flattened label arrays, both keyed by
        array name in archive order.
    """
    # NOTE(security): allow_pickle=True lets np.load unpickle object arrays,
    # which can execute arbitrary code. Only load archives from a trusted
    # source, or switch to allow_pickle=False if no object arrays are stored.
    npz_file = np.load(fn, allow_pickle=True)

    orig_arr = {}
    label_arr = {}

    for contained_array in npz_file.files:
        # Read each array from the archive exactly once: every
        # ``npz_file[name]`` access loads the array again.
        data = npz_file[contained_array]
        end_of_range = get_data_set_dimensions(contained_array, data)
        if "labels" in contained_array:
            label_arr[contained_array] = flatten_data(contained_array, data, end_of_range)
        else:
            orig_arr[contained_array] = data

    return npz_file, orig_arr, label_arr

## Build a Keras Feed Forward Neural Network model (Flatten layer, optional hidden Dense layers, softmax output layer)

In [None]:
def build_model(n_classes,
                img_dim,
                hidden_layer_sizes=None,
                activation='relu',
                optimizer='adam',
                learning_rate=0.01):
    """Build and compile a feed-forward classifier.

    The network is: input -> Flatten -> one Dense layer per entry of
    ``hidden_layer_sizes`` -> Dense softmax output with ``n_classes`` units.
    With no hidden layers this degenerates to a single Dense layer
    (multinomial logistic regression).

    Args:
        n_classes: Number of units in the output layer.
        img_dim: Shape of one input sample (without the batch dimension).
        hidden_layer_sizes: Sequence with the number of units of each hidden
            layer, in order. ``None`` (the default) or an empty sequence
            means no hidden layers.
        activation: Activation used by every hidden layer.
        optimizer: ``'adam'`` builds an ``Adam`` optimizer configured with
            ``learning_rate``; any other value is handed to ``compile``
            unchanged, in which case ``learning_rate`` is not used.
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        The compiled ``keras.Sequential`` model.
    """
    # Start from a clean Keras state and fixed seeds so runs are comparable.
    tf.keras.backend.clear_session()
    np.random.seed(0)
    tf.random.set_seed(0)

    model = keras.Sequential()
    model.add(keras.Input(shape=img_dim))
    model.add(Flatten())
    for units in hidden_layer_sizes or ():
        model.add(Dense(units, activation=activation))
    model.add(Dense(n_classes, activation='softmax'))  # Output layer with n-classes
    # Display the model summary to see the architecture
    model.summary()

    # Loss selection notes:
    # - categorical_crossentropy expects softmax probabilities as output and
    #   one-hot encoded true labels.
    # - sparse_categorical_crossentropy is the same loss for true labels that
    #   are given as integers instead of one-hot vectors (used here).
    # - categorical_focal_crossentropy extends cross entropy for multi-class
    #   problems with class imbalance, where some classes are easy and others
    #   are difficult to classify.

    # Rebind ``optimizer`` itself: the configured Adam instance used to be
    # stored under a misspelled name, so compile() received the plain string
    # 'adam' and ``learning_rate`` was silently ignored.
    if optimizer == 'adam':
        optimizer = Adam(learning_rate=learning_rate, beta_1=0.9, beta_2=0.999, epsilon=1e-8)
    model.compile(optimizer=optimizer,
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [None]:
def process_file(file_to_load, fn):
    """Train and evaluate the FFNN on one data set over a hyper-parameter grid.

    For every combination of learning rate and epoch count a fresh model is
    built with ``build_model``, trained, evaluated on the test arrays, and
    its learning curves and confusion matrix are displayed.

    Args:
        file_to_load: Path of the ``.npz`` data set.
        fn: Short data-set name used in titles and in the result rows.

    Returns:
        A list with one row per run:
        ``[fn, learning rate, epochs, test accuracy in percent, test loss,
        training time in seconds]``.
    """
    csv_data = []
    print("\n-------------------------------------------------------")
    print(fn)
    npz_file, original_imgs, labels = load_and_flatten(file_to_load)

    # try/finally so the archive is closed even when a run fails.
    try:
        feature_keys = list(original_imgs)
        label_keys = list(labels)

        # assumes the archive stores its arrays in train / val / test order,
        # so index 0 is the training split and index 2 the test split
        # -- TODO confirm against the data files
        X_train = original_imgs[feature_keys[0]]
        y_train = labels[label_keys[0]]
        X_test = original_imgs[feature_keys[2]]
        y_test = labels[label_keys[2]]

        # Shape of a single sample (everything after the batch dimension)
        img_dim = list(X_train.shape[1:])

        # Number of distinct classes present in the training labels
        n_classes = len(np.unique(y_train))

        # Display the first image and label in the training data
        print("\nLabel: ", y_train[0])
        if len(img_dim) < 3 or img_dim[2] == 3:
            plt.imshow(X_train[0])
            plt.axis('off')
            plt.show()

        lrate = [0.1, 0.01, 0.001]
        epch = [25, 50, 100]

        # Build and train a model for every learning rate / epoch combination
        for learning_rate in lrate:
            for num_epochs in epch:

                # Build the Feed Forward Neural Network model
                kerasFFNN = build_model(n_classes, img_dim, [32, 64, 128], 'relu', 'adam', learning_rate)

                ttl = fn + ": Epochs " + str(num_epochs) + ", LR " + str(learning_rate)
                # Train the Feed Forward Neural Network model.
                print(f'\nTraining... {ttl}')
                start = time.time()
                # validation_split is not passed: Keras ignores it whenever
                # validation_data is supplied.
                # NOTE(review): the test split doubles as validation data, so
                # the validation curves are not independent of the final
                # test score -- consider using the validation split instead.
                history = kerasFFNN.fit(
                    x=X_train,
                    y=y_train,
                    epochs=num_epochs,
                    batch_size=64,
                    validation_data=(X_test, y_test),
                    verbose=0)
                trn_elapsed = time.time() - start

                train_accuracy = history.history['accuracy']
                train_loss = history.history['loss']
                v_accuracy = history.history['val_accuracy']
                v_loss = history.history['val_loss']

                epoch_cnt = range(len(train_accuracy))  # one x value per epoch
                display_eval_graphs(ttl, epoch_cnt, train_accuracy, v_accuracy, train_loss, v_loss,
                                    "Epochs", "Accuracy", "Loss")

                # Evaluate the model and determine final accuracy
                start = time.time()
                loss, test_accuracy = kerasFFNN.evaluate(x=X_test, y=y_test, verbose=0)
                eval_elapsed = time.time() - start

                print(f"Training time: {round(trn_elapsed, 2)} seconds")
                print(f"Evaluation time: {round(eval_elapsed, 2)} seconds")
                print(f"Accuracy: {round(test_accuracy * 100, 3)}, Loss: {round(loss, 5)}, Epochs: {num_epochs}, LR {learning_rate}")

                y_test_proba = kerasFFNN.predict(X_test, batch_size=64, verbose=0)
                # For classification: convert probabilities to class labels
                y_test_predict = np.argmax(y_test_proba, axis=1)
                display_confusion_matrix(y_test, y_test_predict)

                # Plain list (not np.array) so numbers are not coerced to text
                csv_data.append([fn, learning_rate, num_epochs,
                                 round(test_accuracy * 100, 2),
                                 round(loss, 5), round(trn_elapsed, 2)])

                # No explicit cleanup is needed here: the per-run locals are
                # rebound on the next iteration. (The former %reset_selective
                # magic only acts on the interactive namespace and is not
                # valid Python outside IPython.)
    finally:
        npz_file.close()

    return csv_data

In [None]:
# Column names of the results CSV (shared by the writer and the reader)
filecol = 'fn'
lr = 'lr'
eps = 'epochs'
ac = 'accuracy'
ls = 'loss'
tt = 'train time'

def store_results(create_file, outfile, csv_data):
    """Write result rows to a CSV file, creating or appending as requested.

    Args:
        create_file: When true the file is created (or overwritten) and a
            header row is written; otherwise the rows are appended without
            a header.
        outfile: Path of the CSV file.
        csv_data: Rows to store, each with one value per results column.

    Returns:
        ``False``, so the caller can feed it back as ``create_file`` and
        have every later call append.
    """
    # Use the rows that were passed in (previously a global was read instead
    # of the ``csv_data`` parameter).
    df = pd.DataFrame(csv_data, columns=[filecol, lr, eps, ac, ls, tt])
    if create_file:
        # write to file (create or overwrite)
        df.to_csv(outfile, index=False)
    else:
        # append to file
        df.to_csv(outfile, mode='a', index=False, header=False)
    return False

In [None]:
def display_best(best_acc_indx, best_loss_indx):
    """Print the row indices holding the best accuracy and the least loss."""
    summary = f"        best accuracy: Index {best_acc_indx}, least loss: Index {best_loss_indx}"
    print(summary)

def display_row(index, row):
    """Print one result row as a tab-separated line.

    Args:
        index: Row index shown in the first column.
        row: Mapping from the results column names to the row's values.
    """
    # A loss of 10 or more is one character wider, so one tab fewer keeps
    # the training-time column aligned.
    loss_pad = '\t' if row[ls] >= 10.0 else '\t\t'
    print(f"  {index}\t{row[lr]:.3f}\t\t{int(row[eps]):03}\t{row[ac]}\t\t{row[ls]:.5f}{loss_pad}{row[tt]} secs")

def display_results(csv_file):
    """Print the stored results grouped by data-set file name.

    Rows are printed in file order. Each time the file name changes, a
    header is printed for the new file and a summary line naming the rows
    with the best accuracy and the least loss is printed for the previous
    one. Rows without a learning rate (blank lines in the CSV) are skipped.

    Args:
        csv_file: Path of the results CSV written by ``store_results``.
    """
    # Read the file that was asked for (previously a global path was used
    # and the parameter was ignored).
    df = pd.read_csv(csv_file)

    file_nm = ""
    best_acc = float("-inf")
    best_loss = float("inf")
    best_acc_indx = 0
    best_loss_indx = 0

    print("\nFeed Forward Neural Network Comparison by Filename")
    for index, row in df.iterrows():
        if pd.isna(row[lr]):
            continue

        if row[filecol] != file_nm:
            # Close the previous file's section before starting a new one.
            if file_nm != "":
                display_best(best_acc_indx, best_loss_indx)
            file_nm = row[filecol]
            print(f"\n{file_nm}")
            print(f"  index\t{lr}\t\t{eps}\t{ac}\t{ls}\t\t{tt}")
            # Reset to sentinels that any real value beats, so the first row
            # of the new file always claims both indices and no index can
            # leak over from the previous file.
            best_acc = float("-inf")
            best_loss = float("inf")

        display_row(index, row)

        if row[ac] > best_acc:
            best_acc = row[ac]
            best_acc_indx = index
        if row[ls] < best_loss:
            best_loss = row[ls]
            best_loss_indx = index

    # Summary for the last file (nothing to report when no row was shown)
    if file_nm != "":
        display_best(best_acc_indx, best_loss_indx)

In [None]:
# Driver: run the learning-rate / epoch grid on every data set, collect the
# results in a time-stamped CSV file, then print a per-file comparison.
#
# NOTE: 'chestmnist.npz', # image, label size length mismatch, not used here
#files = ['bloodmnist.npz', 'breastmnist.npz', 'dermamnist.npz', 'octmnist.npz', 'organsmnist.npz', \
#         'nodulemnist3d.npz', 'organmnist3d.npz', 'retinamnist_128.npz']
files = ['bloodmnist.npz', 'breastmnist.npz']
#files = ['breastmnist.npz']

start_run = time.time()

rslt = {}
first_write = True

# The output name carries the start time (HH-MM) so runs do not overwrite
# each other.
dtmin = get_time_with_minutes(get_localdate_str())
outfile = "FFNN-" + dtmin + ".csv"

for fn in files:
    file_to_load = "G:/Continuing Education/Stanford/TECH 27/Project/" + fn
    csv_info = process_file(file_to_load, fn)

    # The first call creates the file; store_results hands back False so
    # every later call appends.
    first_write = store_results(first_write, outfile, csv_info)

end_run = time.time() - start_run

display_results(outfile)

if end_run > 180.0:
    # divmod gives whole minutes plus the remaining seconds. Subtracting the
    # fractional minute count times 60 always left a remainder of 0 seconds.
    end_run_min, end_run_sec = divmod(end_run, 60.0)
    print(f"\nDONE: Total Run Time: {int(end_run_min)} minutes : {round(end_run_sec, 2)} seconds")
else:
    print(f"\nDONE: Total Run Time: {round(end_run, 2)} seconds")