In [1]:
# Topographic Position Index (TPI) from DEM — tiled version
# Requirements:
#   pip install rasterio numpy scipy tqdm   (tqdm optional)
#
# Usage: adjust dem_path, tpi_out_path, window_size (odd), tile_size (e.g., 2048)
# Then run in your Python environment.

import math
import numpy as np
import rasterio
from rasterio.windows import Window
from scipy.ndimage import convolve

try:
    from tqdm import tqdm
    _has_tqdm = True
except Exception:
    _has_tqdm = False

# ---------- USER INPUT ----------
dem_path = r"C:\Users\Pekham\Downloads\Kolkata_DEM_20km.tif"         # <- CHANGE THIS
tpi_out_path = r"D:\Urban flood\Kolkata_TPI.tif"          # <- CHANGE THIS

# Window size for TPI kernel (must be odd: 11, 21, 33, ...)
window_size = 33   # e.g. 33x33 pixels

# Tile size to process at a time (in pixels, affects memory). Choose based on RAM.
# Note: tile_size should be >= 1 and can be e.g. 1024, 2048, 4096 depending on RAM.
tile_size = 2048
# -------------------------------

if window_size % 2 == 0:
    raise ValueError("window_size must be odd (e.g., 11, 21, 33).")

kernel = np.ones((window_size, window_size), dtype="float32")
radius = window_size // 2  # number of pixels to pad around each tile for correct convolution

# Helper to compute tile windows over raster dimensions
def sliding_tiles(width, height, tile_w, tile_h):
    """Yield (col_off, row_off, w, h) tiles covering the raster."""
    for row_off in range(0, height, tile_h):
        h = min(tile_h, height - row_off)
        for col_off in range(0, width, tile_w):
            w = min(tile_w, width - col_off)
            yield col_off, row_off, w, h

