In [None]:
!apt-get install poppler-utils
!pip install pdf2image

Layouts: https://github.com/PrakharSaxena144/Minor-Project/blob/main/input%20data%20layouts_5.pdf

Dataset for Unet: https://drive.google.com/drive/folders/1bCpjt5gj1pcKABma2espjTPzSGuPFwtA?usp=drive_link

## Basic Codes

### Image Rotator

In [None]:
from PIL import Image

def rotate_image_pil(image_path, angle, output_path="rotated_image.png"):
    img = Image.open(image_path)
    rotated = img.rotate(angle, expand=True)  # expand keeps full image
    rotated.save(output_path)
    print(f"Rotated image saved at: {output_path}")

# Example
rotate_image_pil("/content/edges.png", -40, "rotated_cw.png")

# With angle = 90 â†’ rotates counterclockwise
# With angle = -90 â†’ rotates clockwise

### Selectable Text Remover

In [None]:
import fitz  # PyMuPDF
import os

# --- Configuration ---
INPUT_PDF = '/content/input data layouts_5.pdf'
OUTPUT_PDF = 'map_no_selectable_text.pdf'

# --- Main Script ---

def remove_selectable_text(input_path, output_path):
    """
    Opens a PDF, finds all selectable text on each page, redacts it,
    and saves the result to a new PDF.
    """
    if not os.path.exists(input_path):
        print(f"Error: Input file not found at '{input_path}'")
        return

    print(f"Opening '{input_path}'...")
    # Open the PDF
    doc = fitz.open(input_path)

    # Iterate through each page of the document
    for page_num, page in enumerate(doc):
        print(f"  - Processing page {page_num + 1}/{len(doc)}...")

        # 1. Find all text "words" on the page.
        # This returns a list of items, where each item has the
        # coordinates (x0, y0, x1, y1) and the text itself.
        words = page.get_text("words")

        # 2. Add a redaction annotation for each word's bounding box.
        # A redaction securely removes content from the specified area.
        for word in words:
            # The first 4 elements of the 'word' item are its coordinates
            bbox = fitz.Rect(word[:4])
            page.add_redact_annot(bbox)

        # 3. Apply all the redactions on the page.
        # This step permanently removes the content.
        page.apply_redactions()

    # Save the modified document to the output path
    # 'garbage=4' cleans up unused objects, 'deflate=True' compresses the file
    print(f"Saving cleaned PDF to '{output_path}'...")
    doc.save(output_path, garbage=4, deflate=True)
    doc.close()

    print("Done! All selectable text has been removed. âœ¨")

# Run the main function
remove_selectable_text(INPUT_PDF, OUTPUT_PDF)

# SSIM
## (Structural Similarity Index Measure)

Used for comparing 2 images.

Things it do:

Tiling,Tiling comparison, Marking as red (Greater than a certain threshold), Changed Area Overlay

Link:
https://colab.research.google.com/drive/1uxKlL402nxPk1Se2jkFoEcyIM0MMzW6g#scrollTo=EYfy32RxDGx-

###### Without Overlay (Overlay means showing regions that are marked as different)

In [None]:
# Tile-based hybrid SSIM + intensity change detection
# Red-transparent overlay on the actual satellite image (presentation-quality)
#
# Requirements:
# pip install opencv-python scikit-image matplotlib numpy Pillow

import os
import math
import cv2
import numpy as np
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as ssim
from PIL import Image

# -----------------------------
# User-configurable parameters
# -----------------------------
SATELLITE_PATH = "/content/color_segmented.png"      # actual satellite image (RGB/BGR)
LAYOUT_PATH    = "/content/grayscale_edges/edges_page_7.png"  # planned layout image
TILE_SIZE      = 32      # user tile size (changeable)
HYBRID_THRESH  = 0.3      # medium sensitivity
SSIM_WEIGHT    = 0.5      # weight for (1 - ssim)
INT_WEIGHT     = 0.5      # weight for normalized intensity diff
OUTPUT_DIR     = "/content/output_results"
SAVE_TILE_COMPARISONS = True  # save every tile side-by-side
MAX_SAMPLE_TILES_DISPLAY = 8  # how many tile comparisons to show inline

# -----------------------------
# Helpers
# -----------------------------
def ensure_dir(d):
    os.makedirs(d, exist_ok=True)

def load_image_bgr(path):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return img

def pad_to_multiple(img, tile_size, pad_color=(0,0,0)):
    h, w = img.shape[:2]
    pad_h = (math.ceil(h / tile_size) * tile_size) - h
    pad_w = (math.ceil(w / tile_size) * tile_size) - w
    if pad_h == 0 and pad_w == 0:
        return img, (0,0)
    # bottom and right padding
    padded = cv2.copyMakeBorder(img, 0, pad_h, 0, pad_w, cv2.BORDER_REFLECT)
    return padded, (pad_h, pad_w)

def create_tile_coords(img_shape, tile_size):
    h, w = img_shape[:2]
    coords = []
    for y in range(0, h, tile_size):
        for x in range(0, w, tile_size):
            coords.append((x, y))
    return coords

def compute_tile_stats(tileA, tileB):
    # Convert to gray
    tAg = cv2.cvtColor(tileA, cv2.COLOR_BGR2GRAY) if tileA.ndim == 3 else tileA.copy()
    tBg = cv2.cvtColor(tileB, cv2.COLOR_BGR2GRAY) if tileB.ndim == 3 else tileB.copy()

    # Ensure same size
    if tAg.shape != tBg.shape:
        tBg = cv2.resize(tBg, (tAg.shape[1], tAg.shape[0]), interpolation=cv2.INTER_AREA)

    # SSIM: choose win_size <= min(dim) and odd and >=3 (skimage requires >=3, default 7)
    min_side = min(tAg.shape[0], tAg.shape[1])
    win_size = min(7, min_side)
    if win_size < 3:
        # fallback: if extremely small (shouldn't happen because we pad), just return neutral
        ssim_val = 1.0
    else:
        if win_size % 2 == 0:
            win_size -= 1
        try:
            ssim_val = ssim(tAg, tBg, win_size=win_size)
        except Exception:
            ssim_val = ssim(tAg, tBg, data_range=tAg.max()-tAg.min() if tAg.max()!=tAg.min() else 1)

    # Intensity difference normalized [0,1]
    absdiff = np.abs(tAg.astype(np.float32) - tBg.astype(np.float32))
    mean_diff = absdiff.mean()  # 0..255
    norm_diff = mean_diff / 255.0

    return ssim_val, norm_diff

