# keras model


## pylibs


In [1]:
import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import cv2
import random
import datetime
import numpy as np
import tensorflow as tf
from keras.models import Model
import matplotlib.pyplot as plt
from keras.optimizers import SGD, Adagrad
from keras.regularizers import l2
from keras.models import load_model
from keras import Sequential
from keras.callbacks import EarlyStopping
from keras.callbacks import TensorBoard
from keras.utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, BatchNormalization, Input, Concatenate, GlobalAveragePooling2D

## Extra

In [2]:
def print_Color(Input: str, colors: list, print_END: str = "\n", advanced_mode: bool = False) -> None:
    """
    Prints colored text to the console using advanced terminal colors.

    Args:
        Input (str): The input string to be printed. In advanced mode, '~*' is used to separate different parts of the string to be printed in different colors.
        colors (list): A list of colors for the text. In non-advanced mode, only the first color in the list is used. In advanced mode, each color corresponds to a part of the input string separated by '~*'.
        print_END (str): The string appended after the final output. Default is '\\n'.
        advanced_mode (bool): If True, enables advanced mode that allows multiple colors in one string. Default is False.

    Examples:
    ~~~python
        print_Color('Hello, World!', ['green'])
        # Prints 'Hello, World!' in green.

        print_Color('~*Hello in green~*Hello in red', ['green', 'red'], advanced_mode=True)
        # Prints 'Hello in green' in green and 'Hello in red' in red.

    Note:
        The advanced terminal colors can be used by providing the escape sequences directly in the colors list.
        If an invalid color is provided, an error message will be printed.
    """
    color_code = {
        "black": "\x1b[0;30m",
        "red": "\x1b[0;31m",
        "green": "\x1b[0;32m",
        "yellow": "\x1b[0;33m",
        "blue": "\x1b[0;34m",
        "magenta": "\x1b[0;35m",
        "cyan": "\x1b[0;36m",
        "white": "\x1b[0;37m",
        "normal": "\x1b[0m",
        "bg_black": "\x1b[40m",
        "bg_red": "\x1b[41m",
        "bg_green": "\x1b[42m",
        "bg_yellow": "\x1b[43m",
        "bg_blue": "\x1b[44m",
        "bg_magenta": "\x1b[45m",
        "bg_cyan": "\x1b[46m",
        "bg_white": "\x1b[47m",
        "bg_normal": "\x1b[49m",
        "light_gray": "\x1b[0;90m",
        "light_red": "\x1b[0;91m",
        "light_green": "\x1b[0;92m",
        "light_yellow": "\x1b[0;93m",
        "light_blue": "\x1b[0;94m",
        "light_magenta": "\x1b[0;95m",
        "light_cyan": "\x1b[0;96m",
        "light_white": "\x1b[0;97m",
        "bg_light_gray": "\x1b[0;100m",
        "bg_light_red": "\x1b[0;101m",
        "bg_light_green": "\x1b[0;102m",
        "bg_light_yellow": "\x1b[0;103m",
        "bg_light_blue": "\x1b[0;104m",
        "bg_light_magenta": "\x1b[0;105m",
        "bg_light_cyan": "\x1b[0;106m",
        "bg_light_white": "\x1b[0;107m",
        "bold": "\x1b[1m",
        "underline": "\x1b[4m",
        "blink": "\x1b[5m",
    }

    if not advanced_mode:
        if colors[0] in color_code:
            print(color_code[colors[0]] + Input + "\x1b[0m", end=print_END)
        else:
            print("[print_Color] ERROR: Invalid color input!!!")
    else:
        substrings = Input.split("~*")
        if len(substrings) != len(colors) + 1:
            print("[print_Color] ERROR: Number of colors and number of '~*' don't match!!!")
        else:
            for sub_str, color in zip(substrings, ["normal"] + colors):
                if color in color_code:
                    print(color_code[color] + sub_str + "\x1b[0m", end="")
                else:
                    print(f"\n[print_Color] ERROR: Invalid color!!! The input color: '{color}' input list index: {colors.index(color)}")
            print("", end=print_END)

## Conf


### Data processing conf

In [3]:
# Directory paths
train_dir = "Data_set/train"
test_dir = "Data_set/test"
validation_dir = "Data_set/val"
img_res = [224, 224, 3]
# img_res = [224, 200, 3]
categorical_IMP = True
Make_EV_DATA = False
R_fill_mode = True
Save_TS = True
DDA = 3
OP_HDC = True
SL_EX = "_V3"
LNTS = 20000
# BEST CONF T
# DDA = 2
# OP_HDC = False
# SL_EX = '_V3'
# LNTS = 18000

