In [None]:
import shutil
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import pandas as pd
import cv2
from keras import Sequential
from skimage import exposure
import seaborn as sns
from tensorflow import keras
from keras import layers
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from keras.preprocessing.image import ImageDataGenerator
from skimage import filters
from scipy.fft import fftn, fftshift


In [None]:
def count_image(pdfile, column):
    """Print a per-class row count for ``pdfile`` and name the largest and smallest class.

    Parameters
    ----------
    pdfile : pandas.DataFrame
        Frame with one row per image.
    column : str
        Name of the column holding the class label.

    Returns
    -------
    None
        The report is written to stdout only.
    """
    row = '{0:^12s} {1:^12s}'
    print(row.format('CLASS', 'IMAGE COUNT'))

    # Count once instead of materialising every group; drop zero-count entries
    # (e.g. unobserved categories) so only classes that actually occur are listed.
    per_class = pdfile[column].value_counts()
    per_class = per_class[per_class > 0]
    classes = sorted(per_class.index)
    if not classes:
        # Nothing to summarise; avoids taking max/min of an empty sequence.
        print('No images found.')
        return

    counts = {}
    for label in classes:
        counts[label] = int(per_class.loc[label])
        # str(label) keeps the '^12s' format valid for non-string labels too.
        print(row.format(str(label), str(counts[label])))

    # max()/min() return the first extreme in sorted class order on ties.
    max_class = max(classes, key=counts.get)
    min_class = min(classes, key=counts.get)
    print(max_class, ' has the most images= ', counts[max_class], ' ',
          min_class, ' has the least images= ', counts[min_class])

def trim(df, max_samples, min_samples, column):
    """Cap large classes and drop small ones.

    Classes with more than ``max_samples`` rows are randomly down-sampled to
    exactly ``max_samples`` (fixed seed, so the selection is reproducible).
    Classes with at least ``min_samples`` rows are kept unchanged. Classes
    below ``min_samples`` are dropped entirely.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    max_samples : int
        Upper bound on the number of rows kept per class.
    min_samples : int
        Minimum class size required for a class to be kept.
    column : str
        Name of the class-label column.

    Returns
    -------
    pandas.DataFrame
        The retained rows with their original index values. When no class
        qualifies, an empty frame with the same columns is returned.
    """
    kept_groups = []
    # sort=False walks the classes in order of first appearance.
    for _, group in df.groupby(column, sort=False):
        count = len(group)
        if count > max_samples:
            kept_groups.append(group.sample(n=max_samples, random_state=123, axis=0))
        elif count >= min_samples:
            kept_groups.append(group)

    # Concatenate once: avoids quadratic copying and concatenating onto an empty frame.
    if kept_groups:
        trimmed_df = pd.concat(kept_groups, axis=0)
    else:
        trimmed_df = pd.DataFrame(columns=df.columns)

    print(f"The number of max sample in any class is now: {max_samples}, \n "
          f"The number of min sample in any class is now: {min_samples}")
    return trimmed_df

def Data_augmentation(df, n, working_directory, img_size, column):
    """Top up under-represented classes with augmented images written to disk.

    For every class in ``df`` that has fewer than ``n`` rows, augmented copies
    of that class's images are generated and saved under
    ``<working_directory>/aug/<label>/`` until the shortfall is covered. Any
    existing ``aug`` directory is deleted first. The saved files are then
    listed and appended to a copy of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``'Data_file'`` column of image paths and the label
        column named by ``column``.
    n : int
        Target minimum number of rows per class.
    working_directory : str
        Directory in which the ``aug`` folder is (re)created.
    img_size : tuple
        Passed as ``target_size`` to the generator, i.e. the size of the
        saved augmented images.
    column : str
        Name of the class-label column; labels are used as sub-directory names.

    Returns
    -------
    pandas.DataFrame
        ``df`` followed by one row per augmented file, with a fresh 0..N-1 index.
    """
    df = df.copy()
    print(f"Initial length of the data: {len(df)}")

    # Start from a clean output tree; makedirs also creates missing parents.
    aug_dir = os.path.join(working_directory, 'aug')
    if os.path.isdir(aug_dir):
        shutil.rmtree(aug_dir)
    os.makedirs(aug_dir)
    for label in df[column].unique():
        os.mkdir(os.path.join(aug_dir, label))

    # NOTE(review): validation_split is set, but no `subset` is requested from
    # flow_from_dataframe below, so it presumably has no effect here - confirm.
    gen = ImageDataGenerator(horizontal_flip= True, rotation_range = 30,
                             brightness_range= [0.3, 0.8],
                             width_shift_range=0.2, height_shift_range= 0.2,
                             validation_split= 0.2, zoom_range= 0.3)

    total = 0
    for label, group in df.groupby(column, sort=False):
        shortfall = n - len(group)
        if shortfall <= 0:
            continue
        target_dir = os.path.join(aug_dir, label)
        # class_mode=None: the iterator yields image batches only; each batch is
        # also written to target_dir as a side effect of save_to_dir.
        aug_gen = gen.flow_from_dataframe(group, x_col= 'Data_file', y_col= None,
                                          target_size= img_size, class_mode= None, batch_size= 1,
                                          shuffle= False, save_to_dir= target_dir, save_prefix= 'aug',
                                          color_mode= 'rgb', save_format= 'jpg')
        aug_img_count = 0
        while aug_img_count < shortfall:
            images = next(aug_gen)
            aug_img_count += len(images)
        total += aug_img_count
    print(f"Total amount of augmentation: {total} images")

    # Index what was written; sorted listings keep the row order reproducible.
    aug_fpaths = []
    aug_labels = []
    for kclass in sorted(os.listdir(aug_dir)):
        classpath = os.path.join(aug_dir, kclass)
        for fname in sorted(os.listdir(classpath)):
            aug_fpaths.append(os.path.join(classpath, fname))
            aug_labels.append(kclass)

    if not aug_fpaths:
        return df.reset_index(drop= True)

    # Name the label column after `column` so it lines up with df's own label column.
    aug_df = pd.DataFrame({'Data_file': aug_fpaths, column: aug_labels})
    return pd.concat([df, aug_df], axis= 0).reset_index(drop= True)

