In [None]:
from clear_folder import clear_folder
import logging
from split_data import *
from keras.callbacks import ModelCheckpoint, EarlyStopping, CSVLogger
import numpy as np
import tensorflow as tf
from keras.utils import Sequence
import os
import numpy as np
import tensorflow as tf
from keras.utils import Sequence
import os
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay

import os
# --- Logging ---------------------------------------------------------------
# Silence TensorFlow's native and Python-side loggers.
# NOTE(review): TF_CPP_MIN_LOG_LEVEL is assigned after the `import tensorflow`
# lines above; it is usually set before the import -- confirm it still has
# the intended effect here.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
logging.getLogger('tensorflow').disabled = True

# --- Paths -----------------------------------------------------------------
# Every location hangs off one project root. Set the INDICATE_FH_DIR
# environment variable to run the notebook on another machine; the default
# reproduces the original hard-coded paths exactly.
base_dir = os.environ.get(
    'INDICATE_FH_DIR', '/home/abidhasan/Documents/Indicate_FH').rstrip('/')

datapath = f'{base_dir}/data'
train_dir = f'{base_dir}/train_val_test/train'
test_dir = f'{base_dir}/train_val_test/test'
# Trailing slash is intentional: file names are appended by plain string
# concatenation when the weights are saved.
model_dir = f'{base_dir}/saved_model/'
figpath = f'{base_dir}/performance_figures'
checkpointpath = f'{base_dir}/checkpoints'

# --- Input / training hyper-parameters -------------------------------------
batch_size = 64
size = 224
dropout_rt = 0.4
image_size = (size, size)   # (height, width) handed to tf.image.resize
n_channels = 3
epochs = 100

# --- Dataset split ---------------------------------------------------------
train_data_ratio = 0.8
val_data_ratio = 0.0
test_data_ratio = 0.2

# The three fractions are passed to split_data together; catch typos early.
assert abs(train_data_ratio + val_data_ratio + test_data_ratio - 1.0) < 1e-9, \
    'train/val/test ratios must sum to 1'

# NOTE: the ModelCheckpoint callback (configured in the training cell further
# down) saves the model at some interval; its file name includes the epoch
# number and the validation loss.


class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves batches of labelled images from disk.

    ``image_dir`` must contain the sub-directories ``not_effected`` (label 0)
    and ``effected`` (label 1). Only files whose extension is listed in
    ``supported_formats`` are used.

    Each batch is a tuple ``(X, y)`` where ``X`` is a float32 array of shape
    ``(batch_size, *image_size, n_channels)`` with pixel values divided by 255
    and ``y`` is a float32 array holding the matching labels.

    Only complete batches are served: trailing images that do not fill a
    whole batch are not part of any batch (see ``__len__``).

    Args:
        image_dir: Directory holding the two class sub-directories.
        batch_size: Number of images per batch.
        image_size: Target size passed to ``tf.image.resize``.
        n_channels: Number of colour channels to decode.
        shuffle: Whether to reshuffle the sample order at construction time
            and whenever ``on_epoch_end`` is called.
    """

    def __init__(self, image_dir, batch_size, image_size, n_channels=3, shuffle=True):
        super().__init__()
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.image_size = image_size
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.supported_formats = ['.jpg', '.jpeg', '.png', '.gif', '.bmp']
        self.image_files, self.labels = self._load_image_files()
        # Sample order lives in `indexes`; the file/label lists never move.
        self.indexes = np.arange(len(self.image_files))
        self.on_epoch_end()

    def __len__(self):
        """Return the number of complete batches per epoch."""
        return len(self.image_files) // self.batch_size

    def __getitem__(self, index):
        """Load and return batch number ``index`` as ``(X, y)``."""
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]

        # Zero-initialised (not np.empty): a slot whose image cannot be
        # loaded stays an all-zero image instead of uninitialised memory.
        X = np.zeros((len(batch_indexes), *self.image_size, self.n_channels),
                     dtype=np.float32)
        # Labels are always filled in, even when the image failed to load.
        y = np.asarray([self.labels[k] for k in batch_indexes],
                       dtype=np.float32)

        for i, k in enumerate(batch_indexes):
            image = self.load_image(self.image_files[k])
            if image is not None:
                X[i] = image

        return X, y

    def _load_image_files(self):
        """Collect ``(paths, labels)`` for every supported image file.

        The position of a category in the list below is its numeric label.
        File names are sorted so the sample order is reproducible.
        """
        extensions = tuple(self.supported_formats)
        image_files = []
        labels = []
        for label, category in enumerate(['not_effected', 'effected']):
            category_dir = os.path.join(self.image_dir, category)
            for file_name in sorted(os.listdir(category_dir)):
                if file_name.lower().endswith(extensions):
                    image_files.append(os.path.join(category_dir, file_name))
                    labels.append(label)
        return image_files, labels

    def load_image(self, image_path):
        """Read, decode, resize and scale one image.

        Returns:
            The image as a NumPy array with values divided by 255, or
            ``None`` when the file cannot be read or decoded (a warning is
            logged in that case).
        """
        try:
            raw = tf.io.read_file(image_path)
            # expand_animations=False keeps GIFs 3-D (first frame only) so
            # every decoded image fits one slot of the batch array.
            image = tf.image.decode_image(
                raw, channels=self.n_channels, expand_animations=False)
            image = tf.image.resize(image, self.image_size)
            return (image / 255.0).numpy()
        except Exception as exc:
            # Best effort: report the bad file and let the caller skip it.
            logging.getLogger(__name__).warning(
                'Could not load image %s: %s', image_path, exc)
            return None

    def on_epoch_end(self):
        """Reshuffle the sample order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

