In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model, load_model
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
import seaborn as sns
sns.set_style('darkgrid')
from sklearn.metrics import confusion_matrix, classification_report
from IPython.display import display, HTML

In [None]:
path=r'G:\Dataset-image-Eye_Data_New'
path_list=os.listdir(path)
classes=[]
file_Paths=[]
labels=[]

for i in path_list:
    class_path=os.path.join(path,i)
    if os.path.isdir(class_path):
        if i !=".git":
            classes.append(i)
class_count=len(classes)
classes

In [None]:
for j in classes:
    class_path=os.path.join(path,j)
    image_names=os.listdir(class_path)
    for k in image_names:
        image_path=os.path.join(class_path,k)
        file_Paths.append(image_path)
        labels.append(j)
file_Paths[:10]

In [None]:
file_series=pd.Series(file_Paths,name='file_path')
label_series=pd.Series(labels,name='label')
data_df=pd.concat([file_series,label_series],axis=1)
print(data_df.head(10))
balance_df=data_df['label'].value_counts()
balance_df

In [None]:
train_split=.8
test_split=.1
dummy_split=test_split/(1-train_split)
train_df,dummy_df=train_test_split(data_df,train_size=train_split,shuffle=True,random_state=125)
test_df,valid_df=train_test_split(dummy_df,train_size=dummy_split,shuffle=True,random_state=125)
batch_size=16
length=len(test_df)
print(f"Train Size: {len(train_df)}\nTest size: {len(test_df)}\nValid size: {len(valid_df)}")


In [None]:
def scalar(x):
    return x/127.5-1
trgen=tf.keras.preprocessing.image.ImageDataGenerator(preprocessing_function=scalar, horizontal_flip=True)
train_gen=trgen.flow_from_dataframe(train_df, x_col='file_path', y_col='label', target_size=(448,448), class_mode='categorical',batch_size=batch_size, shuffle=True, seed=123)
tvgen=tf.keras.preprocessing.image.ImageDataGenerator(preprocessing_function=scalar)
valid_gen=tvgen.flow_from_dataframe(valid_df, x_col='file_path', y_col='label', target_size=(448,448), class_mode='categorical',
                                   batch_size=batch_size, shuffle=False)
test_batch_size=sorted([int(length/n) for n in range(1,length+1) if length % n ==0 and length/n<=batch_size],reverse=True)[0]
test_steps=int(length/test_batch_size)
test_gen=tvgen.flow_from_dataframe(test_df, x_col='file_path', y_col='label', target_size=(448,448), class_mode='categorical',
                                   batch_size=test_batch_size, shuffle=False)
test_labels=test_gen.labels

In [None]:
def show_training_samples(gen):
    class_dict=gen.class_indices
    new_dict={}
    for key, value in class_dict.items(): 
        new_dict[value]=key
    images,labels=next(gen) 
    plt.figure(figsize=(15, 15))
    length=len(labels)
    if length<25: 
        r=length
    else:
        r=25
    for i in range(r):
        plt.subplot(5, 5, i + 1)
        image=(images[i]+1 )/2 
        plt.imshow(image)
        index=np.argmax(labels[i])
        class_name=new_dict[index]
        plt.title(class_name, color='blue', fontsize=16)
        plt.axis('off')
    plt.show()

In [None]:
img_shape=(448,448,3)
neurons=512
dropout=.3
lr=.001
freeze=True
base_model=tf.keras.applications.VGG19(include_top=False, input_shape=img_shape, pooling='max', weights='imagenet')
if freeze:
    base_model.trainable=False
x=base_model.output
x=tf.keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001 )(x)
x =tf.keras.layers.Dense(neurons, kernel_regularizer = regularizers.l2(0.016),activity_regularizer=regularizers.l1(0.006),
                bias_regularizer=regularizers.l1(0.006) ,activation='relu', kernel_initializer= tf.keras.initializers.GlorotUniform(seed=123))(x)