def Translate_data(df):
    """Load the images listed in ``df`` and encode their class names as integers.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``'Data_file'`` (image paths) and ``'Classification'``
        (one of "Healthy", "Doubtful", "Minimal", "Moderate", "Severe").

    Returns
    -------
    tuple
        ``(images, labels)``: ``images`` is the loaded images stacked along a
        new first axis (so all images must share one shape); ``labels`` is a
        pandas Series of codes 0-4 aligned with ``df``. Class names outside
        the mapping become NaN.

    Raises
    ------
    ValueError
        If ``df`` has no rows or an image file cannot be read.
    """
    class_to_index = {"Healthy": 0, "Doubtful": 1,
                      "Minimal": 2, "Moderate": 3,
                      "Severe": 4}
    Label = df.loc[:, 'Classification'].map(class_to_index)

    images = []
    for path in df['Data_file']:
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals an unreadable/missing file by returning None.
            raise ValueError(f"Could not read image file: {path!r}")
        images.append(img)

    if not images:
        raise ValueError("No images to load: the dataframe has no rows")

    # Stack once after the loop; stacking per iteration re-copied every image each time.
    return np.stack(images, axis= 0), Label

def Image_Preprocessing_1(data):
    """Convert every image of a batch from BGR colour to grayscale.

    ``data`` is indexed as ``data[i, :, :, :]``, i.e. a 4-D array with the
    image index on the first axis. The converted images are stacked along the
    first axis and returned.
    """
    # Alternative filters kept for reference (currently disabled); each would
    # replace the plain grayscale image that is stored below:
    #   * Difference of Gaussians: cv2.equalizeHist(gray), then
    #     difference_of_gaussians(img, low_sigma=1.5, high_sigma=15) and
    #     exposure.rescale_intensity(arr, in_range=(0, 1), out_range=(0, 1))
    #   * Sobel edges: filters.sobel(gray)
    #   * Hysteresis thresholding:
    #     filters.apply_hysteresis_threshold(filters.sobel(gray), 0.03, 0.3)
    gray_images = [
        cv2.cvtColor(data[idx, :, :, :], cv2.COLOR_BGR2GRAY)
        for idx in range(data.shape[0])
    ]
    return np.stack(gray_images, axis= 0)