In [None]:
# Empty the folder that holds the train/test split, then regenerate the split.
# The folder is derived from the configured train_dir (its parent directory)
# instead of repeating the absolute path, so it cannot drift from the config.
split_root = os.path.dirname(os.path.normpath(train_dir))
clear_folder(split_root)
split_data(datapath, train_data_ratio, val_data_ratio, test_data_ratio)

In [None]:
# Function to count images in each class
import os


def count_images(directory, extensions=('.jpg', '.jpeg', '.png', '.gif', '.bmp')):
    """Count the image files of each class below ``directory``.

    Args:
        directory: Folder containing the ``effected`` and ``not_effected``
            sub-directories.
        extensions: File extensions (lower-case, with leading dot) that count
            as images. The default matches the formats the data generator
            accepts, so stray files do not inflate the totals. Pass ``None``
            to count every directory entry.

    Returns:
        Tuple ``(affected_count, not_affected_count)``.
    """
    def _count(category):
        entries = os.listdir(os.path.join(directory, category))
        if extensions is None:
            return len(entries)
        suffixes = tuple(extensions)
        return sum(1 for name in entries if name.lower().endswith(suffixes))

    return _count('effected'), _count('not_effected')


# Tally both splits first ...
train_affected_count, train_not_affected_count = count_images(train_dir)
test_affected_count, test_not_affected_count = count_images(test_dir)

# ... then report them, one line per split.
for split_name, n_affected, n_not_affected in (
        ('Train', train_affected_count, train_not_affected_count),
        ('Test', test_affected_count, test_not_affected_count)):
    print(
        f'{split_name} set: Affected - {n_affected}, Not Affected - {n_not_affected}')

In [None]:
from keras.applications import VGG16
from keras.layers import Dense, Flatten
from keras.models import Model

# Build the classifier: an ImageNet-pretrained VGG16 backbone followed by a
# small binary classification head.
from keras.layers import Dropout

# Input shape comes from the configuration so that it always matches the
# arrays produced by the data generator.
vgg16_model = VGG16(weights='imagenet', include_top=False,
                    input_shape=(*image_size, n_channels))

# Freeze the first 15 backbone layers and fine-tune the remaining ones.
for layer in vgg16_model.layers[:15]:
    layer.trainable = False
for layer in vgg16_model.layers[15:]:
    layer.trainable = True

# Classification head. Exactly one model is built: the earlier draft created
# a second, discarded model on the same backbone.
x = Flatten()(vgg16_model.output)
x = Dense(256, activation='relu')(x)
x = Dropout(dropout_rt)(x)
# A single output unit needs a sigmoid: softmax over one unit always yields
# 1.0, which makes the network unable to learn anything.
output = Dense(1, activation='sigmoid')(x)

model = Model(inputs=vgg16_model.input, outputs=output)

# Binary cross-entropy matches the single sigmoid output and the 0/1 labels.
model.compile(optimizer='adam', loss='binary_crossentropy',
              metrics=['accuracy'])

model.summary()


In [None]:


# Make sure the output folders exist before anything is written to them.
os.makedirs(checkpointpath, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)

checkpoint_filepath = checkpointpath + \
    '/weights-improvement-{epoch:02d}-{val_loss:.2f}.hdf5'
# Original (misspelled) name kept as an alias for any code that still uses it.
checckpoint_filepath = checkpoint_filepath
# NOTE(review): newer Keras releases may reject the '.hdf5' / '.h5' file
# suffixes used below -- confirm against the installed version.