### Training 

In [4]:
SAVE_TYPE = "H5"
Conf_batch_size = 1

## data processing with image


In [5]:
# noise_func
def noise_func(image):
    noise_type = np.random.choice(["L1", "L2", "L3", "none"])
    new_image = np.copy(image)

    if noise_type == "L3":
        intensityL2 = random.uniform(0.01, 0.04)
        intensityL1 = random.uniform(0.005, 0.02)
    else:
        intensityL2 = random.uniform(0.01, 0.04)
        intensityL1 = random.uniform(0.01, 0.04)

    block_size_L1 = random.randint(8, 32)
    block_size_L2 = random.randint(32, 64)

    if noise_type == "L2" or noise_type == "L3":
        for i in range(0, image.shape[0], block_size_L2):
            for j in range(0, image.shape[1], block_size_L2):
                block = image[i : i + block_size_L2, j : j + block_size_L2]
                block = (np.random.rand() * intensityL2 + 1) * block
                block = np.clip(block, 0, 255)  # clip pixel values to 0-255 range
                new_image[i : i + block_size_L2, j : j + block_size_L2] = block
        image = new_image

    if noise_type == "L1" or noise_type == "L3":
        for i in range(0, image.shape[0], block_size_L1):
            for j in range(0, image.shape[1], block_size_L1):
                block = image[i : i + block_size_L1, j : j + block_size_L1]
                block = (np.random.rand() * intensityL1 + 1) * block
                block = np.clip(block, 0, 255)  # clip pixel values to 0-255 range
                new_image[i : i + block_size_L1, j : j + block_size_L1] = block

    return new_image


# shuffle_data
def shuffle_data(x, y):
    indices = np.arange(x.shape[0])
    np.random.shuffle(indices)
    x = x[indices]
    y = y[indices]
    return x, y


# save_images_to_dir
def save_images_to_dir(images, labels, dir_path):
    # create the directory if it doesn't exist
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    # iterate over the images and labels
    for i, (image, label) in enumerate(zip(images, labels)):
        # get the class label
        class_label = np.argmax(label)
        # create the file path
        file_path = os.path.join(dir_path, f"image_{i}_class_{class_label}.png")
        # save the image to the file path
        plt.imsave(file_path, image.squeeze())


# Create an ImageDataGenerator for the training set
if OP_HDC:
    print_Color("Using OP_HDC IDG...", ["yellow"])
    train_datagen = ImageDataGenerator(
        horizontal_flip=True,
        vertical_flip=True,
        rotation_range=179,
        zoom_range=0.24,
        shear_range=0.25,
        width_shift_range=0.22,
        brightness_range=(0.9, 1.1),
        height_shift_range=0.22,
        channel_shift_range=100,
        featurewise_center=False,
        featurewise_std_normalization=False,
        fill_mode="constant",
        preprocessing_function=noise_func,
    )
    # train_datagen = ImageDataGenerator(
    #     horizontal_flip=True,
    #     vertical_flip=True,
    #     rotation_range=179,
    #     zoom_range=0.24,
    #     shear_range=0.25,
    #     width_shift_range=0.22,
    #     brightness_range=(0.97, 1.025),
    #     height_shift_range=0.22,
    #     channel_shift_range=100,
    #     featurewise_center=False,
    #     featurewise_std_normalization=False,
    #     fill_mode=np.random.choice(['constant', 'nearest']) if R_fill_mode else 'nearest',
    #     preprocessing_function=noise_func
    # )
else:
    print_Color("Using Def IDG...", ["yellow"])
    train_datagen = ImageDataGenerator(
        horizontal_flip=True,
        vertical_flip=True,
        rotation_range=179,
        zoom_range=0.26,
        shear_range=0.25,
        width_shift_range=0.25,
        brightness_range=(0.7, 1.3),
        height_shift_range=0.25,
        channel_shift_range=100,
        featurewise_center=False,
        featurewise_std_normalization=False,
        fill_mode=np.random.choice(["constant", "nearest"]) if R_fill_mode else "nearest",
        preprocessing_function=noise_func,
    )
