* UNet architecture
* Latest model

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras import backend as K
import cv2
import matplotlib.pyplot as plt
from datetime import datetime
from sklearn.metrics import confusion_matrix, accuracy_score, recall_score, precision_score, f1_score
import seaborn as sns
from tensorflow.keras.callbacks import EarlyStopping

# Directory paths
train_img_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\Datasets\ImageSegmentation\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\train"
train_mask_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\ImageSegmentation\Datasets\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\train\train_mask"
test_img_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\Datasets\ImageSegmentation\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\test"
test_mask_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\ImageSegmentation\Datasets\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\test\test_mask"
valid_img_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\Datasets\ImageSegmentation\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\valid"
valid_mask_dir = r"C:\Users\Jaber\OneDrive - University of Florida\Educational\GitHub\ImageSegmentation\Datasets\Dental_XRay_Computacional_Vision_Segmentation\Dental X_Ray\valid\valid_mask"

# Image generator to load data in batches
def image_generator(img_dir, mask_dir, batch_size, img_size=(256, 256)):
    img_files = os.listdir(img_dir)
    while True:
        images = []
        masks = []
        for img_file in img_files:
            img_path = os.path.join(img_dir, img_file)
            mask_file = img_file + "_mask.png"
            mask_path = os.path.join(mask_dir, mask_file)

            if os.path.exists(mask_path):
                # Load image and mask
                img = load_img(img_path, color_mode='rgb', target_size=img_size)
                img = img_to_array(img) / 255.0
                mask = load_img(mask_path, color_mode='grayscale', target_size=img_size)
                mask = img_to_array(mask) / 255.0

                images.append(img)
                masks.append(mask)

            if len(images) == batch_size:
                yield np.array(images), np.array(masks)
                images = []
                masks = []

# U-Net Model
def unet_model(input_size=(256, 256, 3)):
    inputs = tf.keras.layers.Input(input_size)

    # Encoder
    c1 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(inputs)
    c1 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(c1)
    p1 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c1)

    c2 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(p1)
    c2 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(c2)
    p2 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c2)

    c3 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(p2)
    c3 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(c3)
    p3 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c3)

    c4 = tf.keras.layers.Conv2D(512, 3, activation='relu', padding='same')(p3)
    c4 = tf.keras.layers.Conv2D(512, 3, activation='relu', padding='same')(c4)
    p4 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(c4)

    # Bottleneck
    b = tf.keras.layers.Conv2D(1024, 3, activation='relu', padding='same')(p4)
    b = tf.keras.layers.Conv2D(1024, 3, activation='relu', padding='same')(b)

    # Decoder
    u1 = tf.keras.layers.Conv2DTranspose(512, 2, strides=(2, 2), padding='same')(b)
    u1 = tf.keras.layers.concatenate([u1, c4])
    c5 = tf.keras.layers.Conv2D(512, 3, activation='relu', padding='same')(u1)
    c5 = tf.keras.layers.Conv2D(512, 3, activation='relu', padding='same')(c5)

    u2 = tf.keras.layers.Conv2DTranspose(256, 2, strides=(2, 2), padding='same')(c5)
    u2 = tf.keras.layers.concatenate([u2, c3])
    c6 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(u2)
    c6 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(c6)

    u3 = tf.keras.layers.Conv2DTranspose(128, 2, strides=(2, 2), padding='same')(c6)
    u3 = tf.keras.layers.concatenate([u3, c2])
    c7 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(u3)
    c7 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(c7)

    u4 = tf.keras.layers.Conv2DTranspose(64, 2, strides=(2, 2), padding='same')(c7)
    u4 = tf.keras.layers.concatenate([u4, c1])
    c8 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(u4)
    c8 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(c8)

    outputs = tf.keras.layers.Conv2D(1, 1, activation='sigmoid')(c8)
    model = tf.keras.Model(inputs=[inputs], outputs=[outputs])
    return model