x=tf.keras.layers.Dropout(rate=dropout, seed=123)(x)
output=tf.keras.layers.Dense(class_count, activation='softmax',kernel_initializer=tf.keras.initializers.GlorotUniform(seed=123))(x)
model=Model(inputs=base_model.input, outputs=output)
model.compile(Adamax(learning_rate=lr), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
def print_in_color(txt_msg,fore_tupple,back_tupple,):
    rf,gf,bf=fore_tupple
    rb,gb,bb=back_tupple
    msg='{0}' + txt_msg
    mat='\33[38;2;' + str(rf) +';' + str(gf) + ';' + str(bf) + ';48;2;' + str(rb) + ';' +str(gb) + ';' + str(bb) +'m'
    print(msg .format(mat), flush=True)
    print('\33[0m', flush=True)
    return

In [None]:
class LRA(keras.callbacks.Callback):
    def __init__(self, patience, stop_patience, threshold, factor, dwell, model_name, freeze, end_epoch):
        super(LRA, self).__init__()
        self.patience = patience
        self.stop_patience = stop_patience
        self.threshold = threshold
        self.factor = factor
        self.dwell = dwell
        self.lr = 0
        self.highest_tracc = 0.0
        self.lowest_vloss = np.inf
        self.count = 0
        self.stop_count = 0
        self.end_epoch = end_epoch
        self.best_weights = None
        msg = ' '
        if freeze:
            msgs = f' Starting training using base model {model_name} with weights frozen to imagenet weights initializing LRA callback'
        else:
            msgs = f' Starting training using base model {model_name} training all layers '
        print_in_color(msgs, (244, 252, 3), (55, 65, 80))

    def set_model(self, model):
        self.model_ = model
        self.lr = float(tf.keras.backend.get_value(model.optimizer.learning_rate))
        self.best_weights = self.model_.get_weights()

    def on_epoch_begin(self, epoch, logs=None):
        if epoch != 0:
            msgs = f'for epoch {epoch} '
            msgs = msgs + LRA.msg
            print_in_color(msgs, (255, 255, 0), (55, 65, 80))

    def on_epoch_end(self, epoch, logs=None):
        lr = float(tf.keras.backend.get_value(self.model_.optimizer.learning_rate))
        v_loss = logs.get('val_loss')
        acc = logs.get('accuracy')

        if acc < self.threshold:
            if acc > self.highest_tracc:
                LRA.msg = f' training accuracy improved from  {self.highest_tracc:7.4f} to {acc:7.4f} learning rate held at {lr:10.8f}'
                self.highest_tracc = acc
                LRA.best_weights = self.model_.get_weights()
                self.count = 0
                self.stop_count = 0
                if v_loss < self.lowest_vloss:
                    self.lowest_vloss = v_loss
            else:
                if self.count >= self.patience - 1:
                    self.lr = lr * self.factor
                    self.model_.optimizer.learning_rate.assign(self.lr)
                    self.count = 0
                    self.stop_count = self.stop_count + 1
                    if self.dwell:
                        self.model_.set_weights(LRA.best_weights)
                    else:
                        if v_loss < self.lowest_vloss:
                            self.lowest_vloss = v_loss
                    msgs = f' training accuracy {acc:7.4f} < highest accuracy of {self.highest_tracc:7.4f} '
                    LRA.msg = msgs + f' for {self.patience} epochs, lr adjusted to {self.lr:10.8f}'
                else:
                    self.count = self.count + 1
                    LRA.msg = f' training accuracy {acc:7.4f} < highest accuracy of {self.highest_tracc:7.4f} '
        else:
            if v_loss < self.lowest_vloss:
                msgs = f' validation loss improved from {self.lowest_vloss:8.5f} to {v_loss:8.5}, saving best weights'
                LRA.msg = msgs + f' learning rate held at {self.lr:10.8f}'
                self.lowest_vloss = v_loss
                LRA.best_weights = self.model_.get_weights()
                self.count = 0
                self.stop_count = 0
            else:
                if self.count >= self.patience - 1:
                    self.lr = self.lr * self.factor
                    self.stop_count = self.stop_count + 1
                    msgs = f' val_loss of {v_loss:8.5f} > {self.lowest_vloss:8.5f} for {self.patience} epochs'
                    LRA.msg = msgs + f', lr adjusted to {self.lr:10.8f}'
                    self.count = 0
                    self.model_.optimizer.learning_rate.assign(self.lr)
                    if self.dwell:
                        self.model_.set_weights(LRA.best_weights)
                else:
                    self.count = self.count + 1
                    LRA.msg = f' validation loss of {v_loss:8.5f} > {self.lowest_vloss:8.5f}'
                if acc > self.highest_tracc:
                    self.highest_tracc = acc

        if epoch == self.end_epoch:
            print_in_color(LRA.msg, (255, 255, 0), (55, 65, 80))

        if self.stop_count > self.stop_patience - 1:
            LRA.msg = f' training has been halted at epoch {epoch + 1} after {self.stop_patience} adjustments of learning rate with no improvement'
            print_in_color(LRA.msg, (0, 255, 0), (55, 65, 80))
            self.model_.stop_training = True


In [None]:
patience=1
stop_patience=4
threshold=.9
factor=.5
dwell=False
model_type='VGG19'
epochs=15
callbacks=[LRA(patience=patience,stop_patience=stop_patience, threshold=threshold,
                   factor=factor,dwell=dwell, model_name=model_type, freeze=freeze, end_epoch=epochs - 1 )]

history=model.fit(x=train_gen,  epochs=epochs, verbose=1, callbacks=callbacks,  validation_data=valid_gen,
               validation_steps=None,  shuffle=False,  initial_epoch=0)

In [None]:
model_path = r'G:\Dataset-image-Eye_Data_New\Test\Copy of vgg19_model.h5'
try:
    model = load_model(model_path)
    print("Model loaded successfully.")
except Exception as e:
    print(f"Error loading model: {e}")

In [None]:
def display_eval_metrics(e_data):
    msg = 'Model Metrics after Training'
    print_in_color(msg, (255,255,0), (55,65,80))
    msg = '{0:^24s}{1:^24s}'.format('Metric', 'Value')
    print_in_color(msg, (255,255,0), (55,65,80))
    for key, value in e_data.items():
        print(f'{key:^24s}{value:^24.5f}')
    acc = e_data['accuracy'] * 100
    return acc

In [None]:
model_type = 'VGG19' 
e_dict = model.evaluate(test_gen, batch_size=test_batch_size, verbose=1, steps=test_steps, return_dict=True)
acc = display_eval_metrics(e_dict)
msg = f'Accuracy on the test set is {acc:5.2f} %'
preds = model.predict(test_gen, batch_size=test_batch_size, verbose=0, steps=None)
print_in_color(msg, (0,255,0), (55,65,80))

In [None]:
def print_info(test_gen, preds, print_code, save_dir, subject):
    class_dict = test_gen.class_indices
    labels = test_gen.labels
    file_names = test_gen.filenames
    error_list = []
    true_class = []
    pred_class = []
    prob_list = []
    new_dict = {}
    error_indices = []
    y_pred = []

    for key, value in class_dict.items():
        new_dict[value] = key             

    classes = list(new_dict.values())     
    dict_as_text = str(new_dict)

    dict_name = subject + '-' + model_type + '-' + str(len(classes)) + '.txt'
    dict_path = os.path.join(save_dir, dict_name)
    with open(dict_path, 'w') as x_file:
        x_file.write(dict_as_text)

    errors = 0
    for i, p in enumerate(preds):
        pred_index = np.argmax(p)
        true_index = labels[i] 
        if pred_index != true_index: 
            error_list.append(file_names[i])
            true_class.append(new_dict[true_index])
            pred_class.append(new_dict[pred_index])
            prob_list.append(p[pred_index])
            error_indices.append(true_index)
            errors = errors + 1
        y_pred.append(pred_index)

    if print_code != 0:
        if errors > 0:
            if print_code > errors:
                r = errors
            else:
                r = print_code
            msg = '{0:^28s}{1:^28s}{2:^28s}{3:^16s}'.format('Filename', 'Predicted Class', 'True Class', 'Probability')
            print_in_color(msg, (0,255,0), (55,65,80))
            for i in range(r):
                msg = '{0:^28s}{1:^28s}{2:^28s}{3:4s}{4:^6.4f}'.format(
                    error_list[i], pred_class[i], true_class[i], ' ', prob_list[i]
                )
                print_in_color(msg, (255,255,255), (55,65,60))
        else:
            msg = 'With accuracy of 100 % there are no errors to print'
            print_in_color(msg, (0,255,0), (55,65,80))

    if errors > 0:
        plot_bar = []
        plot_class = []
        for key, value in new_dict.items():
            count = error_indices.count(key)
            if count != 0:
                plot_bar.append(count) 
                plot_class.append(value)   

        fig = plt.figure()
        fig.set_figheight(len(plot_class) / 3)
        fig.set_figwidth(10)
        plt.style.use('fivethirtyeight')
        for i in range(len(plot_class)):
            c = plot_class[i]
            x = plot_bar[i]
            plt.barh(c, x)
            plt.title(' Errors by Class on Test Set')

        plt.savefig(f"Errors_by_Class_{model_type}.png", bbox_inches='tight')

    if len(classes) <= 20:
        y_true = np.array(labels)
        y_pred = np.array(y_pred)
        cm = confusion_matrix(y_true, y_pred)
        clr = classification_report(y_true, y_pred, target_names=classes)
        length = len(classes)

        if length < 8:
            fig_width = 8
            fig_height = 8
        else:
            fig_width = length
            fig_height = length

        plt.figure(figsize=(fig_width, fig_height))
        sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Blues', cbar=False)
        plt.xticks(np.arange(length) + .5, classes, rotation=90)
        plt.yticks(np.arange(length) + .5, classes, rotation=0)
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.savefig(f"confusion_matrix_{model_type}.png", bbox_inches='tight')
        plt.show()
        print("Classification Report:\n----------------------\n", clr)

print_code = 20
print_info(test_gen, preds, print_code, save_dir, subject)