train_datagen_SM = ImageDataGenerator(
    horizontal_flip=False,
    vertical_flip=False,
    rotation_range=20,
    zoom_range=0.07,
    shear_range=0.07,
    width_shift_range=0.07,
    brightness_range=(0.99, 1.01),
    height_shift_range=0.07,
    channel_shift_range=0,
    featurewise_center=False,
    featurewise_std_normalization=False,
)
# Create an iterator for the training set
train_generator_SM = train_datagen_SM.flow_from_directory(
    train_dir, target_size=(img_res[0], img_res[1]), batch_size=sum([len(files) for r, d, files in os.walk(train_dir)]), class_mode="binary"
)
# Create an ImageDataGenerator for the validation set (OP)
if Make_EV_DATA:
    val_datagen = ImageDataGenerator(
        horizontal_flip=False, zoom_range=0.05, width_shift_range=0.05, brightness_range=(0.92, 1.02), height_shift_range=0.05
    )

    # Create an iterator for the validation set
    val_generator = val_datagen.flow_from_directory(
        validation_dir,
        target_size=(img_res[0], img_res[1]),
        batch_size=sum([len(files) for r, d, files in os.walk(validation_dir)]),
        class_mode="binary",
        color_mode="rgb",
    )

    # Create an ImageDataGenerator for the test set
    test_datagen = ImageDataGenerator(
        horizontal_flip=False, zoom_range=0.05, width_shift_range=0.05, brightness_range=(0.98, 1.02), height_shift_range=0.05
    )

    # Create an iterator for the test set
    test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(img_res[0], img_res[1]),
        batch_size=sum([len(files) for r, d, files in os.walk(test_dir)]),
        class_mode="binary",
        color_mode="rgb",
    )
# Load all images and labels into memory
print_Color("Loading all images and labels into memory...", ["yellow"])
x_train, y_train = next(iter(train_generator_SM))
if Make_EV_DATA:
    x_val, y_val = next(iter(val_generator))
    x_test, y_test = next(iter(test_generator))
# fit parameters from data
# train_datagen.fit(x_train)
# to_categorical (TEMP)
if categorical_IMP:
    print_Color("Making categorical data...", ["yellow"])
    y_train = to_categorical(y_train, num_classes=2)
    if Make_EV_DATA:
        y_val = to_categorical(y_val, num_classes=2)
        y_test = to_categorical(y_test, num_classes=2)
print(f"Generating augmented data [DDA: {str(DDA)}]...")
if DDA > 0:
    for i in range(DDA):
        print(f">   Generating ADB[{i}]...")
        # prepare an iterators to scale images
        train_iterator = train_datagen.flow(x_train, y_train, batch_size=len(x_train))

        # get augmented data
        x_train_augmented, y_train_augmented = train_iterator.next()

        print(">   \---Adding the Generated ADB...")
        # append augmented data to original data
        x_train = np.concatenate([x_train, x_train_augmented])
        y_train = np.concatenate([y_train, y_train_augmented])
        # free up memory
        del y_train_augmented
        del x_train_augmented
# normalizing
x_train = np.array(x_train) / 255
y_train = np.array(y_train)
if Make_EV_DATA:
    x_val = np.array(x_val) / 255
    y_val = np.array(y_val)
    x_test = np.array(x_test) / 255
    y_test = np.array(y_test)
# Check the range of image data
print_Color(
    f"~*Grayscale range: ~*Min = {np.min(x_train)}~* | ~*Max = {np.max(x_train)}", ["normal", "blue", "normal", "red"], advanced_mode=True
)
# Check the data type of image data
print_Color(f"~*Data type: ~*{x_train.dtype}", ["normal", "green"], advanced_mode=True)
# Calculate the ratio of two labels
if categorical_IMP:
    label_ratio = np.sum(y_train[:, 0]) / np.sum(y_train[:, 1])
else:
    label_ratio = np.sum(y_train == 0) / (np.sum(y_train == 1) + 1e-10)
label_ratio_percentage = label_ratio * 100
print_Color(
    f"~*Label ratio: ~*{100 - label_ratio_percentage:.2f}% PNEUMONIA ~*| ~*{label_ratio_percentage:.2f}% NORMAL",
    ["normal", "red", "magenta", "green"],
    advanced_mode=True,
)
print_Color("Setting LNTS...", ["yellow"])
# Get the total number of samples in the arrays
num_samples = x_train.shape[0]
print_Color(f"~*Original num_samples: ~*{num_samples}", ["normal", "green"], advanced_mode=True)
if LNTS != 0:
    print_Color(f"~*Applying LNTS of: ~*{LNTS}", ["normal", "green"], advanced_mode=True)
    print_Color(f"~*SNC: ~*{num_samples - LNTS}", ["normal", "green"], advanced_mode=True)
    # Generate random indices to select LNTS samples
    indices = np.random.choice(num_samples, size=LNTS, replace=False)
    # Select the samples using the generated indices
    x_selected = x_train[indices]
    y_selected = y_train[indices]
    x_train = x_selected
    y_train = y_selected
    # free up memory
    del x_selected
    del y_selected
    del indices
    # Debug
    num_samples = x_train.shape[0]
    print_Color(f"~*New num_samples: ~*{num_samples}", ["normal", "green"], advanced_mode=True)