# -----------------------------
# Main pipeline
# -----------------------------
def run_tile_change_detection(sat_path, layout_path, tile_size=TILE_SIZE,
                              hybrid_thresh=HYBRID_THRESH,
                              ssim_w=SSIM_WEIGHT, int_w=INT_WEIGHT,
                              out_dir=OUTPUT_DIR, save_tile_comparisons=SAVE_TILE_COMPARISONS):
    ensure_dir(out_dir)
    ensure_dir(os.path.join(out_dir, "tiles_comparisons"))

    # Step 1: Load images
    sat = load_image_bgr(sat_path)
    layout = load_image_bgr(layout_path)
    print(f"Loaded sat: {sat.shape}, layout: {layout.shape}")

    # Step 2: Align by resizing layout to satellite shape (preserve sat resolution)
    layout_resized = cv2.resize(layout, (sat.shape[1], sat.shape[0]), interpolation=cv2.INTER_AREA)
    print(f"Aligned layout to satellite size: {sat.shape}")

    # Save intermediate step visualizations
    cv2.imwrite(os.path.join(out_dir, "step1_satellite.png"), sat)
    cv2.imwrite(os.path.join(out_dir, "step1_layout_aligned.png"), layout_resized)
    print("Saved: step1_satellite.png, step1_layout_aligned.png")

    # Step 3: Pad images so dimensions divisible by tile_size (avoid tiny edge tiles)
    sat_padded, (pad_h, pad_w) = pad_to_multiple(sat, tile_size)
    layout_padded, _ = pad_to_multiple(layout_resized, tile_size)
    print(f"Padded images -> new shape: {sat_padded.shape} (pad_h={pad_h}, pad_w={pad_w})")

    cv2.imwrite(os.path.join(out_dir, "step2_satellite_padded.png"), sat_padded)
    cv2.imwrite(os.path.join(out_dir, "step2_layout_padded.png"), layout_padded)
    print("Saved: step2_satellite_padded.png, step2_layout_padded.png")

    # Step 4: Iterate tiles and compute hybrid change score
    coords = create_tile_coords(sat_padded.shape, tile_size)
    heat_values = np.zeros((sat_padded.shape[0] // tile_size, sat_padded.shape[1] // tile_size), dtype=np.float32)
    change_mask_tiles = np.zeros_like(heat_values, dtype=np.uint8)

    comparisons_for_display = []  # store small set for display

    for idx, (x, y) in enumerate(coords):
        tx = x; ty = y
        tileA = sat_padded[ty:ty+tile_size, tx:tx+tile_size]
        tileB = layout_padded[ty:ty+tile_size, tx:tx+tile_size]

        ssim_val, norm_diff = compute_tile_stats(tileA, tileB)

        # hybrid score: higher => more change
        hybrid_score = ssim_w * (1.0 - ssim_val) + int_w * norm_diff
        # store into heat array
        row = ty // tile_size
        col = tx // tile_size
        heat_values[row, col] = hybrid_score

        # mark changed tile if hybrid_score > threshold
        if hybrid_score > hybrid_thresh:
            change_mask_tiles[row, col] = 1
        else:
            change_mask_tiles[row, col] = 0

        # Save side-by-side comparison image for this tile (sat | layout | diff visualization)
        if save_tile_comparisons:
            # create visualization: sat (left), layout (mid), absdiff (right)
            tA_rgb = tileA.copy()
            tB_rgb = tileB.copy()
            tA_gray = cv2.cvtColor(tA_rgb, cv2.COLOR_BGR2GRAY)
            tB_gray = cv2.cvtColor(tB_rgb, cv2.COLOR_BGR2GRAY)
            absd = cv2.absdiff(tA_gray, tB_gray)
            # scale absd for display
            absd_vis = cv2.cvtColor(absd, cv2.COLOR_GRAY2BGR)
            # add a small colored overlay for hybrid score (red intensity)
            red_overlay = np.zeros_like(tA_rgb)
            red_int = int(np.clip(hybrid_score, 0, 1) * 255)
            red_overlay[:, :, 2] = red_int
            # blend overlay with actual tile for the rightmost small preview (optional)
            blended = cv2.addWeighted(tA_rgb, 0.7, red_overlay, 0.3, 0)

            comparison = np.hstack([tA_rgb, tB_rgb, absd_vis, blended])
            fname = os.path.join(out_dir, "tiles_comparisons", f"tile_{row}_{col}_hyb{hybrid_score:.3f}.png")
            cv2.imwrite(fname, comparison)

            # keep a small set for inline display
            if len(comparisons_for_display) < MAX_SAMPLE_TILES_DISPLAY:
                comparisons_for_display.append((fname, hybrid_score, (row, col), ssim_val, norm_diff))

    # Step 5: Build the pixel-level change heatmap (upsample tile heat to image size)
    tile_rows, tile_cols = heat_values.shape
    heatmap = cv2.resize(heat_values, (sat_padded.shape[1], sat_padded.shape[0]), interpolation=cv2.INTER_NEAREST)

    # Normalize heatmap 0..1
    heatmap_norm = (heatmap - heatmap.min()) / (heatmap.max() - heatmap.min() + 1e-8)

    # Create red overlay where heatmap intensity > 0 (we'll apply transparency)
    overlay = sat_padded.copy().astype(np.float32)
    red_layer = np.zeros_like(overlay)
    red_layer[:, :, 2] = (heatmap_norm * 255).astype(np.uint8)  # red channel intensity proportional to heat

    # binary mask of significant change by tile mask (expand tiles)
    change_mask_pixels = cv2.resize(change_mask_tiles.astype(np.uint8),
                                    (sat_padded.shape[1], sat_padded.shape[0]), interpolation=cv2.INTER_NEAREST)

    # Create red-transparent overlay: alpha = heatmap_norm * change_mask_pixels
    alpha = (heatmap_norm * change_mask_pixels).astype(np.float32)  # 0..1
    alpha_3 = np.repeat(alpha[:, :, np.newaxis], 3, axis=2)

    overlay_out = (overlay * (1 - alpha_3) + red_layer.astype(np.float32) * (alpha_3)).astype(np.uint8)

    # Crop back to original size (remove padding) for saving/display
    h0, w0 = sat.shape[:2]
    overlay_cropped = overlay_out[0:h0, 0:w0]
    heatmap_cropped = (heatmap_norm * 255).astype(np.uint8)[0:h0, 0:w0]
    change_mask_cropped = change_mask_pixels[0:h0, 0:w0] * 255

    # Step 6: Save and display results after major steps
    cv2.imwrite(os.path.join(out_dir, "step3_tile_heat_raw.png"), (heatmap_norm * 255).astype(np.uint8))
    cv2.imwrite(os.path.join(out_dir, "step3_tile_binary_mask.png"), change_mask_cropped)
    cv2.imwrite(os.path.join(out_dir, "final_overlay_on_satellite.png"), overlay_cropped)

    print(f"\nSaved outputs to: {out_dir}")
    print(" - step3_tile_heat_raw.png (tile intensity heatmap, 0..255)")
    print(" - step3_tile_binary_mask.png (binary changed tiles mask)")
    print(" - final_overlay_on_satellite.png (satellite with red-transparent overlay)")

    # Inline displays
    plt.figure(figsize=(14,6))
    plt.subplot(1,3,1)
    plt.imshow(cv2.cvtColor(sat, cv2.COLOR_BGR2RGB))
    plt.title("Original Satellite")
    plt.axis("off")

    plt.subplot(1,3,2)
    plt.imshow(cv2.cvtColor(layout_resized, cv2.COLOR_BGR2RGB))
    plt.title("Aligned Layout")
    plt.axis("off")

    plt.subplot(1,3,3)
    plt.imshow(cv2.cvtColor(overlay_cropped, cv2.COLOR_BGR2RGB))
    plt.title("Overlay (Red = Deviation)")
    plt.axis("off")

    plt.show()

    # Show heatmap separately (red intensity)
    plt.figure(figsize=(8,6))
    plt.imshow(heatmap_cropped, cmap="hot")
    plt.title("Tile Hybrid Heatmap (hot)")
    plt.axis("off")
    plt.colorbar()
    plt.show()

    # Show some tile comparisons
    if comparisons_for_display:
        print("\nSample tile comparisons saved (sat | layout | absdiff | blended overlay):")
        for fname, hybrid_score, (r,c), ssim_val, norm_diff in comparisons_for_display:
            print(f" Tile ({r},{c}): hybrid={hybrid_score:.3f}, ssim={ssim_val:.3f}, int_diff={norm_diff:.3f}  -> {fname}")
            comp = Image.open(fname)
            display(comp)  # in notebooks this will show the image
    else:
        print("No tile comparisons to display.")

    # Summary stats
    total_tiles = tile_rows * tile_cols
    changed_tiles = int(change_mask_tiles.sum())
    percent_changed_tiles = (changed_tiles / total_tiles) * 100.0
    mean_hybrid = float(heat_values.mean())

    print("\n=== SUMMARY ===")
    print(f"Tile size: {tile_size} px")
    print(f"Tiles: {total_tiles} (rows={tile_rows}, cols={tile_cols})")
    print(f"Changed tiles: {changed_tiles}")
    print(f"Change coverage (tiles): {percent_changed_tiles:.2f}%")
    print(f"Mean hybrid score: {mean_hybrid:.4f}")

    return {
        "heat_values": heat_values,
        "change_mask_tiles": change_mask_tiles,
        "overlay_path": os.path.join(out_dir, "final_overlay_on_satellite.png"),
        "heatmap_path": os.path.join(out_dir, "step3_tile_heat_raw.png"),
        "tile_comparisons_dir": os.path.join(out_dir, "tiles_comparisons")
    }

# -----------------------------
# Run (interactive tile size)
# -----------------------------
if __name__ == "__main__":
    ensure_dir(OUTPUT_DIR)
    try:
        ts_input = input(f"Enter tile size (default {TILE_SIZE}): ").strip()
        if ts_input != "":
            TILE_SIZE = int(ts_input)
    except Exception:
        pass

    # Use medium sensitivity (threshold=0.3) as requested
    res = run_tile_change_detection(SATELLITE_PATH, LAYOUT_PATH, tile_size=TILE_SIZE,
                                    hybrid_thresh=HYBRID_THRESH)


With Overlay

In [None]:
# Tile-based hybrid SSIM + intensity change detection
# Red-transparent overlay on the actual satellite image (presentation-quality)
#
# Requirements:
# pip install opencv-python scikit-image matplotlib numpy Pillow

import os
import math
import cv2
import numpy as np
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as ssim
from PIL import Image

# -----------------------------
# User-configurable parameters
# -----------------------------
SATELLITE_PATH = "/content/color_segmented.png"      # actual satellite image (RGB/BGR)
LAYOUT_PATH    = "/content/color_segmented/segmented_page_3.png"  # planned layout image
TILE_SIZE      = 32      # user tile size (changeable)
HYBRID_THRESH  = 0.6      # medium sensitivity
SSIM_WEIGHT    = 0.5      # weight for (1 - ssim)
INT_WEIGHT     = 0.5      # weight for normalized intensity diff
OUTPUT_DIR     = "/content/output_results"
SAVE_TILE_COMPARISONS = True  # save every tile side-by-side
MAX_SAMPLE_TILES_DISPLAY = 8  # how many tile comparisons to show inline

# -----------------------------
# Helpers
# -----------------------------
def ensure_dir(d):
    os.makedirs(d, exist_ok=True)

def load_image_bgr(path):
    img = cv2.imread(path, cv2.IMREAD_COLOR)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return img

def pad_to_multiple(img, tile_size, pad_color=(0,0,0)):
    h, w = img.shape[:2]
    pad_h = (math.ceil(h / tile_size) * tile_size) - h
    pad_w = (math.ceil(w / tile_size) * tile_size) - w
    if pad_h == 0 and pad_w == 0:
        return img, (0,0)
    # bottom and right padding
    padded = cv2.copyMakeBorder(img, 0, pad_h, 0, pad_w, cv2.BORDER_REFLECT)
    return padded, (pad_h, pad_w)

def create_tile_coords(img_shape, tile_size):
    h, w = img_shape[:2]
    coords = []
    for y in range(0, h, tile_size):
        for x in range(0, w, tile_size):
            coords.append((x, y))
    return coords

def compute_tile_stats(tileA, tileB):
    # Convert to gray
    tAg = cv2.cvtColor(tileA, cv2.COLOR_BGR2GRAY) if tileA.ndim == 3 else tileA.copy()
    tBg = cv2.cvtColor(tileB, cv2.COLOR_BGR2GRAY) if tileB.ndim == 3 else tileB.copy()

    # Ensure same size
    if tAg.shape != tBg.shape:
        tBg = cv2.resize(tBg, (tAg.shape[1], tAg.shape[0]), interpolation=cv2.INTER_AREA)

    # SSIM: choose win_size <= min(dim) and odd and >=3 (skimage requires >=3, default 7)
    min_side = min(tAg.shape[0], tAg.shape[1])
    win_size = min(7, min_side)
    if win_size < 3:
        # fallback: if extremely small (shouldn't happen because we pad), just return neutral
        ssim_val = 1.0
    else:
        if win_size % 2 == 0:
            win_size -= 1
        try:
            ssim_val = ssim(tAg, tBg, win_size=win_size)
        except Exception:
            ssim_val = ssim(tAg, tBg, data_range=tAg.max()-tAg.min() if tAg.max()!=tAg.min() else 1)

    # Intensity difference normalized [0,1]
    absdiff = np.abs(tAg.astype(np.float32) - tBg.astype(np.float32))
    mean_diff = absdiff.mean()  # 0..255
    norm_diff = mean_diff / 255.0

    return ssim_val, norm_diff

# -----------------------------
# Main pipeline
# -----------------------------
def run_tile_change_detection(sat_path, layout_path, tile_size=TILE_SIZE,
                              hybrid_thresh=HYBRID_THRESH,
                              ssim_w=SSIM_WEIGHT, int_w=INT_WEIGHT,
                              out_dir=OUTPUT_DIR, save_tile_comparisons=SAVE_TILE_COMPARISONS):
    ensure_dir(out_dir)
    ensure_dir(os.path.join(out_dir, "tiles_comparisons"))

    # Step 1: Load images
    sat = load_image_bgr(sat_path)
    layout = load_image_bgr(layout_path)
    print(f"Loaded sat: {sat.shape}, layout: {layout.shape}")

    # Step 2: Align by resizing layout to satellite shape (preserve sat resolution)
    layout_resized = cv2.resize(layout, (sat.shape[1], sat.shape[0]), interpolation=cv2.INTER_AREA)
    print(f"Aligned layout to satellite size: {sat.shape}")

    # Save intermediate step visualizations
    cv2.imwrite(os.path.join(out_dir, "step1_satellite.png"), sat)
    cv2.imwrite(os.path.join(out_dir, "step1_layout_aligned.png"), layout_resized)
    print("Saved: step1_satellite.png, step1_layout_aligned.png")

    # Step 3: Pad images so dimensions divisible by tile_size (avoid tiny edge tiles)
    sat_padded, (pad_h, pad_w) = pad_to_multiple(sat, tile_size)
    layout_padded, _ = pad_to_multiple(layout_resized, tile_size)
    print(f"Padded images -> new shape: {sat_padded.shape} (pad_h={pad_h}, pad_w={pad_w})")

    cv2.imwrite(os.path.join(out_dir, "step2_satellite_padded.png"), sat_padded)
    cv2.imwrite(os.path.join(out_dir, "step2_layout_padded.png"), layout_padded)
    print("Saved: step2_satellite_padded.png, step2_layout_padded.png")

    # Step 4: Iterate tiles and compute hybrid change score
    coords = create_tile_coords(sat_padded.shape, tile_size)
    heat_values = np.zeros((sat_padded.shape[0] // tile_size, sat_padded.shape[1] // tile_size), dtype=np.float32)
    change_mask_tiles = np.zeros_like(heat_values, dtype=np.uint8)

    comparisons_for_display = []  # store small set for display

    for idx, (x, y) in enumerate(coords):
        tx = x; ty = y
        tileA = sat_padded[ty:ty+tile_size, tx:tx+tile_size]
        tileB = layout_padded[ty:ty+tile_size, tx:tx+tile_size]

        ssim_val, norm_diff = compute_tile_stats(tileA, tileB)

        # hybrid score: higher => more change
        hybrid_score = ssim_w * (1.0 - ssim_val) + int_w * norm_diff
        # store into heat array
        row = ty // tile_size
        col = tx // tile_size
        heat_values[row, col] = hybrid_score

        # mark changed tile if hybrid_score > threshold
        if hybrid_score > hybrid_thresh:
            change_mask_tiles[row, col] = 1
        else:
            change_mask_tiles[row, col] = 0

        # Save side-by-side comparison image for this tile (sat | layout | diff visualization)
        if save_tile_comparisons:
            # create visualization: sat (left), layout (mid), absdiff (right)
            tA_rgb = tileA.copy()
            tB_rgb = tileB.copy()
            tA_gray = cv2.cvtColor(tA_rgb, cv2.COLOR_BGR2GRAY)
            tB_gray = cv2.cvtColor(tB_rgb, cv2.COLOR_BGR2GRAY)
            absd = cv2.absdiff(tA_gray, tB_gray)
            # scale absd for display
            absd_vis = cv2.cvtColor(absd, cv2.COLOR_GRAY2BGR)
            # add a small colored overlay for hybrid score (red intensity)
            red_overlay = np.zeros_like(tA_rgb)
            red_int = int(np.clip(hybrid_score, 0, 1) * 255)
            red_overlay[:, :, 2] = red_int
            # blend overlay with actual tile for the rightmost small preview (optional)
            blended = cv2.addWeighted(tA_rgb, 0.7, red_overlay, 0.3, 0)

            comparison = np.hstack([tA_rgb, tB_rgb, absd_vis, blended])
            fname = os.path.join(out_dir, "tiles_comparisons", f"tile_{row}_{col}_hyb{hybrid_score:.3f}.png")
            cv2.imwrite(fname, comparison)

            # keep a small set for inline display
            if len(comparisons_for_display) < MAX_SAMPLE_TILES_DISPLAY:
                comparisons_for_display.append((fname, hybrid_score, (row, col), ssim_val, norm_diff))

    # Step 5: Build the pixel-level change heatmap (upsample tile heat to image size)
    tile_rows, tile_cols = heat_values.shape
    heatmap = cv2.resize(heat_values, (sat_padded.shape[1], sat_padded.shape[0]), interpolation=cv2.INTER_NEAREST)

    # Normalize heatmap 0..1
    heatmap_norm = (heatmap - heatmap.min()) / (heatmap.max() - heatmap.min() + 1e-8)

    # Create red overlay where heatmap intensity > 0 (we'll apply transparency)
    overlay = sat_padded.copy().astype(np.float32)
    red_layer = np.zeros_like(overlay)
    red_layer[:, :, 2] = (heatmap_norm * 255).astype(np.uint8)  # red channel intensity proportional to heat

    # binary mask of significant change by tile mask (expand tiles)
    change_mask_pixels = cv2.resize(change_mask_tiles.astype(np.uint8),
                                    (sat_padded.shape[1], sat_padded.shape[0]), interpolation=cv2.INTER_NEAREST)

    # Create red-transparent overlay: alpha = heatmap_norm * change_mask_pixels
    alpha = (heatmap_norm * change_mask_pixels).astype(np.float32)  # 0..1
    alpha_3 = np.repeat(alpha[:, :, np.newaxis], 3, axis=2)

    overlay_out = (overlay * (1 - alpha_3) + red_layer.astype(np.float32) * (alpha_3)).astype(np.uint8)

    # Crop back to original size (remove padding) for saving/display
    h0, w0 = sat.shape[:2]
    overlay_cropped = overlay_out[0:h0, 0:w0]
    heatmap_cropped = (heatmap_norm * 255).astype(np.uint8)[0:h0, 0:w0]
    change_mask_cropped = change_mask_pixels[0:h0, 0:w0] * 255

    # Step 6: Save and display results after major steps
    cv2.imwrite(os.path.join(out_dir, "step3_tile_heat_raw.png"), (heatmap_norm * 255).astype(np.uint8))
    cv2.imwrite(os.path.join(out_dir, "step3_tile_binary_mask.png"), change_mask_cropped)
    cv2.imwrite(os.path.join(out_dir, "final_overlay_on_satellite.png"), overlay_cropped)

    print(f"\nSaved outputs to: {out_dir}")
    print(" - step3_tile_heat_raw.png (tile intensity heatmap, 0..255)")
    print(" - step3_tile_binary_mask.png (binary changed tiles mask)")
    print(" - final_overlay_on_satellite.png (satellite with red-transparent overlay)")
    # ---------------------------------------------------
    # Step 7: Quantify percentage of area under deviation
    # ---------------------------------------------------
    print("\nCalculating percentage of area under deviation...")

    # Use normalized heatmap and change mask for finer per-pixel deviation
    deviation_threshold = hybrid_thresh  # same as your hybrid threshold
    deviation_mask = (heatmap_norm > deviation_threshold).astype(np.uint8)

    # Compute area percentage
    changed_pixels = np.sum(deviation_mask)
    total_pixels = deviation_mask.size
    deviation_percent_area = (changed_pixels / total_pixels) * 100.0

    print(f"ðŸ“Š Estimated deviation area: {deviation_percent_area:.2f}% of total region.")

    # Optional overlay visualization (red = deviation) â€“ fixed for shape/channel mismatch
    heat_colored = cv2.applyColorMap((deviation_mask * 255).astype(np.uint8), cv2.COLORMAP_HOT)
    heat_colored = cv2.resize(heat_colored, (sat.shape[1], sat.shape[0]))  # match satellite size

    # ensure both have 3 channels
    if len(sat.shape) == 2:
        sat_color = cv2.cvtColor(sat, cv2.COLOR_GRAY2BGR)
    else:
        sat_color = sat.copy()

    # Blend red heatmap overlay (red = deviation)
    overlay_dev = cv2.addWeighted(sat_color, 0.7, heat_colored, 0.3, 0)


    plt.figure(figsize=(10, 6))
    plt.imshow(cv2.cvtColor(overlay_dev, cv2.COLOR_BGR2RGB))
    plt.title(f"Deviation Area Overlay ({deviation_percent_area:.2f}% area changed)")
    plt.axis('off')
    plt.show()

    # Save overlay for reference
    cv2.imwrite(os.path.join(out_dir, "step4_deviation_area_overlay.png"), overlay_dev)
    print("Saved: step4_deviation_area_overlay.png (Deviation heat overlay)")


    # Inline displays
    plt.figure(figsize=(14,6))
    plt.subplot(1,3,1)
    plt.imshow(cv2.cvtColor(sat, cv2.COLOR_BGR2RGB))
    plt.title("Original Satellite")
    plt.axis("off")

    plt.subplot(1,3,2)
    plt.imshow(cv2.cvtColor(layout_resized, cv2.COLOR_BGR2RGB))
    plt.title("Aligned Layout")
    plt.axis("off")

    plt.subplot(1,3,3)
    plt.imshow(cv2.cvtColor(overlay_cropped, cv2.COLOR_BGR2RGB))
    plt.title("Overlay (Red = Deviation)")
    plt.axis("off")

    plt.show()

    # Show heatmap separately (red intensity)
    plt.figure(figsize=(8,6))
    plt.imshow(heatmap_cropped, cmap="hot")
    plt.title("Tile Hybrid Heatmap (hot)")
    plt.axis("off")
    plt.colorbar()
    plt.show()

    # Show some tile comparisons
    if comparisons_for_display:
        print("\nSample tile comparisons saved (sat | layout | absdiff | blended overlay):")
        for fname, hybrid_score, (r,c), ssim_val, norm_diff in comparisons_for_display:
            print(f" Tile ({r},{c}): hybrid={hybrid_score:.3f}, ssim={ssim_val:.3f}, int_diff={norm_diff:.3f}  -> {fname}")
            comp = Image.open(fname)
            display(comp)  # in notebooks this will show the image
    else:
        print("No tile comparisons to display.")

    # Summary stats
    total_tiles = tile_rows * tile_cols
    changed_tiles = int(change_mask_tiles.sum())
    percent_changed_tiles = (changed_tiles / total_tiles) * 100.0
    mean_hybrid = float(heat_values.mean())

    print("\n=== SUMMARY ===")
    print(f"Tile size: {tile_size} px")
    print(f"Tiles: {total_tiles} (rows={tile_rows}, cols={tile_cols})")
    print(f"Changed tiles: {changed_tiles}")
    print(f"Change coverage (tiles): {percent_changed_tiles:.2f}%")
    print(f"Mean hybrid score: {mean_hybrid:.4f}")

    return {
        "heat_values": heat_values,
        "change_mask_tiles": change_mask_tiles,
        "overlay_path": os.path.join(out_dir, "final_overlay_on_satellite.png"),
        "heatmap_path": os.path.join(out_dir, "step3_tile_heat_raw.png"),
        "tile_comparisons_dir": os.path.join(out_dir, "tiles_comparisons")
    }

# -----------------------------
# Run (interactive tile size)
# -----------------------------
if __name__ == "__main__":
    ensure_dir(OUTPUT_DIR)
    try:
        ts_input = input(f"Enter tile size (default {TILE_SIZE}): ").strip()
        if ts_input != "":
            TILE_SIZE = int(ts_input)
    except Exception:
        pass

    # Use medium sensitivity (threshold=0.3) as requested
    res = run_tile_change_detection(SATELLITE_PATH, LAYOUT_PATH, tile_size=TILE_SIZE,
                                    hybrid_thresh=HYBRID_THRESH)


# Segmentation

Goal: Want to achieve clear road & wall bundaries on grayscale.

## Approach 1
### Applying K means and Gray scale segementation

Link: https://colab.research.google.com/drive/11gVZr-GnOizhG8OowdTCLfaaOLKD-Ish#scrollTo=xD2OlEsxyQmT

Outcome: Good result on Map layouts, but algorithm capture noise in real satellite images (like trees, bushes, vehicles)

#### Image extractor from pdf and applying K means and Gray scale segementation

In [None]:
import os
import numpy as np
import cv2
from pdf2image import convert_from_path
import matplotlib.pyplot as plt
from sklearn.cluster import MiniBatchKMeans
from PIL import Image

# ------------------------
# Configurable parameters
# ------------------------
KMEANS_SAMPLE_PIXELS = 500_000   # sample for training only (output stays full resolution)
N_CLUSTERS = 5
KMEANS_N_INIT = 10
CANNY_THRESHOLDS = (100, 200)

# ------------------------
# Helpers
# ------------------------
def convert_pdf_to_images(pdf_file_path, dpi=300):
    """Convert PDF pages to PIL images."""
    try:
        pages = convert_from_path(pdf_file_path, dpi=dpi)
        print(f"Converted PDF to {len(pages)} pages at {dpi} dpi.")
        return pages
    except Exception as e:
        print("Error converting PDF:", e)
        return None

def ensure_dir(d):
    os.makedirs(d, exist_ok=True)

def save_images_to_pdf_pillow(image_paths, output_pdf="output.pdf"):
    """Save images as a multi-page PDF without rescaling (lossless)."""
    if not image_paths:
        print("No images to save as PDF.")
        return None
    pil_images = [Image.open(p).convert("RGB") for p in image_paths]
    first, rest = pil_images[0], pil_images[1:]
    first.save(output_pdf, save_all=True, append_images=rest)
    print(f"PDF saved with {len(pil_images)} pages: {output_pdf}")
    return output_pdf

# ------------------------
# 1) Color segmentation
# ------------------------
def color_segmentation_kmeans(input_pdf, output_dir="color_segmented"):
    ensure_dir(output_dir)
    pil_pages = convert_pdf_to_images(input_pdf)
    if not pil_pages:
        return []

    saved_paths = []
    for idx, pil in enumerate(pil_pages, start=1):
        print(f"\n[Color KMeans] Page {idx}...")
        cv_img = cv2.cvtColor(np.array(pil), cv2.COLOR_RGB2BGR)

        h, w = cv_img.shape[:2]
        flat = cv_img.reshape((-1, 3)).astype(np.float32)

        # Sample subset for clustering (for speed/memory only)
        n_pixels = flat.shape[0]
        if n_pixels > KMEANS_SAMPLE_PIXELS:
            sample_idx = np.random.choice(n_pixels, size=KMEANS_SAMPLE_PIXELS, replace=False)
            sample_flat = flat[sample_idx]
        else:
            sample_flat = flat

        kmeans = MiniBatchKMeans(n_clusters=N_CLUSTERS, n_init=KMEANS_N_INIT, random_state=42)
        kmeans.fit(sample_flat)

        labels = kmeans.predict(flat)
        segmented_flat = kmeans.cluster_centers_[labels]
        segmented = segmented_flat.reshape((h, w, 3)).astype(np.uint8)

        out_path = os.path.join(output_dir, f"segmented_page_{idx}.png")
        cv2.imwrite(out_path, segmented)
        saved_paths.append(out_path)
        print(f"Saved segmented PNG: {out_path}")

        plt.imshow(cv2.cvtColor(segmented, cv2.COLOR_BGR2RGB))
        plt.title(f"Segmented Page {idx}")
        plt.axis('off')
        plt.show()

    return saved_paths

# ------------------------
# 2) Grayscale + sharpen + edges
# ------------------------
def grayscale_sharpen_edges(image_paths, output_dir="grayscale_edges"):
    ensure_dir(output_dir)
    if not image_paths:
        print("No images to process.")
        return []

    saved = []
    for idx, p in enumerate(image_paths, start=1):
        print(f"\n[Grayscale Enhanced] Page {idx} - {p}")
        img = cv2.imread(p, cv2.IMREAD_COLOR)
        if img is None:
            print(f"Failed to read {p}. Skipping.")
            continue

        # Step 1: Sharpen (optional but helps edges pop)
        sharpening_kernel = np.array([[-1, -1, -1],
                                      [-1,  9, -1],
                                      [-1, -1, -1]])
        sharpened = cv2.filter2D(img, -1, sharpening_kernel)

        # Step 2: Convert to grayscale
        gray = cv2.cvtColor(sharpened, cv2.COLOR_BGR2GRAY)

        # Step 3: Denoise while keeping edges
        denoised = cv2.bilateralFilter(gray, d=9, sigmaColor=75, sigmaSpace=75)

        # Step 4: Contrast enhancement (CLAHE)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        enhanced = clahe.apply(denoised)

        # Step 5: Auto Canny thresholds
        median_val = np.median(enhanced)
        low = int(max(0, 0.66 * median_val))
        high = int(min(255, 1.33 * median_val))

        print(f"Auto Canny thresholds â†’ low={low}, high={high}")
        edges = cv2.Canny(enhanced, low, high)

        # Save output
        out_path = os.path.join(output_dir, f"edges_page_{idx}.png")
        cv2.imwrite(out_path, edges)
        saved.append(out_path)
        print(f"Saved enhanced edge image: {out_path}")

        # Show preview
        plt.imshow(edges, cmap='gray')
        plt.title(f"Enhanced Edges Page {idx}")
        plt.axis('off')
        plt.show()

    return saved

# ------------------------
# Combined pipeline
# ------------------------
def process_pdf_pipeline(input_pdf,
                         color_out_dir="color_segmented",
                         edges_out_dir="grayscale_edges",
                         color_pdf="color_segmented_output.pdf",
                         edges_pdf="grayscale_edges_output.pdf"):
    print("STEP 1: Color segmentation")
    segmented_paths = color_segmentation_kmeans(input_pdf, output_dir=color_out_dir)

    print("\nSTEP 2: Grayscale + edges")
    edge_paths = grayscale_sharpen_edges(segmented_paths, output_dir=edges_out_dir)

    print("\nSTEP 3: Save outputs as PDFs (lossless, no resizing)")
    if segmented_paths:
        save_images_to_pdf_pillow(segmented_paths, output_pdf=color_pdf)
    if edge_paths:
        save_images_to_pdf_pillow(edge_paths, output_pdf=edges_pdf)

    print("\nPipeline completed.")
    return segmented_paths, edge_paths, (color_pdf, edges_pdf)

# ------------------------
# Example usage
# ------------------------
if __name__ == "__main__":
    input_pdf = "/content/map_no_selectable_text.pdf"
    process_pdf_pipeline(input_pdf)

#### Single image Segementation (If you don't working on pdf)

In [None]:
### Cell 1: Setup and Imports

# Before running this code, you need to install the necessary libraries.
# You can do this by running the following commands in your terminal or a Jupyter cell:
# pip install opencv-python Pillow numpy matplotlib

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt

# The path to the satellite image file you want to process.
# Please replace 'satellite_image.jpg' with the actual path to your file.
# Make sure the image file is in the same directory as this notebook, or provide the full path.
image_path = '/content/segmented_image_page_8.png'

### Cell 2: Load and Display Original Image

# This section loads the satellite image from the file system and displays it.
def load_image(image_file_path):
    """
    Loads an image from the specified file path.

    Args:
        image_file_path (str): The path to the image file.

    Returns:
        numpy.ndarray: The image as a NumPy array (OpenCV format), or None if an error occurs.
    """
    if not os.path.exists(image_file_path):
        print(f"Error: The file '{image_file_path}' does not exist.")
        return None

    try:
        # Load the image using OpenCV
        img = cv2.imread(image_file_path)
        if img is None:
            print("Error: Could not read the image file. Check file format and permissions.")
            return None

        print(f"Successfully loaded image from '{image_file_path}'.")
        return img
    except Exception as e:
        print(f"An error occurred while loading the image: {e}")
        return None

# Load the satellite image
satellite_image = load_image(image_path)

if satellite_image is not None:
    # Display the original image
    plt.imshow(cv2.cvtColor(satellite_image, cv2.COLOR_BGR2RGB))
    plt.title('Original Satellite Image')
    plt.axis('off')
    plt.show()

### Cell 3: Apply Edge Detection

# This section applies the Canny edge detection algorithm, a popular technique
# for finding sharp changes in image intensity, which correspond to edges.
# This is a form of image segmentation where the segments are defined by their boundaries.

if satellite_image is not None:
    # Convert the image to grayscale, as Canny edge detection works on single-channel images.
    gray_image = cv2.cvtColor(satellite_image, cv2.COLOR_BGR2GRAY)

    # Apply Canny edge detection
    # The thresholds control which edges are detected.
    # A low threshold of 100 and a high threshold of 200 are common starting points.
    low_threshold = 100
    high_threshold = 200

    print(f"\nApplying Canny edge detection with thresholds: low={low_threshold}, high={high_threshold}...")
    edges = cv2.Canny(gray_image, low_threshold, high_threshold)

    # Display the resulting edges
    plt.imshow(edges, cmap='gray')
    plt.title('Canny Edge Detection Result')
    plt.axis('off')
    plt.show()

    # Save the resulting image
    output_path = 'edges.png'
    cv2.imwrite(output_path, edges)
    print(f"\nEdge-detected image saved as '{output_path}'")
else:
    print("Could not find the image to perform edge detection. Please check the previous cell.")


## Approach 2
Applied SSIM directly on Satellite image and layout directly

Link: https://colab.research.google.com/drive/1uxKlL402nxPk1Se2jkFoEcyIM0MMzW6g#scrollTo=rp6I0BnjDGPy

Outcome: Better results, but same noise issue

## Aprroach 3
### Trained Unet model on "massachusetts-roads-dataset"
Link: https://colab.research.google.com/drive/1uYOffbsvZ_2j7SpfCBEv6sdGg-X8JSNk

Outcome: Model fail on local aligarh maps

## Approach 4
Created Hand labels from Satellite images and then trained the Unet model on Alig Dataset

Compared Layouts with Unet Model Output (Satellite images segmentations)...

Model Traing available on the following link

https://colab.research.google.com/drive/1i4Mg1hItzSevyb_l0D3BsrV2MZYnXnqw

Outcome: Model failure, due to less data item (53 images) model learns only black pixels

## Approch 5
### Transfer Learning

Fine Tune Cuda's ResNet model on our own Alig dataset

Link: https://colab.research.google.com/drive/1i4Mg1hItzSevyb_l0D3BsrV2MZYnXnqw

Outcome: Best results till now...

Best result when training for 50 epochs

In [None]:
# =========================================================================
# 1. SETUP, IMPORTS, AND PATHS
# =========================================================================

# Install necessary libraries silently
!pip install -q -U segmentation-models-pytorch albumentations > /dev/null
!pip install scikit-image --upgrade

import os, cv2
import numpy as np
import pandas as pd
import random, tqdm
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

import warnings
warnings.filterwarnings("ignore")

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import albumentations as album
import segmentation_models_pytorch as smp
from skimage.metrics import structural_similarity as ssim
import torch.optim as optim
import torch.nn.functional as F

print("Libraries loaded successfully.")

# --- Define Paths and Constants ---
# Assuming 'Alig_Dataset' was uploaded directly to the Colab working directory.
# NOTE: The mask folder name is assumed to be 'Label' based on the last input. Change to 'Label_filled' if applicable.
LOCAL_PATH = './Alig_Dataset/'
x_train_dir = os.path.join(LOCAL_PATH, 'Satellite_img')
y_train_dir = os.path.join(LOCAL_PATH, 'Label')
x_valid_dir = os.path.join(LOCAL_PATH, 'Satellite_img')
y_valid_dir = os.path.join(LOCAL_PATH, 'Label')

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

# =========================================================================
# 2. HELPER FUNCTIONS, CLASSES, AND U-NET ARCHITECTURE (TRANSFER LEARNING)
# =========================================================================

# --- Define 2-Class Binary Segmentation ---
class_names = ['background', 'building_outline']
class_rgb_values = [
    [0, 0, 0],      # Black for Background
    [255, 255, 255] # White for Building Outline
]
select_classes = ['background', 'building_outline']
select_class_indices = [class_names.index(cls.lower()) for cls in select_classes]
select_class_rgb_values =  np.array(class_rgb_values)[select_class_indices]

# --- Visualization and Encoding Functions ---
def visualize(**images):
    n_images = len(images)
    plt.figure(figsize=(20,8))
    for idx, (name, image) in enumerate(images.items()):
        plt.subplot(1, n_images, idx + 1)
        plt.xticks([]); plt.yticks([])
        plt.title(name.replace('_',' ').title(), fontsize=20)
        if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
            plt.imshow(image.squeeze(), cmap='gray')
        else:
            plt.imshow(image)
    plt.show()

def one_hot_encode(label, label_values):
    semantic_map = []
    for colour in label_values:
        equality = np.equal(label, colour)
        class_map = np.all(equality, axis = -1)
        semantic_map.append(class_map)
    semantic_map = np.stack(semantic_map, axis=-1)
    return semantic_map.astype('float')

def reverse_one_hot(image):
    x = np.argmax(image, axis = -1)
    return x

def colour_code_segmentation(image, label_values):
    colour_codes = np.array(label_values)
    x = colour_codes[image.astype(int)]
    return x

def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')

def get_preprocessing(preprocessing_fn=None):
    _transform = []
    if preprocessing_fn:
        _transform.append(album.Lambda(image=preprocessing_fn))
    _transform.append(album.Lambda(image=to_tensor, mask=to_tensor))
    return album.Compose(_transform)

def get_training_augmentation():
    train_transform = [
        album.RandomCrop(height=256, width=256, always_apply=True),
        album.OneOf([album.HorizontalFlip(p=1), album.VerticalFlip(p=1), album.RandomRotate90(p=1)], p=0.75),
    ]
    return album.Compose(train_transform)

def get_validation_augmentation():
    test_transform = [album.PadIfNeeded(min_height=512, min_width=512, always_apply=True, border_mode=0),]
    return album.Compose(test_transform)


# --- U-Net Model Initialization (TRANSFER LEARNING FIX) ---
model = smp.Unet(
    encoder_name="resnet34",       # Use ResNet34 encoder
    encoder_weights="imagenet",    # Load weights pre-trained on ImageNet
    in_channels=3,                 # RGB input
    classes=len(select_classes),   # 2 output classes (Background, Building)
)
model.to(DEVICE)
print(f"U-Net Model (ResNet34, ImageNet pre-trained) initialized.")


# =========================================================================
# 3. CUSTOM DATASET AND DATALOADERS
# =========================================================================

class AligarhDataset(torch.utils.data.Dataset):

    def __init__(self, images_dir, masks_dir, is_train=True, augmentation=None, preprocessing=None, class_rgb_values=None):

        # Diagnostics: Checks existence of files
        try:
            satellite_files = sorted(os.listdir(images_dir))
            file_ids = sorted([f[1:-4] for f in satellite_files if f.startswith('S') and f.endswith('.png')])
        except FileNotFoundError:
            file_ids = []

        total_files_found = len(file_ids)
        if total_files_found == 0:
            self.image_paths = []; self.mask_paths = []; return

        split_idx = int(total_files_found * 0.8)

        if is_train:
            self.file_ids = file_ids[:split_idx]
        else:
            self.file_ids = file_ids[split_idx:]

        self.image_paths = [os.path.join(images_dir, f'S{id_}.png') for id_ in self.file_ids]
        self.mask_paths = [os.path.join(masks_dir, f'M{id_}.png') for id_ in self.file_ids]

        self.class_rgb_values = class_rgb_values
        self.augmentation = augmentation
        self.preprocessing = preprocessing

    def __getitem__(self, i):
        # NOTE: num_workers=0 is assumed in DataLoader to handle cv2.imread safely
        image_bgr = cv2.imread(self.image_paths[i])
        mask_bgr = cv2.imread(self.mask_paths[i])

        if image_bgr is None or mask_bgr is None:
             raise RuntimeError(f"Failed to load image or mask: {self.image_paths[i]}")

        image = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        mask = cv2.cvtColor(mask_bgr, cv2.COLOR_BGR2RGB)

        mask = one_hot_encode(mask, self.class_rgb_values).astype('float')

        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']

        return image, mask

    def __len__(self): return len(self.image_paths)

# --- Initialize DataLoaders ---
train_dataset = AligarhDataset(x_train_dir, y_train_dir, is_train=True, augmentation=get_training_augmentation(), preprocessing=get_preprocessing(preprocessing_fn=None), class_rgb_values=select_class_rgb_values)
valid_dataset = AligarhDataset(x_valid_dir, y_valid_dir, is_train=False, augmentation=get_validation_augmentation(), preprocessing=get_preprocessing(preprocessing_fn=None), class_rgb_values=select_class_rgb_values)

# num_workers=0 is set for stability with cv2.imread
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=0)
valid_loader = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0)

print(f"\nDataLoaders initialized (Train: {len(train_dataset)}, Valid: {len(valid_dataset)}).")


# =========================================================================
# 4. TRAINING AND VALIDATION EXECUTION
# =========================================================================

TRAINING = True
EPOCHS = 50 # Increased epochs for stability and learning with small data
loss = nn.BCEWithLogitsLoss()
optimizer = optim.Adam([dict(params=model.parameters(), lr=0.00008)])

# --- Training and Validation Loop Classes (Standard PyTorch) ---
class CustomTrainEpoch:
    def __init__(self, model, loss, optimizer, device):
        self.model = model; self.loss = loss; self.optimizer = optimizer; self.device = device

    def run(self, data_loader):
        self.model.train(); total_loss = 0
        pbar = tqdm.tqdm(data_loader)
        for x_batch, y_batch in pbar:
            x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)
            self.optimizer.zero_grad()
            y_pred_logits = self.model(x_batch)
            loss_value = self.loss(y_pred_logits, y_batch)
            loss_value.backward(); self.optimizer.step()
            total_loss += loss_value.item()
            pbar.set_description(f"Loss: {loss_value.item():.4f}")
        return {'loss': total_loss / len(data_loader)}

class CustomValidEpoch(CustomTrainEpoch):
    def run(self, data_loader):
        self.model.eval(); total_loss = 0; total_iou = 0
        pbar = tqdm.tqdm(data_loader)
        with torch.no_grad():
            for x_batch, y_batch in pbar:
                x_batch, y_batch = x_batch.to(self.device), y_batch.to(self.device)
                y_pred_logits = self.model(x_batch)
                loss_value = self.loss(y_pred_logits, y_batch); total_loss += loss_value.item()

                y_pred_prob = torch.sigmoid(y_pred_logits); y_pred_mask = (y_pred_prob > 0.5).float()

                intersection = (y_pred_mask * y_batch).sum()
                union = y_pred_mask.sum() + y_batch.sum() - intersection
                iou = (intersection + 1e-6) / (union + 1e-6)
                total_iou += iou.item()

                pbar.set_description(f"Val Loss: {loss_value.item():.4f}, IoU: {iou.item():.4f}")

        return {'loss': total_loss / len(data_loader), 'iou_score': total_iou / len(data_loader)}

# --- Training Loop Execution ---
if TRAINING:
    best_iou_score = 0.0
    for i in range(0, EPOCHS):
        print('\nEpoch: {}'.format(i))
        CustomTrainEpoch(model, loss, optimizer, DEVICE).run(train_loader)
        valid_logs = CustomValidEpoch(model, loss, optimizer, DEVICE).run(valid_loader)

        if valid_logs['iou_score'] > best_iou_score:
            best_iou_score = valid_logs['iou_score']
            torch.save(model.state_dict(), './best_model.pth')
            print('Model saved!')


# =========================================================================
# 5. PREDICTION AND ACCURACY ASSESSMENT (IoU)
# =========================================================================

# --- 1. Load Trained Model ---
try:
    best_model = smp.Unet(encoder_name="resnet34", in_channels=3, classes=len(select_classes))
    state_dict = torch.load('./best_model.pth', map_location=DEVICE)
    best_model.load_state_dict(state_dict, strict=False)

    best_model.to(DEVICE); best_model.eval()
    print('\nLoaded trained U-Net model successfully.')
except Exception as e:
    print(f"[CRITICAL ERROR] Model loading failed. Ensure training finished successfully. Error: {e}")
    exit()

# --- 2. Prediction Helper ---
def predict_image(model, image, device, target_size=(512, 512)):
    image_tensor = get_preprocessing()(image=image)['image']
    x_tensor = torch.from_numpy(image_tensor).to(device).unsqueeze(0)

    with torch.no_grad():
        pred_logits = model(x_tensor)

    pred_prob = torch.sigmoid(pred_logits).squeeze().cpu().numpy()
    predicted_mask_binary = (pred_prob[1, :, :] > 0.5).astype(np.uint8) * 255

    return cv2.resize(predicted_mask_binary, target_size, interpolation=cv2.INTER_NEAREST)

def calculate_iou_for_single_image(pred_mask_binary, gt_mask_path, target_size=(512, 512)):
    """Calculates Intersection over Union (IoU) between the predicted mask and the ground truth mask."""
    gt_mask_raw = cv2.cvtColor(cv2.imread(gt_mask_path), cv2.COLOR_BGR2RGB)
    gt_mask_gray = cv2.cvtColor(gt_mask_raw, cv2.COLOR_RGB2GRAY)
    gt_mask_resized = cv2.resize(gt_mask_gray, target_size, interpolation=cv2.INTER_NEAREST)

    gt_tensor = torch.from_numpy(gt_mask_resized).float() / 255.0
    pred_tensor = torch.from_numpy(pred_mask_binary).float() / 255.0

    intersection = (pred_tensor * gt_tensor).sum()
    union = pred_tensor.sum() + gt_tensor.sum() - intersection

    iou = (intersection.item() + 1e-6) / (union.item() + 1e-6)
    return iou


# --- 3. EXECUTION: Predict and Compare against Ground Truth ---
TARGET_SIZE = (512, 512)
S_PATH = os.path.join(x_train_dir, 'S05.png')
M_PATH = os.path.join(y_train_dir, 'M05.png')

# Load Input Image
satellite_image_raw = cv2.cvtColor(cv2.imread(S_PATH), cv2.COLOR_BGR2RGB)

# Generate the Actual Segmentation Mask (U-Net Output)
predicted_mask_binary = predict_image(best_model, satellite_image_raw, DEVICE, target_size=TARGET_SIZE)

# Calculate IoU Score
iou_score = calculate_iou_for_single_image(predicted_mask_binary, M_PATH, TARGET_SIZE)

# Load Ground Truth Mask for Visualization
ground_truth_mask = cv2.cvtColor(cv2.imread(M_PATH), cv2.COLOR_BGR2RGB)
ground_truth_mask_resized = cv2.resize(cv2.cvtColor(ground_truth_mask, cv2.COLOR_RGB2GRAY), TARGET_SIZE, interpolation=cv2.INTER_NEAREST)


print(f"\n=======================================================")
print(f"âœ… U-Net Prediction vs. Ground Truth (M05.png) IoU Score: {iou_score:.4f}")
print(f"=======================================================")

visualize(
    Satellite_Input = satellite_image_raw,
    Ground_Truth_Mask = ground_truth_mask_resized,
    Predicted_Mask_UNet = predicted_mask_binary
)

### Generate segmentated image for input satellite image using best_model.pth

In [None]:
# =========================================================================
# 6. DEPLOYMENT: PREDICT ON UNSEEN USER IMAGES AND SAVE MASK
# =========================================================================

import os, cv2
import numpy as np
import torch
import torch.nn.functional as F
import segmentation_models_pytorch as smp
import matplotlib.pyplot as plt

# --- 1. Load Trained Model (Setup) ---
try:
    # Model architecture must match the trained model (ResNet34, 2 classes)
    best_model = smp.Unet(encoder_name="resnet34", in_channels=3, classes=2)
    state_dict = torch.load('./best_model.pth', map_location='cuda' if torch.cuda.is_available() else 'cpu')
    best_model.load_state_dict(state_dict, strict=False)

    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    best_model.to(DEVICE);
    best_model.eval()
    print('Deployment model loaded successfully.')
except Exception as e:
    print(f"[CRITICAL ERROR] Failed to load model for deployment. Ensure training was successful. Error: {e}")
    exit()

# --- 2. Prediction Function (Includes Saving) ---
def predict_and_visualize_user_image(image_path, model, device, save_filename, target_size=(512, 512)):

    # Load and preprocess the user image
    image_bgr = cv2.imread(image_path)
    if image_bgr is None:
        print(f"ERROR: Could not find image at path: {image_path}")
        return

    original_image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

    # Preprocessing
    # NOTE: Assuming the 'get_preprocessing' function from the previous code is available
    image_tensor = get_preprocessing()(image=original_image_rgb)['image']
    x_tensor = torch.from_numpy(image_tensor).to(device).unsqueeze(0)

    # Run Prediction
    with torch.no_grad():
        pred_logits = model(x_tensor)

    pred_prob = torch.sigmoid(pred_logits).squeeze().cpu().numpy()
    predicted_mask_binary = (pred_prob[1, :, :] > 0.5).astype(np.uint8) * 255

    # --- Morphological Closing Fix: Fills lines/gaps into solid shapes ---
    kernel = np.ones((5, 5), np.uint8)
    predicted_mask_binary = cv2.morphologyEx(predicted_mask_binary, cv2.MORPH_CLOSE, kernel)

    predicted_mask_resized = cv2.resize(predicted_mask_binary, target_size, interpolation=cv2.INTER_NEAREST)

    # --- CRITICAL ACTION: SAVE THE MASK ---
    cv2.imwrite(save_filename, predicted_mask_resized)
    print(f"âœ… Generated Segmentation Mask successfully saved as: {save_filename}")

    # --- Visualization ---
    plt.figure(figsize=(15, 6))

    plt.subplot(1, 2, 1)
    plt.imshow(original_image_rgb)
    plt.title(f"Input Image: {os.path.basename(image_path)}")
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(predicted_mask_resized, cmap='gray')
    plt.title("Generated Segmentation Mask")
    plt.axis('off')

    plt.show()
    return predicted_mask_resized

# =========================================================================
# 3. USER INTERFACE (Test and Save)
# =========================================================================

# ===> CHANGE THIS ID TO TEST ANY IMAGE (e.g., 'S15' for S15.png) <===
TEST_IMAGE_ID = '1'
USER_INPUT_IMAGE_PATH = '/content/SV2_Etah.png'

# ===> DEFINE THE OUTPUT FILENAME HERE <===
OUTPUT_FILENAME = f"segmented_mask_{TEST_IMAGE_ID}.png"

# ----------------------------------------------------
# Execute the prediction function
# ----------------------------------------------------
final_mask = predict_and_visualize_user_image(USER_INPUT_IMAGE_PATH, best_model, DEVICE, OUTPUT_FILENAME)

# Testing
### Test run on different possibilites
ResNet on

With Road

Without Road

Link: https://colab.research.google.com/drive/1twvb8eTJhAsOl9ndvcDifQF_ra8KobMV

Massachusetts-road-dataset:

...To be continue...