def model4(input_shape=(224, 224, 1), num_classes=5, learning_rate=1e-05):
    """Build and compile the convolutional classifier.

    Architecture: two 5x5 convolutions (190 and 30 filters), batch norm and
    2x2 max-pooling, a 1x1 convolution (130 filters), batch norm and 2x2
    max-pooling, dropout (0.5), then a softmax layer over the classes.

    Parameters
    ----------
    input_shape : tuple, optional
        Shape of one input image, ``(height, width, channels)``.
    num_classes : int, optional
        Number of output classes (units of the softmax layer).
    learning_rate : float, optional
        Learning rate of the Adam optimizer.

    Returns
    -------
    keras.Sequential
        The model compiled with categorical cross-entropy (so it expects
        one-hot encoded targets) and the accuracy metric.
    """
    model = Sequential()
    model.add(layers.Conv2D(190, 5, activation= 'relu', input_shape= input_shape))
    model.add(layers.Conv2D(30, 5, activation= 'relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPool2D(2))
    model.add(layers.Conv2D(130, 1, activation= 'relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.MaxPool2D(2))
    model.add(layers.Dropout(0.5))
    model.add(layers.Flatten())
    model.add(layers.Dense(num_classes, activation= 'softmax'))
    # metrics is passed as a list, the form documented for Model.compile.
    model.compile(optimizer= keras.optimizers.Adam(learning_rate= learning_rate),
                  loss= tf.keras.losses.CategoricalCrossentropy(),
                  metrics= ['accuracy'])
    return model

def show_image(data):
    """Show up to ten images from the next batch of ``data`` with their class names.

    Parameters
    ----------
    data : iterator
        Must expose ``class_indices`` (mapping of class name to index) and
        yield ``(images, labels)`` from ``next()``. Labels are decoded with
        ``argmax``, so they are presumably one-hot encoded - confirm against
        the generator in use.
    """
    classes = list(data.class_indices.keys())
    images, labels = next(data)

    plt.figure(figsize= (20, 20))
    shown = min(len(labels), 10)
    for i in range(shown):
        # plt.subplot selects one cell of the 5x5 grid; plt.subplots would
        # create a new figure and rejects a third positional argument.
        plt.subplot(5, 5, i + 1)
        # Scale to [0, 1] for imshow; assumes 0-255 pixel values - TODO confirm.
        plt.imshow(images[i] / 255)
        class_name = classes[np.argmax(labels[i])]
        plt.title(class_name, color= 'blue', fontsize= 12, loc= 'center')
        plt.axis('off')
    plt.show()

In [None]:
# Root folder of the dataset and its three split sub-folders.
base_dir = "D:/STUDY/IMG_PROCESS/Histogram"
train_path, valid_path, test_path = (
    os.path.join(base_dir, split_name) for split_name in ('train', 'val', 'test')
)
# Class names; the position in this list is the numeric class index.
list_of_classes = ["Healthy", "Doubtful", "Minimal", "Moderate", "Severe"]

In [None]:
def _index_image_folder(directory, class_names):
    """List every file under ``directory``'s numeric class sub-folders.

    Each sub-folder name is parsed as an integer index into ``class_names``.
    Entries that are not directories or whose name is not an integer are
    skipped. Listings are sorted so the row order is the same on every run.

    Returns a DataFrame with columns ``'Data_file'`` (path) and
    ``'Classification'`` (class name).
    """
    records = []
    for klass in sorted(os.listdir(directory)):
        classpath = os.path.join(directory, klass)
        if not os.path.isdir(classpath):
            continue
        try:
            class_index = int(klass)
        except ValueError:
            # Stray folder that is not a numeric class id.
            continue
        label = class_names[class_index]
        for f in sorted(os.listdir(classpath)):
            records.append((os.path.join(classpath, f), label))
    return pd.DataFrame(records, columns=['Data_file', 'Classification'])


train_pdf = _index_image_folder(train_path, list_of_classes)
test_pdf = _index_image_folder(test_path, list_of_classes)
val_pdf = _index_image_folder(valid_path, list_of_classes)

print(f"the length of train data {len(train_pdf)} \n "f"The length of test data {len(test_pdf)} \n "
      f"The length of val data {len(val_pdf)}")
print(train_pdf)

In [None]:
column = 'Classification'

# Class balance of each split before any rebalancing.
count_image(train_pdf, column)
count_image(test_pdf, column)
count_image(val_pdf, column)

# Cap large training classes at 700 images and drop classes below 173,
# then report the balance of the trimmed frame (not the original one).
train_pdf_1 = trim(train_pdf, max_samples= 700, min_samples= 173, column= column)
count_image(train_pdf_1, column)

# Top up the remaining small classes with augmented images.
# NOTE(review): n=700 (the trim cap) and img_size=(224, 224) (the model's
# input size) are assumed values - the original call passed neither; confirm.
train_pdf_2 = Data_augmentation(train_pdf_1, n= 700, working_directory= base_dir,
                                img_size= (224, 224), column= column)
print(train_pdf_2)

# Load the pixels and integer labels of every split.
Data_train, label_data_train = Translate_data(train_pdf_2)
Data_test, label_data_test = Translate_data(test_pdf)
Data_val, label_data_val = Translate_data(val_pdf)
print(Data_train.shape)

# Grayscale version of each split, as fed to the network.
Filtered_data = Image_Preprocessing_1(Data_train)
Filtered_data_val = Image_Preprocessing_1(Data_val)
Filtered_data_test = Image_Preprocessing_1(Data_test)

In [None]:
# Translate_data returns the labels as pandas Series; each split is converted
# to a NumPy array and then one-hot encoded over the 5 classes for the
# categorical cross-entropy loss.

# Training labels: class distribution printed as (class index, count) rows.
label_data_train = np.array(label_data_train)
unique_train, count_train = np.unique(label_data_train, return_counts= True)
print(np.column_stack((unique_train, count_train)))
label_data_train_new = tf.one_hot(label_data_train, 5)

# Test labels: same report.
label_data_test = np.array(label_data_test)
unique_test, count_test = np.unique(label_data_test, return_counts= True)
print(np.column_stack((unique_test, count_test)))
label_data_test_new = tf.one_hot(label_data_test, 5)

# Validation labels: no distribution report.
label_data_val = np.array(label_data_val)
label_data_val_new = tf.one_hot(label_data_val, 5)


In [None]:
# Build a fresh network and train it on the grayscale training images,
# scoring the validation split after every epoch.
model_1 = model4()
history_1 = model_1.fit(
    x= Filtered_data,
    y= label_data_train_new,
    batch_size= 70,
    epochs= 20,
    verbose= 0,  # silent training; the curves are plotted from history_1 afterwards
    validation_data= (Filtered_data_val, label_data_val_new),
)

In [None]:
from keras.models import load_model

# 'Model.h5' is not written anywhere in this notebook, so it only exists if a
# previous run saved it. Use it when present; otherwise evaluate the model
# trained above so that a fresh run-all does not stop here.
# NOTE(review): the training curves plotted later come from history_1 (model_1);
# confirm that evaluating a separately saved model is intended.
saved_model_path = 'Model.h5'
if os.path.isfile(saved_model_path):
    model1 = load_model(saved_model_path)
else:
    print(f"{saved_model_path} not found; evaluating the freshly trained model instead")
    model1 = model_1

In [None]:
# evaluate() returns the loss followed by the values of the compiled metrics,
# so the result is reported as such rather than as a single accuracy.
score = model1.evaluate(Filtered_data_test, label_data_test_new, verbose = 1)
print(f"Test evaluation (loss followed by compiled metrics): {score}")


In [None]:
# Raw model outputs for the test images, computed silently in batches of 50.
y_predict = model1.predict(Filtered_data_test, batch_size= 50, verbose= 0)

In [None]:
# Collapse the per-class scores to predicted class indices. The guard makes the
# cell safe to re-run: once y_predict holds indices it is left untouched
# instead of being argmax-ed a second time.
if np.ndim(y_predict) == 2 and np.shape(y_predict)[1] > 1:
    y_predict = np.argmax(y_predict, axis = -1)

# Training vs. validation curves, one figure per recorded quantity.
for metric_name in ('accuracy', 'loss'):
    fig, ax = plt.subplots()
    ax.plot(history_1.history[metric_name], label='Train')
    ax.plot(history_1.history['val_' + metric_name], label='Validation')
    ax.set(title=f"Model {metric_name}", xlabel='Epoch', ylabel=metric_name)
    ax.legend(loc = 'upper left')
    plt.show()


In [None]:
# y_predict already holds predicted class indices at this point, so it is used
# as-is. Applying argmax again to the 1-D index array would reduce it to a
# single scalar and make confusion_matrix fail.
Accuracy = accuracy_score(y_true=label_data_test,y_pred= y_predict)
print(f"The accuracy: {Accuracy}")
print(f"The classification report of the model: \n {classification_report(y_pred= y_predict, y_true= label_data_test)}")
cm = confusion_matrix(label_data_test, y_predict)

In [None]:
# Tick labels: names of the classes that occur in the true or predicted labels.
# That sorted union is the label set confusion_matrix uses when no `labels`
# argument is given, so the names always match the matrix size - even when the
# model never predicts one of the classes.
present_labels = np.unique(np.concatenate([np.ravel(label_data_test), np.ravel(y_predict)]))
class_names = [list_of_classes[int(label_index)] for label_index in present_labels]
print(class_names)

# Annotated heatmap of the confusion matrix.
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm, annot=True, ax = ax, fmt = 'g')  # annot=True writes the count in each cell

ax.set_xlabel('Predicted', fontsize=20)
ax.xaxis.set_label_position('bottom')
plt.xticks(rotation=90)
ax.xaxis.set_ticklabels(class_names, fontsize = 10)
ax.xaxis.tick_bottom()

ax.set_ylabel('True', fontsize=20)
ax.yaxis.set_ticklabels(class_names, fontsize = 10)
plt.yticks(rotation=0)

plt.title('Refined Confusion Matrix', fontsize=20)
plt.show()

In [None]:
from keras.models import save_model

# Write the trained network (model_1) to an .h5 file in the current working directory.
file_name = "Model_hysteresis.h5"
file_path = os.path.join(
    os.getcwd(),
    file_name,
)
save_model(model_1, file_path)
print("Saved the Model to the disk")