# Shuffle the training data
print_Color("shuffling data...", ["yellow"])
x_train, y_train = shuffle_data(x_train, y_train)
# save_images_to_dir
if Save_TS:
    print_Color("Saving TS...", ["yellow"])
    SITD = np.random.choice(num_samples, size=200, replace=False)
    S_dir = "Samples/TSR200_" + datetime.datetime.now().strftime("y%Y_m%m_d%d-h%H_m%M_s%S")
    print_Color(f"~*Sample dir: ~*{S_dir}", ["normal", "green"], advanced_mode=True)
    save_images_to_dir(x_train[SITD], y_train[SITD], S_dir)
print_Color("Done.", ["green"])

[0;33mUsing Def IDG...[0m
Found 5216 images belonging to 2 classes.
[0;33mLoading all images and labels into memory...[0m
[0;33mMaking categorical data...[0m
Generating augmented data [DDA: 2]...
>   Generating ADB[0]...
>   \---Adding the Generated ADB...
>   Generating ADB[1]...
>   \---Adding the Generated ADB...
[0m[0m[0mGrayscale range: [0m[0;34mMin = 0.0[0m[0m | [0m[0;31mMax = 1.0[0m
[0m[0m[0mData type: [0m[0;32mfloat32[0m
[0m[0m[0mLabel ratio: [0m[0;31m65.39% PNEUMONIA [0m[0;35m| [0m[0;32m34.61% NORMAL[0m
[0;33mSetting LNTS...[0m
[0m[0m[0mOriginal num_samples: [0m[0;32m20864[0m
[0m[0m[0mApplying LNTS of: [0m[0;32m18000[0m
[0m[0m[0mSNC: [0m[0;32m2864[0m
[0m[0m[0mNew num_samples: [0m[0;32m18000[0m
[0;33mshuffling data...[0m
[0;33mSaving TS...[0m
[0m[0m[0mSample dir: [0m[0;32mSamples/TSR200_y2023_m10_d17-h15_m39_s25[0m
[0;32mDone.[0m


## Save EV dataset

In [None]:
np.save(f"Database\\x_val{SL_EX}.npy", x_val)
np.save(f"Database\\y_val{SL_EX}.npy", y_val)
np.save(f"Database\\x_test{SL_EX}.npy", x_test)
np.save(f"Database\\y_test{SL_EX}.npy", y_test)

## Load EV Dataset

In [6]:
x_val = np.load(f"Database\\x_val{SL_EX}.npy")
y_val = np.load(f"Database\\y_val{SL_EX}.npy")
x_test = np.load(f"Database\\x_test{SL_EX}.npy")
y_test = np.load(f"Database\\y_test{SL_EX}.npy")

## Creating the model


### V1
```
statuses: ready
Working: ✅
Model acc: 95.51
type: transfer learning>>>(EfficientNetB7)
```

#### EfficientNetB7

In [6]:
from keras.applications import EfficientNetB7

EfficientNet_M = EfficientNetB7(
    include_top=True, input_shape=(img_res[0], img_res[1], img_res[2]), weights=None, classes=2, classifier_activation="softmax"
)
# define new model
model = Model(inputs=EfficientNet_M.inputs, outputs=EfficientNet_M.outputs)

# compile model
opt = SGD(learning_rate=0.008, momentum=0.85, decay=0.001)
# opt = SGD(learning_rate=0.008, momentum=0.85, decay=0.001)
# opt = Adam()
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["accuracy", "binary_accuracy"])

model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 rescaling (Rescaling)          (None, 224, 224, 3)  0           ['input_1[0][0]']                
                                                                                                  
 normalization (Normalization)  (None, 224, 224, 3)  7           ['rescaling[0][0]']              
                                                                                                  
 stem_conv_pad (ZeroPadding2D)  (None, 225, 225, 3)  0           ['normalization[0][0]']      

### V1.1
```
statuses: Test
Working: N/A
Model acc: N/A
type: transfer learning>>>(ConvNeXtLarge)
```

In [None]:
from keras.applications import ConvNeXtXLarge

ConvNeXtLarge_M = ConvNeXtXLarge(
    include_top=True,
    input_shape=(img_res[0], img_res[1], img_res[2]),
    weights=None,
    classes=2,
    classifier_activation="softmax",
    include_preprocessing=False,
)
# define new model
model = Model(inputs=ConvNeXtLarge_M.inputs, outputs=ConvNeXtLarge_M.outputs)

# compile model
opt = SGD(learning_rate=1e-03, momentum=0.85, decay=0.001)
# opt = Adam()
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["accuracy", "binary_accuracy"])

model.summary()

### V1.2
```
statuses: TEST
Working: ✅
Model acc: ⚠️
type: transfer learning>>>(EfficientNetB7)
Other: ⚠️(BR)
```

In [7]:
from efficientnet.keras import EfficientNetB7 as KENB7


def Eff_B7_NS():
    model_EfficientNetB7_NS = Sequential([
        KENB7(input_shape=(img_res[0], img_res[1], img_res[2]), weights="noisy-student", include_top=False),
        GlobalAveragePooling2D(),
        Dense(128, activation="relu"),
        Dense(64, activation="relu"),
        Dense(2, activation="softmax"),
    ])
    # opt = SGD(learning_rate=0.01, momentum=0.9, decay=0.001)
    # opt = Adadelta()
    # opt = Nadam()
    # model_EfficientNetB7_NS.compile(optimizer = opt,  loss='categorical_crossentropy', metrics=['categorical_accuracy', 'accuracy'])
    model_EfficientNetB7_NS.compile(optimizer="Adam", loss="categorical_crossentropy", metrics=["categorical_accuracy", "accuracy"])
    # model_EfficientNetB7_NS.compile(optimizer = 'Nadam', loss='categorical_crossentropy', metrics=['categorical_accuracy', 'accuracy'])

    return model_EfficientNetB7_NS


model = Eff_B7_NS()
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 efficientnet-b7 (Functional  (None, 7, 7, 2560)       64097680  
 )                                                               
                                                                 
 global_average_pooling2d (G  (None, 2560)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               (None, 128)               327808    
                                                                 
 dense_1 (Dense)             (None, 64)                8256      
                                                                 
 dense_2 (Dense)             (None, 2)                 130       
                                                                 
Total params: 64,433,874
Trainable params: 64,123,154
No

### V(T) Beta

In [None]:
from keras.applications.efficientnet import EfficientNetB7
from keras.applications.inception_resnet_v2 import InceptionResNetV2

# Define a single input layer
input_layer = Input(shape=(img_res[0], img_res[1], img_res[2]))

# Create EfficientNetB7 model with the shared input
efficientnet = EfficientNetB7(include_top=True, weights=None, input_tensor=input_layer, classes=4)
# efficientnet.load_weights('EfficientNetB7_PRET.h5', by_name=True, skip_mismatch=True)
efficientnet_output = efficientnet.output

# Create InceptionResNetV2 model with the shared input
inceptionresnet = InceptionResNetV2(include_top=True, weights=None, input_tensor=input_layer, classes=2)

# Rename the layers of the InceptionResNetV2 model
for layer in inceptionresnet.layers:
    layer._name = layer.name + str("_inceptionresnet")

inceptionresnet_output = inceptionresnet.output

# Concatenate the outputs of both models
combined_output = Concatenate()([efficientnet_output, inceptionresnet_output])

# Create a new model using the combined output
combined_model = Model(inputs=input_layer, outputs=combined_output)

# Create the final output model
ffc_leyer1 = Dense(512, activation="relu", kernel_regularizer=l2(0.1))(combined_model.output)
BatchNormalization1 = BatchNormalization()(ffc_leyer1)
ffc_leyer2 = Dense(512, activation="relu", kernel_regularizer=l2(0.1))(BatchNormalization1)
BatchNormalization2 = BatchNormalization()(ffc_leyer2)
final_output = Dense(2, activation="softmax")(BatchNormalization2)

# Create the overall model
model = Model(inputs=input_layer, outputs=final_output)

# Compile the model
opt = SGD(learning_rate=0.0005, momentum=0.85, decay=0.001)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["accuracy", "binary_accuracy"])

# Print the summary of the model
model.summary()

## Loading the model

### Loading the full model

In [12]:
# conf
PRMC = False
CEC_opt = Adagrad(lr=0.00001)
# CEC_opt = SGD(learning_rate=0.00001, momentum=0.9, decay=0.0005)
# main
try:
    if SAVE_TYPE == "TF":
        model = load_model("PAI_model", compile=PRMC)
    else:
        model = load_model("PAI_model.h5", compile=PRMC)
except (ImportError, IOError) as e:
    print(f"\033[91mfailed to load the model ERROR:\n{e}")
else:
    print("\033[92mloading model done.")
    if not PRMC:
        print("Compiling the ai model...")
        model.compile(optimizer=CEC_opt, loss="categorical_crossentropy", metrics=["categorical_accuracy", "accuracy"])
        print("done.")

[92mloading model done.
Compiling the ai model...
done.


### Loading model weights

In [None]:
model.load_weights("PAI_model_weights.h5")

## Training

In [8]:
# build_lrfn conf
lr_start = 0.00001
lr_max = 0.00005
lr_min = 0.000001
lr_rampup_epochs = 5
lr_sustain_epochs = 2
lr_exp_decay = 0.8


# build_lrfn
def build_lrfn(
    lr_start=lr_start,
    lr_max=lr_max,
    lr_min=lr_min,
    lr_rampup_epochs=lr_rampup_epochs,
    lr_sustain_epochs=lr_sustain_epochs,
    lr_exp_decay=lr_exp_decay,
):
    lr_max = lr_max * tf.distribute.get_strategy().num_replicas_in_sync

    def lrfn(epoch):
        if epoch < lr_rampup_epochs:
            lr = (lr_max - lr_start) / lr_rampup_epochs * epoch + lr_start
        elif epoch < lr_rampup_epochs + lr_sustain_epochs:
            lr = lr_max
        else:
            lr = (lr_max - lr_min) * lr_exp_decay ** (epoch - lr_rampup_epochs - lr_sustain_epochs) + lr_min
        return lr

    return lrfn


lrfn = build_lrfn()
lr_schedule = tf.keras.callbacks.LearningRateScheduler(lrfn, verbose=1)

# early_stopping
early_stopping = EarlyStopping(monitor="val_accuracy", patience=12, verbose=1, restore_best_weights=True)
# TensorBoard
log_dir = "logs/fit/" + datetime.datetime.now().strftime("y%Y_m%m_d%d-h%H_m%M_s%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, write_images=True, histogram_freq=1)
print("Log dir:", log_dir)
# training the model
print("Training the model...\n")
history = model.fit(
    x_train,
    y_train,
    epochs=256,
    batch_size=Conf_batch_size,
    validation_data=(x_test, y_test),
    verbose="auto",
    callbacks=[early_stopping, tensorboard_callback, lr_schedule],
)
print("Training done.\n")

Log dir: logs/fit/y2023_m10_d17-h15_m39_s32
Training the model...


Epoch 1: LearningRateScheduler setting learning rate to 1e-05.
Epoch 1/256

Epoch 2: LearningRateScheduler setting learning rate to 1.8000000000000004e-05.
Epoch 2/256

Epoch 3: LearningRateScheduler setting learning rate to 2.6000000000000002e-05.
Epoch 3/256

Epoch 4: LearningRateScheduler setting learning rate to 3.4000000000000007e-05.
Epoch 4/256

Epoch 5: LearningRateScheduler setting learning rate to 4.2000000000000004e-05.
Epoch 5/256

Epoch 6: LearningRateScheduler setting learning rate to 5e-05.
Epoch 6/256

Epoch 7: LearningRateScheduler setting learning rate to 4.2000000000000004e-05.
Epoch 7/256

Epoch 8: LearningRateScheduler setting learning rate to 3.5600000000000005e-05.
Epoch 8/256

Epoch 9: LearningRateScheduler setting learning rate to 3.0480000000000006e-05.
Epoch 9/256

Epoch 10: LearningRateScheduler setting learning rate to 2.6384000000000004e-05.
Epoch 10/256

Epoch 11: LearningRateScheduler se

## Saving model weights


In [9]:
# Save the weights
print("Saving weights...")
model.save_weights("PAI_model_weights.h5")
print("Saving full model...")
if SAVE_TYPE == "TF":
    print("Saving full model tf format...")
    model.save("PAI_model", save_format="tf")
else:
    try:
        model.save("PAI_model.h5")
    except ValueError:
        print("failed to save in .h5 format!")
        print("Saving full model in tf format...")
        model.save("PAI_model", save_format="tf")

Saving weights...
Saving full model...


## Analyse model Training performance

In [None]:
try:
    # loss
    plt.plot(history.history["loss"], label="loss")
    try:
        plt.plot(history.history["val_loss"], label="val_loss", color="orange")
    except (ValueError, NameError):
        print("\033[91mfailed to load val_loss.")
    plt.title("Model Loss")
    plt.ylabel("Loss")
    plt.xlabel("Epoch")
    plt.show()
    # acc
    plt.plot(history.history["accuracy"], label="accuracy")
    try:
        plt.plot(history.history["val_accuracy"], label="val_accuracy", color="orange")
    except (ValueError, NameError):
        print("\033[91mfailed to load val_accuracy.")
    plt.title("Model Accuracy")
    plt.ylabel("Accuracy")
    plt.xlabel("Epoch")
    plt.show()
except (ValueError, NameError):
    print("\033[91mfailed to load model history.")

## Analyse model Predicting performance

### Gradcam heatmap

In [None]:
def make_gradcam_heatmap(
    img_array, model, last_conv_layer_name, second_last_conv_layer_name=None, pred_index=None, threshold=0, sensitivity_map=1.0
):
    grad_model = tf.keras.models.Model([model.inputs], [model.get_layer(last_conv_layer_name).output, model.output])

    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)

    # Apply threshold
    heatmap = np.where(heatmap > threshold, heatmap, 0)

    # Adjust sensitivity
    heatmap = heatmap**sensitivity_map

    if second_last_conv_layer_name is not None:
        grad_model_second = tf.keras.models.Model([model.inputs], [model.get_layer(second_last_conv_layer_name).output, model.output])

        with tf.GradientTape() as tape:
            second_last_conv_layer_output, preds = grad_model_second(img_array)
            class_channel = preds[:, pred_index]

        grads = tape.gradient(class_channel, second_last_conv_layer_output)
        pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
        second_last_conv_layer_output = second_last_conv_layer_output[0]
        heatmap_second = second_last_conv_layer_output @ pooled_grads[..., tf.newaxis]
        heatmap_second = tf.squeeze(heatmap_second)
        heatmap_second = tf.maximum(heatmap_second, 0) / tf.math.reduce_max(heatmap_second)

        # Apply threshold
        heatmap_second = np.where(heatmap_second > threshold, heatmap_second, 0)

        # Adjust sensitivity
        heatmap_second = heatmap_second**sensitivity_map

        # Average the two heatmaps
        heatmap = (heatmap + heatmap_second) / 2.0

    return heatmap

### Main test

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score
from scipy.stats import binom
from tqdm import tqdm

prob_L = 0.9995
tick_spacing = 5
if SAVE_TYPE == "TF":
    # Load the pre-trained model
    model = load_model("PAI_model")
else:
    # Load the pre-trained model
    model = load_model("PAI_model.h5")

# Ensure the model's input_shape matches your data
assert model.input_shape[1:] == (img_res[0], img_res[1], img_res[2]), "Model's input shape doesn't match data."

# Make predictions on validation data
val_predictions = model.predict(x_val)
val_predictions = np.argmax(val_predictions, axis=1)

# Make predictions on test data
test_predictions = model.predict(x_test)
test_predictions = np.argmax(test_predictions, axis=1)

# Convert y_val and y_test from one-hot encoder to their original form
y_val_original = np.argmax(y_val, axis=1)
y_test_original = np.argmax(y_test, axis=1)

# Calculate accuracy on validation data
val_accuracy = accuracy_score(y_val_original, val_predictions)

# Calculate accuracy on test data
test_accuracy = accuracy_score(y_test_original, test_predictions)

print(f"The accuracy of the model on validation data is {val_accuracy:.2%}")
print(f"The accuracy of the model on test data is {test_accuracy:.2%}")

# Visualize the predictions on validation data as a grid of squares
plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    plt.imshow(x_val[i])
    plt.title(f"True: {y_val_original[i]}\nPredicted: {val_predictions[i]}")
    plt.axis("off")
plt.tight_layout()
plt.show()
# Heatmap
plt.figure(figsize=(12, 6))
for i in range(10):
    plt.subplot(2, 5, i + 1)
    img = x_val[i]
    heatmap = make_gradcam_heatmap(img[np.newaxis, ...], model, "top_conv", sensitivity_map=0.95)
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
    heatmap = np.uint8(255 * heatmap)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    superimposed_img = heatmap * 0.0019 + img
    superimposed_img = np.clip(superimposed_img, 0, 1)  # ensure the values are in the range [0, 1]
    plt.imshow(superimposed_img)
    plt.title(f"True: {y_val_original[i]}\nPredicted: {val_predictions[i]}")
    plt.axis("off")
plt.tight_layout()
plt.show()

# Define the list of labels
labels = ["NORMAL", "PNEUMONIA"]

# Create a confusion matrix for validation data
val_cm = confusion_matrix(y_val_original, val_predictions)

# Create a confusion matrix for test data
test_cm = confusion_matrix(y_test_original, test_predictions)

# Plot the confusion matrix as a heatmap for validation data
plt.figure(figsize=(8, 6))
sns.heatmap(val_cm, annot=True, cmap="Blues", fmt="d", xticklabels=labels, yticklabels=labels)
plt.title("Confusion Matrix - Validation Data")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Plot the confusion matrix as a heatmap for test data
plt.figure(figsize=(8, 6))
sns.heatmap(test_cm, annot=True, cmap="Blues", fmt="d", xticklabels=labels, yticklabels=labels)
plt.title("Confusion Matrix - Test Data")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Define the range of test data sizes to use
data_sizes = range(1, len(x_test), 4)
# Calculate the probability of a wrong prediction based on test accuracy
prob_wrong = 1 - test_accuracy

# Create a list to store the number of incorrect predictions for each test data size
incorrect_predictions = []

# Generate predictions and track incorrect predictions for each data size
for size in tqdm(data_sizes, desc="Predicting", unit="dpb"):
    # Subset of test data
    x_test_subset = x_test[:size]
    y_test_subset = y_test[:size]

    # Make predictions on the subset of test data
    test_predictions = model.predict(x_test_subset, batch_size=1, verbose=0, max_queue_size=120, workers=1, use_multiprocessing=False)
    test_predictions = np.argmax(test_predictions, axis=1)
    y_test_original_subset = np.argmax(y_test_subset, axis=1)

    # Calculate the number of incorrect predictions
    incorrect_preds = np.sum(test_predictions != y_test_original_subset)
    incorrect_predictions.append(incorrect_preds)

# Plot the number of incorrect predictions vs. the number of data points
plt.figure(figsize=(10, 6))
plt.plot(data_sizes, incorrect_predictions)
plt.xlabel("Number of Data Points")
plt.ylabel("Number of Incorrect Predictions")
# Add gridlines for the x and y axes
plt.grid(True)

# Change the tick spacing for the x and y axes
plt.xticks(np.arange(min(data_sizes), max(data_sizes) + 1, 50))
plt.yticks(np.arange(0, max(incorrect_predictions) + 5, 3))

plt.title("Number of Incorrect Predictions vs. Number of Data Points")
plt.show()

# Define the range of test data sizes to use
data_sizes = range(1, len(x_test), 1)

# Calculate the probability of a wrong prediction based on test accuracy
prob_wrong = 1 - test_accuracy

# Create a list to store the probability of getting at least one wrong answer for each test data size
probabilities = []

# Calculate the probability of getting at least one wrong answer for each data size
for size in data_sizes:
    # Calculate the cumulative distribution function (CDF) of the binomial distribution at 0
    cdf = binom.cdf(0, size, prob_wrong)
    # Subtract the CDF from 1 to get the probability of getting at least one wrong answer
    prob = 1 - cdf
    probabilities.append(prob)

# Find the index of the first data point that has a probability greater than prob_L%
index = next((i for i, p in enumerate(probabilities) if p > prob_L), len(probabilities))

# Limit the x-axis to the first data point that has a probability greater than prob_L%
data_sizes = data_sizes[: index + 1]
probabilities = probabilities[: index + 1]

# Plot the probability vs. the number of data points
plt.figure(figsize=(10, 6))
plt.plot(data_sizes, probabilities)
plt.xlabel("Number of Data Points")
plt.ylabel("Probability")

# Add gridlines for the x and y axes
plt.grid(True)

# Change the tick spacing for the x and y axes
plt.xticks(np.arange(min(data_sizes), max(data_sizes) + 1, tick_spacing + 2))
plt.yticks(np.arange(0, max(probabilities) + 0.1, tick_spacing / 100))

plt.ylim(top=1.01)

plt.title("Probability of Getting at Least One Wrong Answer vs. Number of Data Points")
plt.show()