# Define custom metrics
def custom_precision(y_true, y_pred):
    y_pred_bin = K.round(y_pred)
    true_positives = K.sum(K.round(y_true * y_pred_bin))
    predicted_positives = K.sum(y_pred_bin)
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def custom_recall(y_true, y_pred):
    y_pred_bin = K.round(y_pred)
    true_positives = K.sum(K.round(y_true * y_pred_bin))
    possible_positives = K.sum(y_true)
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def custom_specificity(y_true, y_pred):
    y_pred_bin = K.round(y_pred)
    true_negatives = K.sum(K.round((1 - y_true) * (1 - y_pred_bin)))
    possible_negatives = K.sum(1 - y_true)
    specificity = true_negatives / (possible_negatives + K.epsilon())
    return specificity

def custom_f1(y_true, y_pred):
    precision = custom_precision(y_true, y_pred)
    recall = custom_recall(y_true, y_pred)
    return 2 * (precision * recall) / (precision + recall + K.epsilon())

# Define Focal Loss
def focal_loss_fixed(y_true, y_pred):
    gamma = 2.0
    alpha = 0.25
    epsilon = K.epsilon()
    y_pred = K.clip(y_pred, epsilon, 1. - epsilon)
    cross_entropy = -y_true * K.log(y_pred) - (1 - y_true) * K.log(1 - y_pred)
    weight = alpha * y_true * K.pow((1 - y_pred), gamma) + (1 - alpha) * (1 - y_true) * K.pow(y_pred, gamma)
    loss = weight * cross_entropy
    return K.mean(loss)

# Compile the model
model = unet_model()
model.compile(optimizer='adam', loss=focal_loss_fixed, metrics=['accuracy', custom_precision, custom_recall, custom_specificity, custom_f1])

# Batch size for training
batch_size = 16

# Create data generators
train_gen = image_generator(train_img_dir, train_mask_dir, batch_size)
valid_gen = image_generator(valid_img_dir, valid_mask_dir, batch_size)
test_gen = image_generator(test_img_dir, test_mask_dir, batch_size)

# Number of steps per epoch
steps_per_epoch = len(os.listdir(train_img_dir)) // batch_size
validation_steps = len(os.listdir(valid_img_dir)) // batch_size
test_steps = len(os.listdir(test_img_dir)) // batch_size

# Custom callback to print more metrics at each batch and epoch for training, validation, and test sets
class MetricsCallback(tf.keras.callbacks.Callback):
    def __init__(self, total_batches, X_valid, y_valid, X_test, y_test):
        super().__init__()
        self.batch_counter = 1
        self.total_batches = total_batches
        self.current_epoch = 1
        self.X_valid = X_valid
        self.y_valid = y_valid
        self.X_test = X_test
        self.y_test = y_test

    def on_epoch_begin(self, epoch, logs=None):
        self.current_epoch = epoch + 1
        print(f"\nEpoch {self.current_epoch}/{self.params['epochs']}")

    def on_batch_end(self, batch, logs=None):
        logs = logs or {}
        accuracy = logs.get('accuracy', 0)
        loss = logs.get('loss', 0)
        precision = logs.get('custom_precision', 0)
        recall = logs.get('custom_recall', 0)
        f1 = logs.get('custom_f1', 0)
        specificity = logs.get('custom_specificity', 0)
        current_time = datetime.now().strftime("%H:%M:%S")
        print(f"Batch {self.batch_counter}/{self.total_batches} ━━━━━━━━━━━━━━━━━━━━ {current_time}")
        print(f"Accuracy: {accuracy:.4f} - Precision: {precision:.4f} - Recall: {recall:.4f} - Specificity: {specificity:.4f} - F1: {f1:.4f} - Loss: {loss:.4f}\n")
        self.batch_counter += 1

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        accuracy = logs.get('accuracy', 0)
        val_accuracy = logs.get('val_accuracy', 0)
        loss = logs.get('loss', 0)
        val_loss = logs.get('val_loss', 0)
        precision = logs.get('custom_precision', 0)
        val_precision = logs.get('val_custom_precision', 0)
        recall = logs.get('custom_recall', 0)
        val_recall = logs.get('val_custom_recall', 0)
        f1 = logs.get('custom_f1', 0)
        val_f1 = logs.get('val_custom_f1', 0)
        specificity = logs.get('custom_specificity', 0)
        val_specificity = logs.get('val_custom_specificity', 0)
        print(f"Epoch {epoch+1}/{self.params['epochs']}")
        print(f"Train - Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, Specificity: {specificity:.4f}, F1: {f1:.4f}, Loss: {loss:.4f}")
        print(f"Validation - Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, Specificity: {val_specificity:.4f}, F1: {val_f1:.4f}, Loss: {val_loss:.4f}\n")
        test_loss, test_accuracy, test_precision, test_recall, test_specificity, test_f1 = self.model.evaluate(self.X_test, self.y_test, verbose=0)
        print(f"Test Set Results - Accuracy: {test_accuracy:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, Specificity: {test_specificity:.4f}, F1: {test_f1:.4f}, Loss: {test_loss:.4f}\n")
        self.batch_counter = 1