with rasterio.open(dem_path) as src:
    profile = src.profile.copy()
    dem_width = src.width
    dem_height = src.height
    dem_crs = src.crs
    dem_transform = src.transform
    nodata = src.nodata

    # Decide output nodata
    if nodata is None:
        tpi_nodata = -9999.0
        out_nodata = tpi_nodata
    else:
        # reuse input nodata for marking invalid areas
        out_nodata = nodata
        tpi_nodata = nodata

    # Update profile for single-band float32 output
    out_profile = profile.copy()
    out_profile.update(
        dtype="float32",
        count=1,
        compress="lzw",
        nodata=out_nodata
    )

    # Create (or overwrite) output and write zeros (or nodata) so the file exists and has correct size
    with rasterio.open(tpi_out_path, "w", **out_profile) as dst:
        # initialize output with nodata if specified, else zeros
        init_value = out_nodata if out_nodata is not None else 0.0
        # write in blocks to avoid huge memory usage
        # We'll fill blocks as we compute them below, so no need to pre-fill entire file.

        # Prepare iteration list to enable progress bar
        tiles = list(sliding_tiles(dem_width, dem_height, tile_size, tile_size))
        iterator = tqdm(tiles, desc="Tiles") if _has_tqdm else tiles

        for tile in iterator:
            col_off, row_off, w, h = tile

            # Compute read window including padding (clamp to raster extents)
            read_col_off = max(col_off - radius, 0)
            read_row_off = max(row_off - radius, 0)
            read_col_end = min(col_off + w + radius, dem_width)
            read_row_end = min(row_off + h + radius, dem_height)
            read_w = read_col_end - read_col_off
            read_h = read_row_end - read_row_off

            read_window = Window(read_col_off, read_row_off, read_w, read_h)

            # Read DEM block (with padding)
            dem_block = src.read(1, window=read_window, boundless=False).astype("float32", copy=False)

            # Build valid mask for this block
            if nodata is None:
                valid_mask = np.isfinite(dem_block)
            else:
                valid_mask = (dem_block != nodata) & np.isfinite(dem_block)

            # Replace invalid with 0 for convolution sums (we'll account for counts)
            dem_block_filled = dem_block.copy()
            dem_block_filled[~valid_mask] = 0.0

            # Compute sum and count across neighborhood using convolution
            sum_elev = convolve(dem_block_filled, kernel, mode="nearest")
            count_valid = convolve(valid_mask.astype("float32"), kernel, mode="nearest")

            # Avoid division by zero
            mean_elev = np.empty_like(sum_elev, dtype="float32")
            with np.errstate(invalid="ignore", divide="ignore"):
                mean_elev = sum_elev / count_valid
            mean_elev[count_valid == 0] = np.nan  # mark positions without any valid neighbors

            # Compute TPI on the block
            tpi_block = dem_block - mean_elev

            # Now extract the central region corresponding to the original tile (without padding)
            # central window offsets relative to read_window
            inner_col_off = col_off - read_col_off
            inner_row_off = row_off - read_row_off
            inner_col_end = inner_col_off + w
            inner_row_end = inner_row_off + h

            tpi_inner = tpi_block[inner_row_off:inner_row_end, inner_col_off:inner_col_end]

            # Prepare write window corresponding to tile position
            write_window = Window(col_off, row_off, w, h)

            # Where original DEM was nodata (in the inner area), set output nodata
            # We must also map which positions in the tpi_inner have count_valid == 0 (no neighbors) -> nodata
            # Extract the corresponding valid mask and count for the inner region
            valid_inner = valid_mask[inner_row_off:inner_row_end, inner_col_off:inner_col_end]
            count_inner = count_valid[inner_row_off:inner_row_end, inner_col_off:inner_col_end]

            # Prepare output array to write
            out_arr = np.full((h, w), tpi_nodata, dtype="float32")

            # Positions that are valid and have at least 1 neighbor -> keep computed tpi
            keep_mask = (valid_inner) & (count_inner > 0)
            # For positions where count_inner > 0 but dem is valid=False, we still want nodata (as original dem had nodata)
            # So keep_mask already requires valid_inner True.

            out_arr[keep_mask] = tpi_inner[keep_mask].astype("float32")

            # For positions where dem is valid but count_inner == 0 (very small rasters with no neighbors) mark nodata
            # out_arr already filled with tpi_nodata

            # Write to output
            dst.write(out_arr, 1, window=write_window)

        # done iterating tiles
    print("✅ TPI saved to:", tpi_out_path)


Tiles: 100%|█████████████████████████████████████████████████████████████████████████████| 1/1 [00:04<00:00,  4.58s/it]

✅ TPI saved to: D:\Urban flood\Kolkata_TPI.tif





In [1]:
# Topographic Wetness Index (TWI) from DEM — tiled version (no external DEM libs)
# Requirements:
#   pip install rasterio numpy scipy tqdm   (tqdm optional)
#
# Usage: adjust dem_path, twi_out_path, tile_size, margin etc. Then run in your Python environment.
#
# Notes:
#  - This implementation fills local pits inside each tile using a priority-flood
#    (seeded from the tile + margin boundary). Larger margin reduces tile-edge artifacts.
#  - Flow accumulation is computed with a D8-steepest-descent rule and a topological
#    accumulation (stack/queue). For very large DEMs and high resolutions this can be slow;
#    increase tile_size when memory allows or consider a native tool (WhiteboxTools/TauDEM)
#    for production runs.
#  - The algorithm treats tile margin as "open" boundary (so flow can leave through it).
#    Ensure margin is large enough for stable drainage patterns.
# ------------------------------------------------------------------------------

import math
import heapq
import numpy as np
import rasterio
from rasterio.windows import Window
from scipy.ndimage import gaussian_filter  # optional smoothing if needed

try:
    from tqdm import tqdm
    _has_tqdm = True
except Exception:
    _has_tqdm = False

