Pix2Pix Inference

In [43]:
import os
import numpy as np
from PIL import Image
import tensorflow as tf
import tifffile
from sklearn.metrics import jaccard_score
import imageio

# -----------------------------
# Load and Preprocess Function
# -----------------------------

def load_image(path):
    """
    Load and preprocess an image from file path.
    Supports TIFF, PNG, JPG, and JPEG formats.
    """
    if path.lower().endswith(('.tif', '.tiff')):
        img = tifffile.imread(path)  # Use tifffile for reading .tif images
        # Convert the image to uint8 if it's in a different type
        if img.dtype != 'uint8':
            img = (img / img.max() * 255).astype(np.uint8)  # Normalize to 8-bit
        img = Image.fromarray(img)  # Convert to a PIL image
    else:
        img = Image.open(path)  # Use PIL for other formats
    
    img = img.resize((256, 256)).convert('L')  # Convert to grayscale
    return np.array(img) / 255.0  # Normalize to [0, 1]

def get_stacked_images(before_dir, after_dir):
    """
    Load, preprocess, and stack before, after, and the difference as (256, 256, 3).
    """
    before_files = sorted(os.listdir(before_dir))
    after_files = sorted(os.listdir(after_dir))
    
    inputs = []
    filenames = []

    for bfile, afile in zip(before_files, after_files):
        # Only process files with valid image extensions
        if bfile.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff')) and afile.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff')):
            before = load_image(os.path.join(before_dir, bfile))
            after = load_image(os.path.join(after_dir, afile))
            
            # Compute the difference between the before and after images
            diff_channel = np.expand_dims(before - after, axis=-1)  # shape: (256, 256, 1)
            
            # Stack before, after, and diff as the 3-channel input
            stacked = np.concatenate([np.expand_dims(before, axis=-1), np.expand_dims(after, axis=-1), diff_channel], axis=-1)  # (256, 256, 3)
            inputs.append(stacked)
            filenames.append(os.path.splitext(bfile)[0])  # use base name

    return np.array(inputs), filenames

def get_target_images(target_dir):
    """
    Load and preprocess target mask images as (256, 256, 1).
    """
    target_files = sorted(os.listdir(target_dir))
    targets = []
    
    for tfile in target_files:
        if tfile.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff')):
            target = load_image(os.path.join(target_dir, tfile))
            targets.append(target[..., np.newaxis])  # (256, 256, 1)
    
    return np.array(targets)

# -----------------------------
# IoU Calculation Function
# -----------------------------

def compute_iou(pred, target):
    """
    Compute the Intersection over Union (IoU) between the predicted and target masks.
    """
    pred_bin = (pred > 0.5).astype(np.uint8).flatten()
    target_bin = target.astype(np.uint8).flatten()
    return jaccard_score(target_bin, pred_bin)

# -----------------------------
# Run Inference and Save Masks
# -----------------------------

def run_inference(model_path, before_dir, after_dir, target_dir, output_dir):
    """
    Run inference on the images in the specified directories, generate predicted masks,
    and compare them to the actual target masks to compute the IoU.
    """
    print("Loading model...")
    model = tf.keras.models.load_model(model_path)

    print("Reading input images...")
    input_images, filenames = get_stacked_images(before_dir, after_dir)
    print(f"Loaded {len(filenames)} input images.")

    print("Reading target masks...")
    target_images = get_target_images(target_dir)
    print(f"Loaded {len(target_images)} target masks.")

    print("Running inference...")
    predictions = model.predict(input_images, verbose=1)

    print("Calculating IoU and saving masks...")
    os.makedirs(output_dir, exist_ok=True)
    iou_scores = []

    for i, (pred, target, name) in enumerate(zip(predictions, target_images, filenames)):
        # Calculate IoU
        iou = compute_iou(pred, target)
        iou_scores.append(iou)

        # Convert predicted mask to image and save
        mask_img = (pred.squeeze() * 255).astype(np.uint8)
        imageio.imwrite(os.path.join(output_dir, f"{name}_pred_mask.png"), mask_img)

        # Print IoU for this image
        print(f"{name} | IoU: {iou:.4f}")

    print(f"Average IoU: {np.mean(iou_scores):.4f}")
    print(f"Masks and IoU scores saved to: {output_dir}")




In [None]:
# -----------------------------
# Entry Point
# -----------------------------

if __name__ == "__main__":
    model_path = r"D:\Projects\Flood_Mapping\pix2pix_final.keras"  # Replace with your model path
    before_dir = "test_after"  # Folder of before images
    after_dir = "tesr_before"    # Folder of after images
    target_dir = "target_mask"    # Folder of target masks
    output_dir = "predicted_masks_pix2pix"  # Folder to save predicted masks

    run_inference(model_path, before_dir, after_dir, target_dir, output_dir)  # Run inference and compute IoU

