**UNET- Semantic Segmentation Model Training Implementation**

**Step 1:** Configure Dataset Filepaths & Load functions

In [18]:
import os
import glob

# Replace the "home/username/dev/" with your own directory path
work_dir= "/home/aazam/dev" + "/Visual-Saliency-Prediction-UNET-Model/"
# Verify image paths
if not os.path.exists(work_dir):
    print(f"Path does not exist: {work_dir}")
else:
    print(f"Path exists: {work_dir}")

    # Create lists of image and mask file paths
    image_paths = glob.glob(work_dir + 'Dataset/Images/*.png')
    mask_paths = glob.glob(work_dir + 'Dataset/Masks/*.png')
    print('Total Images in Dataset:', len(image_paths))

    # Misc Paths
    pretrain_modelpath = work_dir + "Pre-trained_Model/"
    logpath = work_dir + 'Logs'
    os.makedirs(pretrain_modelpath, exist_ok=True)
    os.makedirs(logpath, exist_ok=True)

    # Create new name for Model which will be trained
    modelpath = pretrain_modelpath + 'unetmodel_v'+str(len(glob.glob(pretrain_modelpath + '/*.keras')))+'.keras'

# Resizing images is optional, since dataset image size is 256x256x3
SIZE_X = 128
SIZE_Y = 128
n_classes = 1  # Number of classes for segmentation
# Define your parameters
batch_size = 32
epochs = 20
num_data = len(image_paths)


Path exists: /home/aazam/dev/Visual-Saliency-Prediction-UNET-Model/
Total Images in Dataset: 13426


In [20]:
import os
os.environ["SM_FRAMEWORK"] = "tf.keras" # Indicates that TensorFlow's Keras API  will be used for training
os.environ["CUDA_VISIBLE_DEVICES"]="0"  # specify which GPU(s) to be used

import tensorflow as tf
from segmentation_models import Unet
from segmentation_models.losses import bce_jaccard_loss
from segmentation_models.metrics import iou_score

# Restrict TensorFlow to only use the GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  try:
    # Restrict TensorFlow to only use the first GPU
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
    # Set a memory limit
    config = tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)
    tf.config.experimental.set_virtual_device_configuration(gpus[0], [config])

  except RuntimeError as e:
    # Visible devices must be set before GPUs have been initialized
    print(e)

# Define the dataset loading and preprocessing function
def preprocess_stage(image_path, mask_path):
    # Load and preprocess image
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=3)
    image = tf.image.resize(image, (SIZE_X, SIZE_Y))
    image = tf.image.convert_image_dtype(image, tf.float32)

    # Load and preprocess mask
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=1)
    mask = tf.image.resize(mask, (SIZE_X, SIZE_Y))
    mask = tf.math.round(mask / 255.0)

    return image, mask

# Adopted U-Net model architecture from "segmentation_models" library
# Utilized ResNet34 as the backbone, used initial weights from pretrained network ImageNet (Transfer Learning)
def unet_model(input_shape):
    model = Unet(backbone_name='resnet34', input_shape=input_shape, encoder_weights='imagenet', classes=1, activation='sigmoid')
    model.compile(optimizer='adam', loss=bce_jaccard_loss, metrics=[iou_score])
    return model


**Step 2:** Prepare & Split datasets to three batches sets (Training Set, Validation Set & Testing Set)

In [21]:
# Calculate the number of samples in each set
train_split = 0.6
validate_split = 0.2
test_split = 1-(train_split + validate_split)

num_train = int(train_split * num_data)
num_valid = int(validate_split * num_data)

# Convert your lists into TensorFlow Datasets
dataset = tf.data.Dataset.from_tensor_slices((image_paths, mask_paths))

# Shuffle the dataset
dataset = dataset.shuffle(len(image_paths), seed=42)

# Split the dataset
dataset_train = dataset.take(num_train)
dataset_remaining = dataset.skip(num_train)
dataset_valid = dataset_remaining.take(num_valid)
dataset_test = dataset_remaining.skip(num_valid)

print('Load Dataset and prepare for training:')
print(f"Total Dataset Images: {num_data}",
      f"\nTrain Split Images ({round(train_split*100)}%) : {len(dataset_train)}",
      f"\nValidate Split Images ({round(validate_split*100)}%) : {len(dataset_valid)}",
      f"\nTest Split Images ({round(test_split*100)}%) : {len(dataset_test)}"
      )

# Apply your loading and preprocessing function
dataset_train = dataset_train.map(preprocess_stage)
dataset_valid = dataset_valid.map(preprocess_stage)
dataset_test = dataset_test.map(preprocess_stage)

# Batch the data
dataset_train = dataset_train.batch(batch_size)
dataset_valid = dataset_valid.batch(batch_size)
dataset_test = dataset_test.batch(batch_size)

# Prefetch for performance
dataset_train = dataset_train.prefetch(buffer_size=tf.data.AUTOTUNE)
dataset_valid = dataset_valid.prefetch(buffer_size=tf.data.AUTOTUNE)
dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE)

Load Dataset and prepare for training:
Total Dataset Images: 13426 
Train Split Images (60%) : 8055 
Validate Split Images (20%) : 2685 
Test Split Images (20%) : 2686


**Step 3:** Train & Validate the Unet Model

In [23]:
import matplotlib.pyplot as plt
from tensorflow import keras

# Define model
input_shape = (SIZE_X, SIZE_Y, 3)  # Adjust based on the size of your images
model = unet_model(input_shape)

