<a href="https://colab.research.google.com/github/Osondu-ifunanya/River-course-change-detection-using-DL-based-image-segmentation/blob/main/River_Course_Change.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import os
import numpy as np
import cv2
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models

# -------------------------------
# 1. Utility: Generate synthetic river image + mask
# -------------------------------
def generate_river_image(img_h=128, img_w=128, n_curves=1, width_range=(6,14), jitter=10):
    """
    Create a synthetic grayscale image and binary river mask.
    River drawn as a smooth polyline (Beziers / splines approximated by cv2.polylines) with thickness.
    """
    canvas = np.zeros((img_h, img_w), dtype=np.uint8)
    mask = np.zeros((img_h, img_w), dtype=np.uint8)

    for _ in range(n_curves):
        # Generate control points across the image with random vertical/horizontal variation
        n_pts = np.random.randint(4, 7)
        xs = np.linspace(0, img_w - 1, n_pts).astype(np.int32)
        ys = (img_h * (0.2 + 0.6 * np.random.rand(n_pts))).astype(np.int32)
        # add jitter
        ys = np.clip(ys + np.random.randint(-jitter, jitter, size=n_pts), 0, img_h-1)
        pts = np.stack([xs, ys], axis=1)

        # Draw polyline on mask with variable thickness
        thickness = np.random.randint(width_range[0], width_range[1])
        cv2.polylines(mask, [pts], isClosed=False, color=255, thickness=thickness)

        # Fill a slightly wider polygon to make river continuous
        for t in range(-thickness//2, thickness//2 + 1):
            cv2.polylines(mask, [pts + np.array([0, t])], isClosed=False, color=255, thickness=1)

    # Smooth mask with morphological closing to make river contiguous
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2)
    mask = (mask > 0).astype(np.uint8)

    # Create a background texture for the image (terrain / vegetation)
    terrain = (cv2.randn(np.zeros((img_h, img_w), dtype=np.float32), 0.5, 0.12) * 255).astype(np.uint8)
    terrain = cv2.GaussianBlur(terrain, (7,7), 0)
    terrain = cv2.normalize(terrain, None, 20, 120, cv2.NORM_MINMAX)

    # Make water darker or lighter depending on a random factor
    water_val = np.random.randint(20, 70)  # water reflectance
    image = terrain.copy()
    image[mask == 1] = water_val + np.random.randint(-8, 8, size=mask.sum())

    # Add some speckle/noise and slight brightness variation
    noise = (np.random.randn(img_h, img_w) * 6).astype(np.int16)
    image = np.clip(image.astype(np.int16) + noise, 0, 255).astype(np.uint8)

    return image, mask

# -------------------------------
# 2. Create synthetic before/after dataset
# -------------------------------
def create_dataset(n_samples=200, img_h=128, img_w=128, change_prob=0.3):
    """
    Generate synthetic pairs (T1 image, T2 image) and masks (T1 mask, T2 mask).
    change_prob controls probability that a pair has a river change between T1 and T2.
    """
    imgs_t1 = []
    imgs_t2 = []
    masks_t1 = []
    masks_t2 = []
    for i in range(n_samples):
        img1, mask1 = generate_river_image(img_h, img_w)
        # Decide whether to modify river for T2
        if np.random.rand() < change_prob:
            # create a modified river by regenerating with shifted control points
            img2, mask2 = generate_river_image(img_h, img_w)
            # to make changes realistic, combine parts: sometimes add or remove sections
            # randomly toggle small regions
            change_mask = np.zeros_like(mask1)
            # create some small random blobs of change
            for _ in range(np.random.randint(1,4)):
                cx = np.random.randint(10, img_w-10)
                cy = np.random.randint(10, img_h-10)
                r = np.random.randint(5, 20)
                cv2.circle(change_mask, (cx, cy), r, 1, -1)
            # mix masks
            mask2 = np.where(change_mask==1, mask2, mask1)  # keep mask2 in changed regions, else mask1
            # regenerate image2 by painting water where mask2 ==1
            img2 = img1.copy()
            img2[mask2 == 1] = np.random.randint(20, 70)  # water tone
            # add small noise
            img2 = np.clip(img2.astype(int) + np.random.randint(-8,8, img2.shape), 0, 255).astype(np.uint8)
        else:
            # no change
            img2 = img1.copy()
            mask2 = mask1.copy()

        imgs_t1.append(img1)
        imgs_t2.append(img2)
        masks_t1.append(mask1)
        masks_t2.append(mask2)

    return (np.array(imgs_t1), np.array(imgs_t2),
            np.array(masks_t1), np.array(masks_t2))