# ---------- USER INPUT ----------
dem_path = r"C:\Users\Pekham\Downloads\Kolkata_DEM_20km.tif"  # <- CHANGE THIS
twi_out_path = r"D:\Urban flood\India_TWI.tif"    # <- CHANGE THIS

tile_size = 2048        # tile interior size (pixels)
margin = 128            # margin (pixels) around each tile for edge effects
nodata_out = -9999.0    # output nodata
min_tan = 1e-6          # minimum tan(slope) to avoid division by zero
# -------------------------------

# Helper: generate tile windows (col_off, row_off, w, h)
def sliding_tiles(width, height, tile_w, tile_h):
    for row_off in range(0, height, tile_h):
        h = min(tile_h, height - row_off)
        for col_off in range(0, width, tile_w):
            w = min(tile_w, width - col_off)
            yield col_off, row_off, w, h

# Priority-flood pit filling (priority queue) seeded by border cells
def priority_flood_fill(elev):
    """
    Fill depressions in elev (2D numpy array) using priority-flood.
    Borders are treated as drains (seed).
    elev: float64 array with np.nan for nodata (we will treat nan as high)
    returns filled_elev (float64)
    """
    rows, cols = elev.shape
    filled = elev.copy()
    # Replace nan with a large value so they don't act as sinks internally,
    # but we'll mark them as visited and keep them as nan in the final output.
    nan_mask = np.isnan(filled)
    large = np.nanmax(filled[np.isfinite(filled)]) if np.any(np.isfinite(filled)) else 0.0
    if np.isnan(large):
        large = 0.0
    filled[nan_mask] = large + 1e6

    visited = np.zeros_like(filled, dtype=np.uint8)
    heap = []

    # push border cells to heap
    for r in range(rows):
        for c in (0, cols - 1):
            if visited[r, c] == 0:
                heapq.heappush(heap, (float(filled[r, c]), r, c))
                visited[r, c] = 1
    for c in range(cols):
        for r in (0, rows - 1):
            if visited[r, c] == 0:
                heapq.heappush(heap, (float(filled[r, c]), r, c))
                visited[r, c] = 1

    # 8-neighbors
    nbrs = [(-1, -1), (-1, 0), (-1, 1),
            (0, -1),           (0, 1),
            (1, -1),  (1, 0),  (1, 1)]

    while heap:
        elev_val, r, c = heapq.heappop(heap)
        for dr, dc in nbrs:
            rr = r + dr
            cc = c + dc
            if rr < 0 or rr >= rows or cc < 0 or cc >= cols:
                continue
            if visited[rr, cc]:
                continue
            visited[rr, cc] = 1
            neighbor_elev = filled[rr, cc]
            # If neighbor is lower than current popped elevation, raise it
            if neighbor_elev < elev_val:
                filled[rr, cc] = elev_val
                heapq.heappush(heap, (float(filled[rr, cc]), rr, cc))
            else:
                heapq.heappush(heap, (float(neighbor_elev), rr, cc))

    # revert huge values back to nan
    filled[nan_mask] = np.nan
    return filled