Loading model...
Reading input images...
Loaded 9 input images.
Reading target masks...
Loaded 9 target masks.
Running inference...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
Calculating IoU and saving masks...
SEN_Gorakhpur_20170825_0051 | IoU: 0.4671
SEN_Gorakhpur_20170825_0052 | IoU: 0.4348
SEN_Gorakhpur_20170825_0053 | IoU: 0.5389
SEN_Gorakhpur_20170825_0054 | IoU: 0.3428
SEN_Gorakhpur_20170825_0055 | IoU: 0.3844
SEN_Gorakhpur_20170825_0056 | IoU: 0.3167
SEN_Gorakhpur_20170825_0057 | IoU: 0.4023
SEN_Gorakhpur_20170825_0058 | IoU: 0.5167
SEN_Gorakhpur_20170825_0059 | IoU: 0.5149
Average IoU: 0.4354
Masks and IoU scores saved to: output_masks


Unet++ Inference

In [None]:
def dice_loss(y_true, y_pred, smooth=1e-6):
    y_true = tf.reshape(y_true, [-1])
    y_pred = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true * y_pred)
    return 1 - (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

# Combined loss function (BCE + Dice)
def combo_loss(y_true, y_pred):
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    return bce + dice_loss(y_true, y_pred)

# Function to calculate Intersection over Union (IoU)
def compute_iou(pred, target):
    """
    Compute the Intersection over Union (IoU) between the predicted and target masks.
    """
    pred_bin = (pred > 0.5).astype(np.uint8).flatten()
    target_bin = target.astype(np.uint8).flatten()
    return jaccard_score(target_bin, pred_bin)


# Inference function
def run_inference(model_path, before_dir, after_dir, target_dir, output_dir):
    print("Loading model...")

    # Load the model with the custom loss function
    model = tf.keras.models.load_model(model_path, custom_objects={"combo_loss": combo_loss})

    print("Reading input images...")
    input_images, filenames = get_stacked_images(before_dir, after_dir)

    print(f"Loaded {len(filenames)} image pairs for inference.")

    # Get the target masks
    target_masks = []
    target_files = sorted(os.listdir(target_dir))
    for file in target_files:
        if file.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff')):
            mask = load_image(os.path.join(target_dir, file))
            target_masks.append(mask)
    target_masks = np.array(target_masks)

    # Run inference
    predictions = model.predict(input_images, verbose=1)

    # Convert predictions to binary masks
    binary_masks = (predictions > 0.5).astype(np.uint8)

    # Calculate IoU for each image
    ious = []
    for pred_mask, true_mask in zip(binary_masks, target_masks):
        iou = compute_iou(pred_mask, true_mask)
        ious.append(iou)

    # Save predicted masks
    os.makedirs(output_dir, exist_ok=True)
    for mask, name in zip(binary_masks, filenames):
        mask_img = (mask.squeeze() * 255).astype(np.uint8)
        imageio.imwrite(os.path.join(output_dir, f"{name}_pred_mask.png"), mask_img)

    # Print IoU scores for each image
    for i, iou in enumerate(ious):
        print(f"IoU for {filenames[i]}: {iou:.4f}")

    # Print average IoU
    print(f"Average IoU: {np.mean(ious):.4f}")
    print(f"Masks saved to: {output_dir}")

# Entry Point
if __name__ == "__main__":
    model_path = r"D:\Projects\Flood_Mapping\unetpp_final.keras"  # Replace with your model path
    before_dir = "test_after"  # Folder of before images
    after_dir = "tesr_before"    # Folder of after images
    target_dir = "target_mask"    # Folder of target masks
    output_dir = "predicted_masks_unetplusplus"  # Folder to save predicted masks

    run_inference(model_path, before_dir, after_dir, target_dir, output_dir)

Loading model...
Reading input images...
Loaded 9 image pairs for inference.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 11s/step
IoU for SEN_Gorakhpur_20170825_0051: 0.7563
IoU for SEN_Gorakhpur_20170825_0052: 0.7531
IoU for SEN_Gorakhpur_20170825_0053: 0.7346
IoU for SEN_Gorakhpur_20170825_0054: 0.7110
IoU for SEN_Gorakhpur_20170825_0055: 0.7517
IoU for SEN_Gorakhpur_20170825_0056: 0.8213
IoU for SEN_Gorakhpur_20170825_0057: 0.7667
IoU for SEN_Gorakhpur_20170825_0058: 0.7361
IoU for SEN_Gorakhpur_20170825_0059: 0.7393
Average IoU: 0.7522
Masks saved to: output_masks