# Create dataset
IMG_H, IMG_W = 128, 128
N_PAIRS = 600
imgs_t1, imgs_t2, masks_t1, masks_t2 = create_dataset(N_PAIRS, IMG_H, IMG_W, change_prob=0.4)
print("Generated dataset:", imgs_t1.shape, masks_t1.shape)

# -------------------------------
# 3. Prepare data for U-Net (combine T1 and T2 as separate samples)
# -------------------------------
# We'll train a simple U-Net to segment rivers on single-date images using combined (T1+T2) samples.
X = np.concatenate([imgs_t1, imgs_t2], axis=0)[..., np.newaxis] / 255.0  # shape (2N, H, W, 1)
Y = np.concatenate([masks_t1, masks_t2], axis=0)[..., np.newaxis].astype(np.float32)

# Split into train/val/test
X_train, X_temp, Y_train, Y_temp = train_test_split(X, Y, test_size=0.3, random_state=42)
X_val, X_test, Y_val, Y_test = train_test_split(X_temp, Y_temp, test_size=0.5, random_state=42)

print("Train/Val/Test shapes:", X_train.shape, X_val.shape, X_test.shape)

# -------------------------------
# 4. Define a simple U-Net model
# -------------------------------
def unet(input_shape=(IMG_H, IMG_W, 1)):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    c1 = layers.Conv2D(32, 3, activation='relu', padding='same')(c1)
    p1 = layers.MaxPool2D((2,2))(c1)

    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(p1)
    c2 = layers.Conv2D(64, 3, activation='relu', padding='same')(c2)
    p2 = layers.MaxPool2D((2,2))(c2)

    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(p2)
    c3 = layers.Conv2D(128, 3, activation='relu', padding='same')(c3)
    p3 = layers.MaxPool2D((2,2))(c3)

    # Bottleneck
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(p3)
    b = layers.Conv2D(256, 3, activation='relu', padding='same')(b)

    # Decoder
    u1 = layers.UpSampling2D((2,2))(b)
    u1 = layers.concatenate([u1, c3])
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(u1)
    c4 = layers.Conv2D(128, 3, activation='relu', padding='same')(c4)

    u2 = layers.UpSampling2D((2,2))(c4)
    u2 = layers.concatenate([u2, c2])
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(u2)
    c5 = layers.Conv2D(64, 3, activation='relu', padding='same')(c5)

    u3 = layers.UpSampling2D((2,2))(c5)
    u3 = layers.concatenate([u3, c1])
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(u3)
    c6 = layers.Conv2D(32, 3, activation='relu', padding='same')(c6)

    outputs = layers.Conv2D(1, 1, activation='sigmoid')(c6)

    model = models.Model(inputs, outputs)
    return model

model = unet(input_shape=(IMG_H, IMG_W, 1))
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

# -------------------------------
# 5. Train the model
# -------------------------------
EPOCHS = 12
BATCH = 8

callbacks = [
    tf.keras.callbacks.ModelCheckpoint("unet_river_seg.h5", save_best_only=True, monitor='val_loss'),
    tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=4, restore_best_weights=True)
]

history = model.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=EPOCHS,
    batch_size=BATCH,
    callbacks=callbacks
)

# -------------------------------
# 6. Predict masks for T1 and T2 pairs and compute change map
# -------------------------------
# Predict on test pairs (we'll use original pairing order)
n_pairs = imgs_t1.shape[0]
# Predict masks for T1 images that are in the test set
# We need indices of test samples among combined array; easier: predict full arrays and then map back
X_all = np.concatenate([imgs_t1, imgs_t2], axis=0)[..., np.newaxis] / 255.0
preds_all = model.predict(X_all, batch_size=16)
preds_all_bin = (preds_all[...,0] > 0.5).astype(np.uint8)

# Split back
preds_t1 = preds_all_bin[:n_pairs]
preds_t2 = preds_all_bin[n_pairs:]

# For evaluation, compute mask-level IoU on test subset
def iou(a, b):
    a = a.astype(bool)
    b = b.astype(bool)
    inter = np.logical_and(a, b).sum()
    union = np.logical_or(a, b).sum()
    return inter / union if union > 0 else 1.0

# Evaluate over the test subset indices
# map test indices: we split combined array into train/val/test earlier; find indices for test samples corresponding to t1/t2
# Simpler: compute IoU across entire dataset for demonstration and also per-pair change detection
ious_t1 = []
ious_t2 = []
for i in range(n_pairs):
    iou1 = iou(preds_t1[i], masks_t1[i])
    iou2 = iou(preds_t2[i], masks_t2[i])
    ious_t1.append(iou1)
    ious_t2.append(iou2)