# compute D8 flow direction (receiver indices) from a filled DEM
def compute_d8_receivers(filled):
    """
    For each cell, find its receiver (row,col) index where it flows to (the neighbor with steepest descent).
    If no lower neighbor exists, receiver is None (sink or flat edge).
    Returns receivers arrays of shape (rows, cols, 2) with -1 for no receiver.
    """
    rows, cols = filled.shape
    # create arrays to hold receiver indices (-1 means none)
    recv_r = -np.ones((rows, cols), dtype=np.int32)
    recv_c = -np.ones((rows, cols), dtype=np.int32)

    # neighbor offsets and their distances (for slope weighting)
    nbrs = [(-1, -1), (-1, 0), (-1, 1),
            (0, -1),           (0, 1),
            (1, -1),  (1, 0),  (1, 1)]
    # distance (approx) for diagonal neighbors
    nbr_dist = [math.sqrt(2), 1.0, math.sqrt(2),
                1.0,           1.0,
                math.sqrt(2), 1.0, math.sqrt(2)]

    for r in range(1, rows - 1):
        for c in range(1, cols - 1):
            elev = filled[r, c]
            if np.isnan(elev):
                continue
            max_drop = 0.0
            best = None
            for (dr_dc, dist) in zip(nbrs, nbr_dist):
                dr, dc = dr_dc
                nr, nc = r + dr, c + dc
                nb_elev = filled[nr, nc]
                if np.isnan(nb_elev):
                    continue
                drop = (elev - nb_elev) / dist  # steeper effective drop per distance
                if drop > max_drop:
                    max_drop = drop
                    best = (nr, nc)
            if best is not None:
                recv_r[r, c] = best[0]
                recv_c[r, c] = best[1]
            else:
                # no lower neighbor -> leave as -1 (local pit/flat that wasn't lowered)
                recv_r[r, c] = -1
                recv_c[r, c] = -1

    # edges: set to -1 (they are boundaries; flow allowed out through margin)
    return recv_r, recv_c

# compute flow accumulation given receivers: topological approach
def compute_flow_accumulation(recv_r, recv_c):
    """
    Compute D8 flow accumulation (number of contributing cells incl. self).
    recv_r, recv_c: receiver indices as int arrays with -1 for none.
    Returns accumulation array float64.
    """
    rows, cols = recv_r.shape
    acc = np.ones((rows, cols), dtype=np.float64)  # each cell contributes itself

    # compute number of inflowing neighbors for each cell
    inflow_count = np.zeros((rows, cols), dtype=np.int32)
    for r in range(rows):
        for c in range(cols):
            rr = recv_r[r, c]
            cc = recv_c[r, c]
            if rr >= 0 and cc >= 0:
                inflow_count[rr, cc] += 1

    # queue cells with zero inflow (sources)
    from collections import deque
    q = deque()
    for r in range(rows):
        for c in range(cols):
            if inflow_count[r, c] == 0:
                q.append((r, c))

    # propagate accumulation downstream
    while q:
        r, c = q.popleft()
        rr = recv_r[r, c]
        cc = recv_c[r, c]
        if rr >= 0 and cc >= 0:
            acc[rr, cc] += acc[r, c]
            inflow_count[rr, cc] -= 1
            if inflow_count[rr, cc] == 0:
                q.append((rr, cc))

    return acc

# compute slope (radians) from filled DEM using central differences
def compute_slope_radians(filled, xres, yres):
    """
    Compute slope in radians from filled DEM. xres,yres are pixel sizes in map units.
    """
    # use central differences; handle nan by filling temporarily with nearest valid via small gaussian (or ignore)
    # We'll compute gradients with np.gradient, replacing nan with local mean if needed
    dem = filled.copy()
    nan_mask = np.isnan(dem)
    if np.any(nan_mask):
        # simple inpainting: replace nans with neighbors' mean via convolution could be costly; use nearest valid fill
        # For safety, replace remaining nans with large value and later mark slope as nan where original was nan
        dem[nan_mask] = np.nanmean(dem[~nan_mask]) if np.any(~nan_mask) else 0.0

    dz_dy, dz_dx = np.gradient(dem, yres, xres)  # note order: first axis is y (rows), second is x (cols)
    slope = np.arctan(np.sqrt(dz_dx**2 + dz_dy**2))
    # Where original was nan, set slope to nan
    slope[nan_mask] = np.nan
    return slope