# Use mode='max' when monitoring accuracy and mode='min' when monitoring loss.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=20)
mc = ModelCheckpoint(checkpoint_filepath, monitor='val_loss',
                     mode='min', verbose=1, save_best_only=True)
# CSVLogger writes the per-epoch metrics to a CSV file.
log_csv = CSVLogger(checkpointpath+'/my_logs.csv', separator=',', append=False)
callbacks_list = [mc, es, log_csv]

train_gen = DataGenerator(image_dir=train_dir, batch_size=batch_size,
                          image_size=image_size, n_channels=n_channels)
# shuffle=False keeps the test order fixed so predictions can be matched to
# labels afterwards.
test_gen = DataGenerator(image_dir=test_dir, batch_size=batch_size,
                         image_size=image_size, n_channels=n_channels, shuffle=False)

# Train the model in batches
steps_per_epoch = len(train_gen)
validation_steps = len(test_gen)

# NOTE(review): validation_data is the test split (val_data_ratio is 0.0 in
# the config), so early stopping and checkpoint selection are driven by test
# data -- consider a dedicated validation split.
# The callbacks must be passed explicitly; without `callbacks=` none of the
# checkpointing, early stopping or CSV logging configured above would run.
history = model.fit(train_gen, epochs=epochs, steps_per_epoch=steps_per_epoch,
                    validation_data=test_gen, validation_steps=validation_steps,
                    callbacks=callbacks_list)

# Save the model weights after training
model.save_weights(model_dir+'vgg16_'+f"{epochs}"+'.h5')

In [None]:


# Plot the model performance: train vs. validation accuracy (left panel) and
# train vs. validation loss (right panel).
f, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
t = f.suptitle('VGG16(with_weight) Performance', fontsize=12)
f.subplots_adjust(top=0.85, wspace=0.3)
max_epoch = len(history.history['accuracy'])+1
epoch_list = list(range(1, max_epoch))

# (axis, history key, legend stem, y-axis label, panel title)
panels = (
    (ax1, 'accuracy', 'Accuracy', 'Accuracy Value', 'Accuracy'),
    (ax2, 'loss', 'Loss', 'Loss Value', 'Loss'),
)
for ax, key, stem, y_label, title in panels:
    ax.plot(epoch_list, history.history[key], label=f'Train {stem}')
    ax.plot(epoch_list, history.history[f'val_{key}'],
            label=f'Validation {stem}')
    ax.set_xticks(np.arange(1, max_epoch, 5))
    ax.set_ylabel(y_label)
    ax.set_xlabel('Epoch')
    ax.set_title(title)
    ax.legend(loc="best")

# Save through the figure handle BEFORE showing it: calling plt.savefig after
# plt.show() typically writes an empty image because the figure has already
# been released.
os.makedirs(figpath, exist_ok=True)
f.savefig(figpath+'/VGG16_TRAIN_VS_VALIDATION_LOSS_with_weights.png',
          bbox_inches='tight')
plt.show()

In [None]:
# Evaluate the model on the test data
loss, accuracy = model.evaluate(test_gen, steps=len(test_gen))
print(f'Test accuracy: {accuracy * 100:.2f}%')

# Get predictions for the test dataset and threshold them at 0.5
predictions = model.predict(test_gen, steps=len(test_gen))
predicted_classes = (predictions > 0.5).astype("int32")

# Print predicted classes
print(predicted_classes)

# True labels, in the order the generator serves the samples. They are read
# from the generator's bookkeeping instead of re-loading every test image a
# second time just to obtain its label.
n_evaluated = len(test_gen) * test_gen.batch_size
true_labels = np.asarray(test_gen.labels, dtype=np.float32)[
    test_gen.indexes[:n_evaluated]]
assert len(true_labels) == len(predictions), \
    'number of labels and predictions differ'

# Calculate ROC curve and AUC (scores flattened to 1-D to match the labels)
fpr, tpr, thresholds = roc_curve(true_labels, np.ravel(predictions))
roc_auc = auc(fpr, tpr)

# Plot the ROC curve
plt.figure(figsize=(10, 5))
plt.plot(fpr, tpr, color='darkorange', lw=2,
         label=f'ROC curve (area = {roc_auc:.2f})')
# Diagonal reference line for comparison
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.show()

In [None]:
# Compute the confusion matrix. `labels=[0, 1]` pins it to 2x2 so that it
# always lines up with the two display labels, even if only one class occurs
# among the true labels and predictions.
cm = confusion_matrix(np.asarray(true_labels).astype(int),
                      np.ravel(predicted_classes), labels=[0, 1])

# Display the confusion matrix (row/column 0 = not affected, 1 = affected)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[
                              'Not Affected', 'Affected'])
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()