print(f"Mean IoU T1 (all pairs): {np.mean(ious_t1):.3f}")
print(f"Mean IoU T2 (all pairs): {np.mean(ious_t2):.3f}")

# Compute change maps and compare with true change
change_pred = np.zeros((n_pairs, IMG_H, IMG_W), dtype=np.int8)  # -1 loss, 0 no-change, 1 gain
change_true = np.zeros_like(change_pred)

for i in range(n_pairs):
    # change_pred: 1 if water appeared in T2 but not in T1 (gain), -1 if lost
    p1 = preds_t1[i]
    p2 = preds_t2[i]
    change_pred[i] = (p2.astype(int) - p1.astype(int))
    # true change: based on masks
    m1 = masks_t1[i]
    m2 = masks_t2[i]
    change_true[i] = (m2.astype(int) - m1.astype(int))

# Summary stats
# fraction of pixels with predicted change vs true change
frac_true_change = np.mean(change_true != 0)
frac_pred_change = np.mean(change_pred != 0)
print(f"Fraction true-change pixels: {frac_true_change:.4f}")
print(f"Fraction predicted-change pixels: {frac_pred_change:.4f}")

# Compute per-pair change detection accuracy (pixel-wise)
from sklearn.metrics import precision_score, recall_score, f1_score

# Flatten pixel-wise arrays
flat_true = (change_true != 0).astype(int).flatten()
flat_pred = (change_pred != 0).astype(int).flatten()

print("Change detection (any change) - precision/recall/f1:")
print(precision_score(flat_true, flat_pred), recall_score(flat_true, flat_pred), f1_score(flat_true, flat_pred))

# -------------------------------
# 7. Visualize some examples (before, after, true change, predicted change)
# -------------------------------
def plot_pair(idx):
    fig, axes = plt.subplots(2, 3, figsize=(10,7))
    axes[0,0].imshow(imgs_t1[idx], cmap='gray'); axes[0,0].set_title('T1 Image')
    axes[0,1].imshow(masks_t1[idx], cmap='Blues'); axes[0,1].set_title('T1 True Mask')
    axes[0,2].imshow(preds_t1[idx], cmap='Blues'); axes[0,2].set_title('T1 Pred Mask')
    axes[1,0].imshow(imgs_t2[idx], cmap='gray'); axes[1,0].set_title('T2 Image')
    axes[1,1].imshow(masks_t2[idx], cmap='Blues'); axes[1,1].set_title('T2 True Mask')
    axes[1,2].imshow(preds_t2[idx], cmap='Blues'); axes[1,2].set_title('T2 Pred Mask')
    for ax in axes.flatten():
        ax.axis('off')
    plt.suptitle(f'Pair {idx} - IoU T1={iou(preds_t1[idx], masks_t1[idx]):.2f}, IoU T2={iou(preds_t2[idx], masks_t2[idx]):.2f}')
    plt.tight_layout()
    plt.show()

# show a few pairs
for idx in [2, 10, 25]:
    plot_pair(idx)

# show predicted vs true change for a pair
def plot_change(idx):
    plt.figure(figsize=(8,4))
    plt.subplot(1,2,1); plt.imshow(change_true[idx], cmap='bwr', vmin=-1, vmax=1); plt.title('True Change (-1 loss,1 gain)')
    plt.colorbar()
    plt.subplot(1,2,2); plt.imshow(change_pred[idx], cmap='bwr', vmin=-1, vmax=1); plt.title('Predicted Change (-1 loss,1 gain)')
    plt.colorbar()
    plt.suptitle(f'Change maps for pair {idx}')
    plt.show()

plot_change(10)

# -------------------------------
# 8. Export per-pair change summary to CSV
# -------------------------------
summary = []
for i in range(n_pairs):
    n_true_loss = np.sum(change_true[i] == -1)
    n_true_gain = np.sum(change_true[i] == 1)
    n_pred_loss = np.sum(change_pred[i] == -1)
    n_pred_gain = np.sum(change_pred[i] == 1)
    summary.append({
        'pair_id': i,
        'true_loss_pixels': int(n_true_loss),
        'true_gain_pixels': int(n_true_gain),
        'pred_loss_pixels': int(n_pred_loss),
        'pred_gain_pixels': int(n_pred_gain),
        'iou_t1': float(iou(preds_t1[i], masks_t1[i])),
        'iou_t2': float(iou(preds_t2[i], masks_t2[i]))
    })

df_summary = pd.DataFrame(summary)
csv_path = "river_change_summary.csv"
df_summary.to_csv(csv_path, index=False)
print(f"Exported summary to: {csv_path}")
