In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from sklearn.utils import shuffle
import os
import cv2
from glob import glob
from tqdm import tqdm
import time

import random
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

In [None]:
# Where ModelCheckpoint writes the best model during training.
# Kept relative to the working directory, like data_dir below; the earlier leading
# '/' pointed at the filesystem root, which is normally not writable.
save_path='./model_file/ldfgnet.hdf5'
# Dataset root; load_split_dataset treats every sub-directory as one class.
data_dir='./dataset/kvasir5class'

# Create the checkpoint folder up front so saving cannot fail on a missing directory.
os.makedirs(os.path.dirname(save_path), exist_ok=True)

In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split


def apply_clahe(img):
    """Return a contrast-enhanced copy of a BGR image.

    The image is converted to LAB, CLAHE (clip limit 2.0, 8x8 tile grid) is
    applied to the lightness channel only, and the result is converted back
    to BGR. The two colour channels are passed through untouched.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2LAB))
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge((equalizer.apply(lightness), chan_a, chan_b))
    return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)

def load_split_dataset(data_dir, img_size=(128,128), test_size=0.2, val_size=0.1, random_state=42):
    """Load a folder-per-class image dataset and split it into train/val/test sets.

    Every sub-directory of ``data_dir`` is one class; classes are indexed in
    sorted name order. Each image is read with OpenCV, contrast-enhanced with
    ``apply_clahe``, converted from BGR to RGB and resized to ``img_size``.

    Args:
        data_dir: Dataset root containing one sub-directory per class.
        img_size: Target size handed to ``cv2.resize`` as its ``dsize`` argument.
        test_size: Fraction of all images held out for the test set.
        val_size: Fraction of all images held out for the validation set.
        random_state: Seed used for both stratified splits.

    Returns:
        ``(x_train, x_val, x_test, y_train, y_val, y_test, class_names)``.
        The ``x_*`` arrays hold the images, the ``y_*`` arrays hold integer
        class indices, and ``class_names[i]`` is the folder name of class ``i``.

    Raises:
        ValueError: If the split fractions are not in (0, 1), leave no room for
            a training set, or if no readable image is found under ``data_dir``.
    """
    if not (0 < test_size < 1 and 0 < val_size < 1 and test_size + val_size < 1):
        raise ValueError(
            "test_size and val_size must be fractions in (0, 1) whose sum is below 1; "
            f"got test_size={test_size}, val_size={val_size}")

    image_exts = ('.png', '.jpg', '.jpeg', '.bmp')
    class_names = sorted(d for d in os.listdir(data_dir)
                         if os.path.isdir(os.path.join(data_dir, d)))

    images = []
    labels = []
    unreadable = []

    for class_idx, class_name in enumerate(tqdm(class_names)):
        class_path = os.path.join(data_dir, class_name)
        # os.listdir order is arbitrary; sorting keeps the seeded split
        # reproducible from one machine/run to the next.
        for img_file in sorted(os.listdir(class_path)):
            if not img_file.lower().endswith(image_exts):
                continue
            img_path = os.path.join(class_path, img_file)
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals a corrupt/unsupported file by returning None.
                unreadable.append(img_path)
                continue
            img = apply_clahe(img)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            images.append(cv2.resize(img, img_size))
            labels.append(class_idx)  # integer label = position in class_names

    if unreadable:
        print(f"Warning: skipped {len(unreadable)} unreadable image file(s), e.g. {unreadable[0]}")
    if not images:
        raise ValueError(f"No readable images found under {data_dir!r}")

    x = np.array(images)
    y = np.array(labels)

    # First split: carve the test set off the full data.
    x_train_val, x_test, y_train_val, y_test = train_test_split(
        x, y, test_size=test_size, random_state=random_state, stratify=y)

    # Second split: val_size is a fraction of the whole dataset, so rescale it
    # to a fraction of what is left after removing the test set.
    val_size_adjusted = val_size / (1 - test_size)
    x_train, x_val, y_train, y_val = train_test_split(
        x_train_val, y_train_val, test_size=val_size_adjusted,
        random_state=random_state, stratify=y_train_val)

    print("\nData loaded successfully:")
    print(f"- Total images: {len(x)}")
    print(f"- Classes: {class_names}")
    print(f"\nSplit sizes:")
    print(f"- Training: {len(x_train)} images")
    print(f"- Validation: {len(x_val)} images")
    print(f"- Test: {len(x_test)} images")

    return x_train, x_val, x_test, y_train, y_val, y_test, class_names


# Image size and split fractions: 70% train, 10% validation, 20% test.
# train_size is informational only; the loader takes test_size and val_size and
# the training share is whatever remains.
img_size=(128,128)
train_size=0.7
test_size=0.2
val_size=0.1

# Load and split the data using the settings above (instead of repeating literals).
x_train, x_val, x_test, y_train, y_val, y_test, class_names = load_split_dataset(
    data_dir,img_size=img_size,test_size=test_size,val_size=val_size,random_state=42)

# Persist the held-out test split as loaded, i.e. before the later /255 scaling
# and one-hot encoding are applied.
np.save("x_test5c.npy", x_test)
np.save("y_test5c.npy", y_test)
print("\n")
print("The test dataset saved as .npy file")

print('x_train shape ', x_train.shape)
print('y_train shape ', y_train.shape)

print('x_val shape ', x_val.shape)
print('y_val shape ', y_val.shape)

print('x_test shape ', x_test.shape)
print('y_test shape ', y_test.shape)


In [None]:
# performed for HyperKvasir Dataset
# Compute 'balanced' class weights from the integer training labels (y_train);
# the result is meant to be passed as `class_weight=` to model.fit.
from sklearn.utils import class_weight
present_classes = np.unique(y_train)
class_weights_array = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=y_train
)
# Key every weight by its actual class index rather than by its position in the
# array, so the mapping stays correct even if a class is absent from y_train.
class_weights = {int(label): float(weight)
                 for label, weight in zip(present_classes, class_weights_array)}

print("\nCalculated Class Weights:")
for label, weight in class_weights.items():
    # Look the name up by label so names and weights cannot drift apart.
    print(f"  - {class_names[label]}: {weight:.2f}")

In [None]:
import os
import cv2
import numpy as np
from glob import glob
from sklearn.utils import shuffle
from tensorflow.keras.utils import to_categorical


# Augmentations currently applied: affine rotation, vertical flip, horizontal flip,
# transpose, grayscale, Gaussian noise, random brightness and random contrast.

# !pip install albumentations==1.3.0 numpy==1.21.5 typing_extensions==4.5.0     # tf gpu 2

from tqdm import tqdm
import cv2
from albumentations import (
    Affine,
    HorizontalFlip,
    VerticalFlip,
    CenterCrop,
    Crop,
    Compose,
    Transpose,
    RandomRotate90,
    RandomSizedCrop,
    RandomBrightnessContrast,
    RandomGamma,
    HueSaturationValue,
    RGBShift,
    RandomBrightness,
    RandomContrast,
    GaussianBlur,
    GaussNoise

)

def augment_data(x_train, y_train, augment=True, size=(128, 128)):
    """Expand a set of images with fixed augmentations and resize everything.

    With ``augment`` enabled, each input image yields nine outputs in this
    order: the original, an ``Affine(rotate=45)`` rotation, a vertical flip,
    a horizontal flip, a transpose, a grayscale copy (kept 3-channel), Gaussian
    noise, random brightness and random contrast. With ``augment`` disabled
    only the original image is emitted. Every output is resized to ``size``
    and carries the label of the image it was derived from.

    Args:
        x_train: Indexable collection of images; non-uint8 images are cast to
            uint8 before being transformed.
        y_train: Labels aligned with ``x_train``.
        augment: Whether to generate the augmented variants.
        size: Target size handed to ``cv2.resize`` for every output image.

    Returns:
        A two-element list ``[x_train_aug, y_train_aug]`` of NumPy arrays.
    """
    # Build the transforms once instead of once per image; they are only
    # needed (and only constructed) when augmentation is requested.
    if augment:
        geometric = [
            Affine(rotate=45, p=1),
            VerticalFlip(p=1),
            HorizontalFlip(p=1),
            Transpose(p=1),
        ]
        photometric = [
            GaussNoise(p=1),
            RandomBrightness(p=1),
            RandomContrast(p=1),
        ]

    x_train_aug_list = []
    y_train_aug_list = []

    for idx in tqdm(range(len(x_train)), desc="Augmenting images"):
        x = x_train[idx]
        label = y_train[idx]

        # ensure image is uint8 for cv2/albumentations
        if x.dtype != np.uint8:
            x = x.astype(np.uint8)

        variants = [x]
        if augment:
            variants.extend(transform(image=x)['image'] for transform in geometric)

            # Grayscale: drop colour, then expand back to three channels so the
            # result stacks with the other images.
            gray = cv2.cvtColor(x, cv2.COLOR_RGB2GRAY)
            variants.append(cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB))

            variants.extend(transform(image=x)['image'] for transform in photometric)

        # `size` is a parameter, so this also works when augment is False
        # (previously `size` was undefined on that path).
        resized = [cv2.resize(img, size) for img in variants]

        x_train_aug_list.extend(resized)                 # flatten into main list
        y_train_aug_list.extend([label] * len(resized))  # one label per emitted image

    x_train_aug = np.array(x_train_aug_list)
    y_train_aug = np.array(y_train_aug_list)

    return [x_train_aug, y_train_aug]

def train_shuffling(x, y):
    """Shuffle ``x`` and ``y`` together with a fixed seed (42) and return both."""
    shuffled_x, shuffled_y = shuffle(x, y, random_state=42)
    return shuffled_x, shuffled_y

# NOTE: this cell rebinds x_val, y_val, x_test and y_test to their scaled /
# one-hot versions, so run it only once per kernel session; a second run would
# scale and encode the already-processed arrays again.

# Augment the training split, scale pixels to [0, 1] and one-hot encode labels.
[x_train_aug, y_train_aug] = augment_data(x_train,y_train,augment=True)
x_train_aug = (x_train_aug/255.0).astype(np.float32)

y_train_aug = to_categorical(y_train_aug, num_classes=len(class_names))

(x_train_aug, y_train_aug) = train_shuffling(x_train_aug, y_train_aug)
print("finished")

# Validation and test data are not augmented, only scaled and encoded.
x_val = (x_val/255.0).astype(np.float32)
y_val = to_categorical(y_val, num_classes=len(class_names))

# Cast to float32 like the training and validation arrays (was left as float64).
x_test = (x_test/255.0).astype(np.float32)
y_test = to_categorical(y_test, num_classes=len(class_names))

print("\nData Augumentation successfully done !:")
# Number of training images before augmentation; the augmented count is in the
# shape printed just below.
print(f"- Training: {len(x_train)} images")

print('Augumented x_train shape ', x_train_aug.shape)
print('Augumented y_train shape ', y_train_aug.shape)

print('x_val shape ', x_val.shape)
print('y_val shape ', y_val.shape)

print('x_test shape ', x_test.shape)
print('y_test shape ', y_test.shape)

In [None]:
import matplotlib.pyplot as plt

# Preview the first 10 images of the (shuffled) augmented training set in one row.
fig, axes = plt.subplots(1, 10, figsize=(12, 6))
for i, ax in enumerate(axes):
    ax.imshow(x_train_aug[i])
    ax.axis("off")
plt.show()

In [None]:
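# LDFGastro-Net model definition ##################################
# TODO: the architecture code has not been added yet. This cell must define
# `model` before the compile, training and evaluation cells below can run.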



















# Code to be updated soon !!!

In [None]:
print("\n Compiling the model...")
# Adam at lr=1e-3, categorical cross-entropy on one-hot labels, and categorical
# accuracy reported under the name "accuracy" (so callbacks can watch val_accuracy).
model.compile(
    optimizer=Adam(learning_rate=1e-3),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name="accuracy"),
    ],
)

In [None]:
import time

batch_size=16

# All three callbacks watch validation accuracy:
#   checkpoint - writes the best model so far to save_path
#   reduce_lr  - multiplies the learning rate by sqrt(0.1) after 5 stagnant epochs
#   early_stop - ends training after 15 stagnant epochs
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    save_path,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=np.sqrt(0.1),
    patience=5,
    min_lr=0.5e-6,
    verbose=1,
)
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=15,
)
callbacks = [checkpoint, reduce_lr, early_stop]

# trstart/trstop bracket the fit call so the training duration can be derived.
trstart=time.time()

history = model.fit(
    x_train_aug,
    y_train_aug,
    validation_data=(x_val, y_val),
    batch_size=batch_size,
    epochs=100,
    callbacks=callbacks,
    verbose=1,
)

# Variant for the imbalanced dataset, using the class weights computed earlier:
# history = model.fit(x_train_aug, y_train_aug, validation_data=(x_val,y_val), batch_size=batch_size, epochs=100, callbacks=callbacks, class_weight=class_weights, verbose=1)

trstop=time.time()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
print(history.history.keys())

# One panel per metric. The second curve in each panel comes from the
# validation data passed to model.fit, so it is labelled 'validation'
# (it was previously mislabelled 'test').
fig, axes = plt.subplots(1, 2)
panels = [('accuracy', 'Accuracy Plot'), ('loss', 'Loss Plot')]
for ax, (metric, title) in zip(axes, panels):
    ax.plot(history.history[metric])
    ax.plot(history.history['val_' + metric])
    ax.set_title(title)
    ax.set_ylabel(metric)
    ax.set_xlabel('epoch')
    ax.legend(['train', 'validation'], loc='upper left')
fig.tight_layout()  # keep the two panels' axis labels from overlapping
plt.show()


In [None]:

from sklearn.metrics import precision_score,recall_score,accuracy_score, f1_score
from sklearn.metrics import confusion_matrix

# Evaluate the in-memory model (weights as they are at the end of training).
# y_test is one-hot at this point, so argmax recovers the integer labels.
org_class_indices = np.argmax(y_test,axis=-1)
print('Original Label :')
print(org_class_indices)
print('\n')

# Time only the forward pass: the stop timestamp is taken right after predict(),
# so argmax and printing no longer inflate the reported inference time.
tsstart = time.time()
pred =model.predict(x_test, verbose = 1)
tsstop = time.time()

pred_class_indices = np.argmax(pred,axis=-1)
print('Predicted Labels :')
print(pred_class_indices)
print('\n')

print('Inference time : ', tsstop-tsstart, ' sec')
print('\n')
tput=x_test.shape[0]/(tsstop-tsstart)
print('Throughput : ',tput,' fps')
#=======================================================================================================================================

# Macro averaging weights every class equally regardless of its support.
print('Precision :  {}'.format(precision_score(y_true=org_class_indices, y_pred=pred_class_indices,average = 'macro')))
print('Recall    :  {}'.format(recall_score(y_true=org_class_indices, y_pred=pred_class_indices,average='macro')))
print('Accuracy  :  {}'.format(accuracy_score(y_true=org_class_indices, y_pred=pred_class_indices)))
print('F1 Score  :  {}'.format(f1_score(y_true=org_class_indices, y_pred=pred_class_indices,average = 'macro')))

matrix=confusion_matrix(y_true=org_class_indices, y_pred=pred_class_indices)
print('\n')
print('Confusion Matrix  : ')
print(matrix)

#========================================================================================================================================


In [None]:
import tensorflow as tf
import time
from sklearn.metrics import precision_score,recall_score,accuracy_score, f1_score
from sklearn.metrics import confusion_matrix

# Evaluate the best checkpoint saved during training. best_path defaults to the
# file ModelCheckpoint wrote (save_path); point it elsewhere to test another model.
# best_path="./model_file/kv_denh.hdf5"
best_path = save_path
loaded_model=tf.keras.models.load_model(best_path)

# y_test is one-hot at this point, so argmax recovers the integer labels.
org_class_indices = np.argmax(y_test,axis=-1)
print('Original Label :')
print(org_class_indices)
print('\n')

# Predict with the model that was just loaded (the in-memory `model` was being
# used here before, so the checkpoint was never actually evaluated). The stop
# timestamp is taken right after predict() so only inference is timed.
tsstart = time.time()
pred =loaded_model.predict(x_test, verbose = 1)
tsstop = time.time()

pred_class_indices = np.argmax(pred,axis=-1)
print('Predicted Labels :')
print(pred_class_indices)
print('\n')

print('Inference time : ', tsstop-tsstart, ' sec')
print('\n')
tput=x_test.shape[0]/(tsstop-tsstart)
print('Throughput : ',tput,' fps')

#========================================================================================================================================

# Macro averaging weights every class equally regardless of its support.
print('Precision :  {}'.format(precision_score(y_true=org_class_indices, y_pred=pred_class_indices,average = 'macro')))
print('Recall    :  {}'.format(recall_score(y_true=org_class_indices, y_pred=pred_class_indices,average='macro')))
print('Accuracy  :  {}'.format(accuracy_score(y_true=org_class_indices, y_pred=pred_class_indices)))
print('F1 Score  :  {}'.format(f1_score(y_true=org_class_indices, y_pred=pred_class_indices,average = 'macro')))

matrix=confusion_matrix(y_true=org_class_indices, y_pred=pred_class_indices)
print('\n')
print('Confusion Matrix  : ')
print(matrix)
#========================================================================================================================================