# Model Checkpoint - save model after every epochs
# Save Best only - latest best model will not be overwritten
checkpointer = tf.keras.callbacks.ModelCheckpoint(modelpath,verbose=1,save_best_only=True)

# Early Stopping - TensorBoard Callbacks
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=3,monitor='val_loss'),
    tf.keras.callbacks.TensorBoard(log_dir= logpath),
    checkpointer
]

# Train the model
history = model.fit(dataset_train, 
                    validation_data = dataset_valid, 
                    epochs = epochs, 
                    callbacks = callbacks)

# Plot training and validation accuracy
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['iou_score'], label='Training IoU')
plt.plot(history.history['val_iou_score'], label='Validation IoU')
plt.title('Training and Validation IoU')
plt.xlabel('Epoch')
plt.ylabel('IoU')
plt.legend()
# Plot training and validation loss
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()
print(f"Trained Model has been stored:{modelpath}")

Epoch 1/20


2024-06-04 23:28:39.213213: E tensorflow/core/util/util.cc:131] oneDNN supports DT_INT32 only on platforms with AVX-512. Falling back to the default Eigen-based implementation if present.


[1m136/252[0m [32m━━━━━━━━━━[0m[37m━━━━━━━━━━[0m [1m2:56[0m 2s/step - iou_score: 0.3945 - loss: 0.9082

KeyboardInterrupt: 

**Step 4:** Evaluate the trained model

In [None]:
import os

from keras.models import load_model
from keras.utils import plot_model
# Directory containing your model files
model_directory = drive_dir + 'Trained Models/'
# List all files in the directory
model_files = [f for f in os.listdir(model_directory) if f.endswith(".h5")]
# Display the list of model files
print("Available model files:")
for i, model_file in enumerate(model_files, start=1):
    print(f"{i}. {model_file}")
# Ask the user to select a model file
try:
    selected_index = int(input("\nEnter the number corresponding to the model file you want to select: ")) - 1
    selected_model = model_files[selected_index]
    selected_model_path = os.path.join(model_directory, selected_model)
    print(f"\nSelected model: {selected_model_path}")
except (ValueError, IndexError):
    print("Invalid selection. Please enter a valid number.")

# Load the Model
model = keras.models.load_model(selected_model_path, compile=False)

In [None]:
import matplotlib.pyplot as plt
from PIL import Image

# Function to create a mask from predicted values
def create_mask(pred_mask):
    pred_mask = tf.argmax(pred_mask, axis=-1)
    pred_mask = pred_mask[..., tf.newaxis]
    return pred_mask[0]

rows = 5
dataset_samples = dataset_test

# Set up the subplots
fig, axs = plt.subplots(rows, 3, figsize=(9, rows*3))

for i, (image, mask, filepath) in enumerate(dataset_samples):
    # Assuming `model` is already defined and trained
    pred_mask = model.predict(image)
    pred_mask_np = create_mask(pred_mask).numpy()

    image_path = filepath[0].numpy().decode('utf-8')
    # Load the image using PIL (Python Imaging Library)
    img = Image.open(image_path)
    target_size=(SIZE_X,SIZE_Y)
    # Resize the image
    img = img.resize(target_size)
    row = i % rows
    axs[row, 0].imshow(img)
    axs[row, 0].set_title(f'Image {i + 1}')
    axs[row, 0].axis('off')
    axs[row, 1].imshow(mask[0],cmap='gray')
    axs[row, 1].set_title('Ground Truth Mask')
    axs[row, 1].axis('off')
    axs[row, 2].imshow(pred_mask[0],cmap='gray')
    axs[row, 2].set_title('Predicted Mask')
    axs[row, 2].axis('off')

plt.show()


In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf

# Function to create a confusion matrix
def create_confusion_matrix(model, dataset):
    true_labels = []
    predicted_labels = []
    for images, masks, _ in dataset:
        # Get model predictions
        predictions = model.predict(images)
        # Append flattened masks and predictions to the lists
        true_labels.append(tf.reshape(masks, [-1]).numpy())
        predicted_labels.append(tf.reshape(predictions, [-1]).numpy())
    # Concatenate the lists to create NumPy arrays
    true_labels = np.concatenate(true_labels)
    predicted_labels = np.concatenate(predicted_labels)
    # Use vectorized operations to create confusion matrix
    threshold = 0.5
    true_labels_binary = (true_labels > threshold).astype(int)
    predicted_labels_binary = (predicted_labels > threshold).astype(int)
    cm = confusion_matrix(true_labels_binary, predicted_labels_binary)
    return cm
# Get confusion matrix for dataset_test
confusion_mat = create_confusion_matrix(model, dataset_test)

In [None]:
# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_mat, annot=True, fmt='d', cmap='Blues', xticklabels=['0', '1'], yticklabels=['0', '1'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()
# Extract true positives, true negatives, false positives, and false negatives from the confusion matrix
tn, fp, fn, tp = confusion_mat.ravel()
# Calculate metrics
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0  # To avoid division by zero
recall = tp / (tp + fn) if (tp + fn) > 0 else 0  # To avoid division by zero
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0  # To avoid division by zero
print('\nTrained Model Results:')
print(f"Accuracy: {round(accuracy*100)}%")
print(f"Precision:  {round(precision*100)}%")
print(f"Recall:  {round(recall*100)}%")
print(f"F1 Score: {round(f1*100)}%")