# ---------------- Main tiled processing ----------------
with rasterio.open(dem_path) as src:
    profile = src.profile.copy()
    dem_width = src.width
    dem_height = src.height
    transform = src.transform
    crs = src.crs
    src_nodata = src.nodata

    # determine output nodata
    out_nodata = nodata_out if nodata_out is not None else -9999.0

    out_profile = profile.copy()
    out_profile.update(dtype="float32", count=1, nodata=out_nodata, compress="lzw")

    # create/overwrite output file
    with rasterio.open(twi_out_path, "w", **out_profile) as dst:
        tiles = list(sliding_tiles(dem_width, dem_height, tile_size, tile_size))
        iterator = tqdm(tiles, desc="Tiles") if _has_tqdm else tiles

        for col_off, row_off, w, h in iterator:
            # compute read window including margin padding
            read_col_off = max(col_off - margin, 0)
            read_row_off = max(row_off - margin, 0)
            read_col_end = min(col_off + w + margin, dem_width)
            read_row_end = min(row_off + h + margin, dem_height)
            read_w = read_col_end - read_col_off
            read_h = read_row_end - read_row_off

            read_window = Window(read_col_off, read_row_off, read_w, read_h)

            dem_block = src.read(1, window=read_window, boundless=False).astype("float64", copy=False)

            # Build valid mask
            if src_nodata is None:
                valid_mask = np.isfinite(dem_block)
            else:
                valid_mask = (dem_block != src_nodata) & np.isfinite(dem_block)

            # set np.nan for invalids
            dem_block_f = dem_block.copy()
            dem_block_f[~valid_mask] = np.nan

            # ---- pit-fill the read block using priority-flood (seeded from block boundary) ----
            filled = priority_flood_fill(dem_block_f)

            # ---- compute D8 receivers and flow accumulation ----
            # Pad one-cell border to avoid indexing issues (we already have margin, so interior should be ok)
            recv_r, recv_c = compute_d8_receivers(filled)
            fa = compute_flow_accumulation(recv_r, recv_c)  # number of contributing pixels incl. self

            # ---- compute slope (radians) ----
            xres = transform.a
            yres = abs(transform.e)
            slope = compute_slope_radians(filled, xres, yres)  # radians

            # ---- compute specific catchment area As and TWI ----
            cell_area = abs(xres * yres)
            As = fa * cell_area  # in map units^2
            tan_slope = np.tan(slope)
            # avoid zero by clipping
            tan_slope_clipped = tan_slope.copy()
            # wherever slope is nan (due to nodata), keep nan
            with np.errstate(invalid="ignore"):
                tan_slope_clipped[np.isfinite(tan_slope_clipped) & (np.abs(tan_slope_clipped) < min_tan)] = min_tan

            # compute TWI, but keep positions that were invalid originally as nan
            twi_block = np.full_like(As, np.nan, dtype=np.float64)
            valid_positions = np.isfinite(As) & np.isfinite(tan_slope_clipped)
            with np.errstate(divide="ignore", invalid="ignore"):
                twi_block[valid_positions] = np.log(As[valid_positions] / tan_slope_clipped[valid_positions])

            # ---- extract inner tile region (without margin) to write ----
            inner_col_off = col_off - read_col_off
            inner_row_off = row_off - read_row_off
            inner_col_end = inner_col_off + w
            inner_row_end = inner_row_off + h

            twi_inner = twi_block[inner_row_off:inner_row_end, inner_col_off:inner_col_end]

            # Build write window and output array
            write_window = Window(col_off, row_off, w, h)
            out_arr = np.full((h, w), out_nodata, dtype="float32")

            # Determine where original DEM had valid data for inner block
            valid_inner = valid_mask[inner_row_off:inner_row_end, inner_col_off:inner_col_end]
            # Where twi_inner is finite and original dem valid -> write value
            write_mask = np.isfinite(twi_inner) & valid_inner
            out_arr[write_mask] = twi_inner[write_mask].astype("float32")

            # Write block
            dst.write(out_arr, 1, window=write_window)

        # finished tiles
    print("✅ TWI saved to:", twi_out_path)


Tiles: 100%|█████████████████████████████████████████████████████████████████████████████| 1/1 [00:26<00:00, 26.71s/it]

✅ TWI saved to: D:\Urban flood\India_TWI.tif