# Define early stopping
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Initialize the custom callback with validation and test data
metrics_callback = MetricsCallback(total_batches=steps_per_epoch, X_valid=valid_gen, y_valid=valid_gen, X_test=test_gen, y_test=test_gen)

# Train the model
history = model.fit(
    train_gen,
    steps_per_epoch=steps_per_epoch,
    epochs=20,
    validation_data=valid_gen,
    validation_steps=validation_steps,
    callbacks=[metrics_callback, early_stopping],
    verbose=0
)

# Save the model
model.save('dental_xray_unet_model.h5')

# Evaluate on the training set
train_gen_full = image_generator(train_img_dir, train_mask_dir, batch_size)
y_train_pred = model.predict(train_gen_full, steps=steps_per_epoch)
y_train_pred_bin = (y_train_pred > 0.5).astype(np.uint8)

# Confusion Matrix for training
conf_matrix_train = confusion_matrix(y_train_pred_bin.flatten(), y_train_pred_bin.flatten())
plt.figure(figsize=(6, 4))
sns.heatmap(conf_matrix_train, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix for Train")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Confusion Matrix for validation set
y_valid_pred = model.predict(valid_gen, steps=validation_steps)
y_valid_pred_bin = (y_valid_pred > 0.5).astype(np.uint8)

conf_matrix_valid = confusion_matrix(y_valid_pred_bin.flatten(), y_valid_pred_bin.flatten())
plt.figure(figsize=(6, 4))
sns.heatmap(conf_matrix_valid, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix for Validation")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Confusion Matrix for test set
y_test_pred = model.predict(test_gen, steps=test_steps)
y_test_pred_bin = (y_test_pred > 0.5).astype(np.uint8)

conf_matrix_test = confusion_matrix(y_test_pred_bin.flatten(), y_test_pred_bin.flatten())
plt.figure(figsize=(6, 4))
sns.heatmap(conf_matrix_test, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix for Test")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Visualization: Show input image, true mask, and predicted mask for a few samples
def visualize_predictions(images, true_masks, pred_masks, title):
    for i in range(3):
        plt.figure(figsize=(12, 4))
        plt.subplot(1, 3, 1)
        plt.imshow(images[i].squeeze(), cmap='gray')
        plt.title('Original Image')
        plt.subplot(1, 3, 2)
        plt.imshow(true_masks[i].squeeze(), cmap='gray')
        plt.title('Ground Truth Mask')
        plt.subplot(1, 3, 3)
        plt.imshow(pred_masks[i].squeeze(), cmap='gray')
        plt.title('Predicted Mask')
        plt.suptitle(title)
        plt.show()

# Visualize predictions for training set
visualize_predictions(X_train, y_train, y_train_pred_bin, "Train Set Predictions")

# Visualize predictions for validation set
visualize_predictions(X_valid, y_valid, y_valid_pred_bin, "Validation Set Predictions")

# Visualize predictions for test set
visualize_predictions(X_test, y_test, y_test_pred_bin, "Test Set Predictions")



Epoch 1/20
Batch 1/298 ━━━━━━━━━━━━━━━━━━━━ 17:00:44
Accuracy: 0.8921 - Precision: 0.0296 - Recall: 0.0980 - Specificity: 0.9135 - F1: 0.0455 - Loss: 0.1255

Batch 2/298 ━━━━━━━━━━━━━━━━━━━━ 17:01:24
Accuracy: 0.9314 - Precision: 0.0148 - Recall: 0.0490 - Specificity: 0.9568 - F1: 0.0227 - Loss: 0.1055

