# Transconstinetal multitemporal and multifactor probabilistic corridors

This notebook has two main sections:<br>
    - The first, dedicated to the generation of cost surfaces in GEE (cells 1-4).<br>
    - The second, dedicated to the generation of the probabilistic corridors (cells 5-12).<br>
<br>
Instructions are provided as markdown cells but also within the code, be sure to follow them for an efficient use.

## Google Earth Engine (GEE)-based multitemporal multifactor cost surfaces
**Behavior:** This notebook **forces a fresh OAuth flow every run** (no cached tokens), so it’s safe to share and run on different machines.

**How to run**
1. Run **Cell 1** to install exact-range versions (once per machine or after kernel reset).
2. Run **Cell 2** to check versions.
3. Run **Cell 3** to authenticate (you’ll be prompted) and initialize EE (optionally provide a Cloud Project ID).
4. Run **Cell 4** for the generation of the cost surface.

**Notes**
- Tokens are wiped from typical EE cache folders at the start of each run.
- In Colab, auth is ephemeral; this notebook still enforces a fresh flow.
- If you use a Google Cloud project with billing/exports, have the Project ID ready.


In [28]:
# Installs with compatibility pins
import sys, subprocess, importlib

PKGS = [
    # Core
    "earthengine-api>=0.1.390,<0.2",
    "geemap>=0.32,<0.40",
    # Auth & GCP I/O
    "google-auth>=2.20,<3",
    "google-auth-oauthlib>=1.0,<2",
    "google-cloud-storage>=2.10,<3",
    # Notebook info (optional but helpful)
    "ipykernel>=6.25,<7",
    "jupyterlab>=4.0,<5",
]

def ensure(pkgs):
    # Install missing or unpinned packages into the current environment/kernel
    cmd = [sys.executable, "-m", "pip", "install", "--upgrade", "--quiet", *pkgs]
    print("Installing/validating packages…")
    subprocess.check_call(cmd)

try:
    import ee, geemap  # quick check; if this fails, we install everything
except Exception:
    ensure(PKGS)

# Write a requirements lock (range-pinned) alongside the notebook
with open("requirements_gee.txt", "w", encoding="utf-8") as f:
    f.write("\n".join(PKGS) + "\n")

print("✔ Environment ready. Wrote 'requirements_gee.txt' with compatibility pins.")


✔ Environment ready. Wrote 'requirements_gee.txt' with compatibility pins.


In [29]:
import pkgutil, importlib, sys
mods = ["ee", "geemap", "google.auth", "google.oauth2", "google.cloud.storage"]
for m in mods:
    try:
        mod = importlib.import_module(m)
        version = getattr(mod, "__version__", "(no __version__ attr)")
        print(f"{m:<24} {version}")
    except Exception as e:
        print(f"{m:<24} not found ({e})")

print("\nPython:", sys.version)


ee                       1.6.15
geemap                   0.36.6
google.auth              2.41.1
google.oauth2            (no __version__ attr)
google.cloud.storage     3.4.1

Python: 3.11.5 | packaged by Anaconda, Inc. | (main, Sep 11 2023, 13:26:23) [MSC v.1916 64 bit (AMD64)]


In [None]:
# Fresh EE OAuth flow on every run
import os, shutil
from pathlib import Path

def wipe_ee_tokens():
    candidates = [
        Path.home() / ".config" / "earthengine",                    # Linux/macOS
        Path.home() / "AppData" / "Roaming" / "earthengine",        # Windows
    ]
    for d in candidates:
        if d.exists():
            try:
                shutil.rmtree(d, ignore_errors=True)
            except Exception as e:
                print(f"Warning: could not remove {d}: {e}")

wipe_ee_tokens()

import ee

# Notebook/CLI-friendly OAuth. This will open a link or prompt inline (e.g., Colab).
ee.Authenticate(auth_mode="notebook")

# Optional: Ask for a Google Cloud Project ID (needed for some exports/quotas)
PROJECT_ID = os.environ.get("EE_PROJECT") or input(
    "EE Cloud Project ID (press Enter to skip): "
).strip() or None

# Initialize EE
if PROJECT_ID:
    ee.Initialize(project=PROJECT_ID)
else:
    ee.Initialize()

# Quick sanity check (forces a tiny API call)
print("User profile email:", ee.data.getAssetRoots()[0]["id"].split(":")[0] if ee.data.getAssetRoots() else "(no asset roots)")
print("EE initialized ✔")


In [40]:
# Cost surface map: UI + compute + preview + export (Drive / Local)

import ee, geemap, math, os, time, io, requests
from IPython.display import display
import ipywidgets as W

# UI controls
month_choices = [
    ("Whole year","yr"),("Jan","jan"),("Feb","feb"),("Mar","mar"),
    ("Apr","apr"),("May","may"),("Jun","jun"),("Jul","jul"),
    ("Aug","aug"),("Sep","sep"),("Oct","oct"),("Nov","nov"),("Dec","dec")
]
month = W.Dropdown(options=month_choices, value="yr", description="Period:", layout=W.Layout(width="260px"))

max_cost = W.BoundedFloatText(value=40.0, min=1, max=9999, step=1.0, description="Max cost:", layout=W.Layout(width="260px"))
bit_depth = W.RadioButtons(options=[("16-bit (recommended)","16"),("8-bit","8")], value="16", description="Depth:")
use_water_attraction = W.ToggleButtons(options=[("No","n"),("Yes","y")], value="n", description="Water attract.:")
out_res = W.FloatText(value=None, description="Out res (m):", placeholder="blank = DSM native", layout=W.Layout(width="260px"))
out_res_hint = W.HTML("<div style='margin-top:-10px;font-size:10px;color:#666'>0 = DSM native resolution</div>")

export_target = W.Dropdown(options=[("Google Drive","drive"),("None (preview only)","none")],
                           value="none", description="Export to:", layout=W.Layout(width="260px"))
export_name = W.Text(value="costsurf_SilkRoute", description="Name:", layout=W.Layout(width="300px"))

run_btn = W.Button(description="Build, Preview & (Optionally) Export", button_style="primary", icon="play")
status = W.HTML()

controls = W.HBox([
    W.VBox([month, max_cost, bit_depth]),
    W.VBox([use_water_attraction, out_res, out_res_hint, export_target]),
    W.VBox([export_name, run_btn, status]),
])

display(controls)

# Geometry & base
geometry = ee.Geometry.Polygon([
    [26.034856936314327, 31.965400777310872],
    [34.19255939893447,27.520378920323584],
    [70.85299048338345, 19.854238685800006],
    [90.28374707240327,20.074324423110866],
    [112.29527848792249,21.22664649657822],
    [112.23560562718355,47.3578548447329],
    [70.6364691148242, 53.122300330081565],
    [25.974978246994738, 47.85290888472198],
    [26.034856936314327,31.965400777310872]
])

Map = geemap.Map(center=[37.566, 67.601], zoom=5)  # change as necessary
display(Map)

# Helpers
def _period_from_choice(choice):
    # Returns (ee.Filter.dayOfYear, monthNum, altNum, periodName)
    per_map = {
        "yr":  (ee.Filter.dayOfYear(1, 364), None, None, "yr"),
        "jan": (ee.Filter.dayOfYear(1, 31), "00", "01", "jan"),
        "feb": (ee.Filter.dayOfYear(32, 59), "01", "02", "feb"),
        "mar": (ee.Filter.dayOfYear(60, 90), "02", "03", "mar"),
        "apr": (ee.Filter.dayOfYear(91, 120), "03", "04", "apr"),
        "may": (ee.Filter.dayOfYear(121, 151), "04", "05", "may"),
        "jun": (ee.Filter.dayOfYear(152, 181), "05", "06", "jun"),
        "jul": (ee.Filter.dayOfYear(182, 212), "06", "07", "jul"),
        "aug": (ee.Filter.dayOfYear(213, 243), "07", "08", "aug"),
        "sep": (ee.Filter.dayOfYear(244, 273), "08", "09", "sep"),
        "oct": (ee.Filter.dayOfYear(274, 304), "09", "10", "oct"),
        "nov": (ee.Filter.dayOfYear(305, 334), "10", "11", "nov"),
        "dec": (ee.Filter.dayOfYear(335, 365), "11", "12", "dec"),
    }
    return per_map[choice]

def _albedo_palette():
    # Same palette sequence as your JS (blue→green→yellow→orange→red→magenta)
    return ['0f00ff','03ff00','efff00','ffbc00','ff0000','ff0081']

# Core compute
def build_cost_surface(periodSel, maxCost, bitSel, queryConv, outrr_override):
    # DSM
    elevation_ic = ee.ImageCollection('JAXA/ALOS/AW3D30/V3_2').select('DSM')
    dsm = elevation_ic.mosaic()
    proj = elevation_ic.first().select(0).projection()

    # Period
    period, monthNum, altNum, periodName = _period_from_choice(periodSel)

    # Slope (in degrees → radians → tan)
    slope = ee.Terrain.slope(dsm.setDefaultProjection(proj))
    slp_rad = slope.multiply(math.pi).divide(180.0)
    slp_dg = slp_rad.tan()

    # Herzog/Minetti cost of slope
    costSlp1 = (slp_dg.pow(6).multiply(1337.8)
                .add(slp_dg.pow(5).multiply(278.19))
                .subtract(slp_dg.pow(4).multiply(517.39))
                .subtract(slp_dg.pow(3).multiply(78.199))
                .add(slp_dg.pow(2).multiply(93.419))
                .add(slp_dg.multiply(19.825))
                .add(1.64))
    costSlp = costSlp1.rename('cost').where(costSlp1.gt(99.99), 99.99)

    # Snow (MODIS daily → mean over period)
    snowBase = (ee.ImageCollection('MODIS/061/MOD10A1').select('NDSI_Snow_Cover')
                .filter(period)
                .filterBounds(geometry)
                .reduce(ee.Reducer.mean())
                .unmask(0))
    snow = snowBase.divide(33.33).add(ee.Image(1))  # ~1..4

    # Sea mask (ALOS MSK band)
    mskALOS = ee.ImageCollection('JAXA/ALOS/AW3D30/V3_2').select('MSK').mosaic()
    bitSea = mskALOS.bitwiseAnd(3).eq(0)
    seaMsk = bitSea.remap([0,1],[maxCost,0])

    # Dams/Reservoirs (user asset)
    maskDams = (ee.FeatureCollection('users/hao23/Grand_reservoirs')
                .reduceToImage(properties=['value'], reducer=ee.Reducer.first())
                .unmask(0)
                .remap([0,1],[1,0]))
    dams = maskDams.remap([0,1],[3.28,0]).float()

    # Temperatures & wind (WorldClim + TerraClimate)
    tempMinCold, tempMaxCold = -15, 10
    windSpeed = (ee.ImageCollection('IDAHO_EPSCOR/TERRACLIMATE').select('vs')
                 .filter(period).filterBounds(geometry).reduce(ee.Reducer.mean())
                 .unmask(0).multiply(0.036))  # 0.01 scale * 3.6 (m/s->km/h)

    if periodSel == 'yr':
        temp = (ee.ImageCollection('WORLDCLIM/V1/MONTHLY').select('tavg')
                .reduce(ee.Reducer.mean()).unmask(0).multiply(0.1))
    else:
        temp = (ee.Image('WORLDCLIM/V1/MONTHLY/' + altNum).select('tavg')
                .unmask(0).multiply(0.1))

    # Wind chill (valid T<=10C and V>4.8 km/h)
    wcValues = (ee.Image(13.12)
                .add(ee.Image(0.6215).multiply(temp))
                .subtract(ee.Image(11.37).multiply(windSpeed.pow(0.16)))
                .add(ee.Image(0.3965).multiply(temp).multiply(windSpeed.pow(0.16))))
    windChill = temp.where( temp.lte(10).And(windSpeed.gte(4.8)).Not(), wcValues)

    cold1 = windChill.gte(tempMinCold)
    cold2 = windChill.lte(tempMaxCold)
    cold3 = windChill.lt(tempMinCold).multiply(tempMinCold)
    cold4 = windChill.gt(tempMaxCold).multiply(tempMaxCold)
    cold5 = (cold1.multiply(cold2)).multiply(windChill).add(cold3).add(cold4)
    cold  = ee.Image(2).subtract((cold5.subtract(tempMinCold)).divide(tempMaxCold - tempMinCold))  # 1..2

    # Loose sand / dunes (user asset; expects probability)
    looseSandRaw = ee.Image('users/hao23/looseSand')
    looseSand = ee.Image(1).add(looseSandRaw.gte(0.7).multiply(looseSandRaw.multiply(0.9)))

    # Heat (lack of water via temp) 25..40C → 0..2
    tempMin, tempMax = 25, 40
    heat5 = temp.where(temp.lt(tempMin), tempMin).where(temp.gt(tempMax), tempMax)
    heat = (heat5.subtract(tempMin)).divide(tempMax - tempMin).multiply(2)  # 0..2

    # EVI (multi-annual/extended window)
    if periodSel == 'yr':
        evi = (ee.ImageCollection('LANDSAT/COMPOSITES/C02/T1_L2_8DAY_EVI')
               .filterDate('1984-01-01', '1995-12-31').filter(period)
               .filterBounds(geometry).reduce(ee.Reducer.mean()).unmask(0))
    else:
        evi = (ee.ImageCollection('LANDSAT/COMPOSITES/C02/T1_L2_8DAY_EVI')
               .filterDate('1984-01-01', '2015-12-31').filter(period)
               .filterBounds(geometry).reduce(ee.Reducer.mean()).unmask(0))

    # Desert factor via EVI threshold (0.. → 1..2), negatives (water)→1
    dsrtThrs = 0.075
    desert5 = evi.where(evi.lt(0), dsrtThrs).clamp(0, dsrtThrs)
    desert6 = desert5.divide(dsrtThrs).add(1)  # 1..2
    desert  = desert6.where(evi.lt(0), 1)

    # No water multiplier (aridity^heat)
    noWat = desert.pow(heat)

    # Water attraction (optional, heavy)
    if queryConv == 'y':
        rWat_km = 7.5
        watConv = noWat.gt(1).focal_median(kernel=ee.Kernel.circle(rWat_km * 1000, 'meters'))
        waterThrs = 0.2
        watAtt = watConv.multiply(evi.gte(waterThrs)).remap([0,1],[1,0.75])
    else:
        watAtt = ee.Image(1)

    # Surface water (JRC)
    if periodSel == 'yr':
        watRecur = ee.Image('JRC/GSW1_4/GlobalSurfaceWater').select('occurrence').unmask(0)
    else:
        watRecur = ee.Image(f'JRC/GSW1_4/MonthlyRecurrence/monthly_recurrence_{monthNum}').select('monthly_recurrence').unmask(0)
    surfWat = watRecur.divide(33.33).add(ee.Image(1))  # 1..4

    # Altitude penalty above 2000 m (custom curve)
    height = (ee.Image(-0.0170146)
              .subtract(ee.Image(0.004039874802622)
                       .multiply(ee.Image(1).subtract(ee.Image(2.718281828459)
                       .pow(dsm.multiply(ee.Image(0.0008254486)))))))
    height = height.multiply(dsm.gt(2000)).add(ee.Image(1))

    # Raw combined cost
    costsRaw = (costSlp
                .multiply(noWat)
                .multiply(watAtt)
                .multiply(surfWat)
                .multiply(snow)
                .multiply(looseSand)
                .multiply(cold)
                .multiply(height)
                .multiply(maskDams).add(dams)
                .add(seaMsk))

    # Cap at maxCost, then scale
    costsMax = costsRaw.where(costsRaw.gt(maxCost), maxCost)
    if bitSel == "8":
        costs = costsMax.divide(maxCost).multiply(255).byte().clip(geometry).unmask(255)
        vis = dict(min=1, max=150, palette=_albedo_palette())
    else:
        # "16-bit" emulation: preserve decimals (as float32) but keep vis min/max tight
        costs = (costsMax.multiply(1e4).round().divide(1e4)).multiply(1e-4).clip(geometry).unmask(0.004)
        vis = dict(min=0.0001, max=0.001, palette=_albedo_palette())

    # Output resolution
    native_res = proj.nominalScale()  # in meters
    rr = float(native_res.getInfo())
    if (outrr_override is None) or (outrr_override <= 0) or (outrr_override <= rr):
        outrr = rr
    else:
        outrr = float(outrr_override)

    return dict(
        image=costs.rename('cost'),
        vis=vis,
        periodName=periodName,
        scale=outrr,
        proj=proj
    )

def add_preview_layer(costs, vis):
    # Clear & add layer
    Map.clear_layers()
    Map.addLayer(costs, vis, "Cost surface")
    Map.centerObject(geometry, 5)

def snapshot_png(costs, vis, fname="cost_surface_preview.png"):
    # Thumbnail snapshot for the notebook output
    url = costs.visualize(**vis).getThumbURL({
        "region": geometry,
        "dimensions": 1024,
        "format": "png"
    })
    try:
        r = requests.get(url, timeout=120)
        r.raise_for_status()
        with open(fname, "wb") as f:
            f.write(r.content)
        from IPython.display import Image
        display(Image(filename=fname))
        return fname
    except Exception as e:
        print("Snapshot failed:", e)
        return None

def export_drive(costs, name, scale):
    task = ee.batch.Export.image.toDrive(
        image=costs,
        description=name,
        folder="GEE_Exports",
        fileNamePrefix=name,
        region=geometry,
        scale=scale,
        maxPixels=9e12
    )
    task.start()
    return task.id

def export_local(costs, name, scale):
    # Download GeoTIFF directly to the notebook machine
    out_tif = f"{name}.tif"
    geemap.ee_export_image(
        image=costs, filename=out_tif,
        scale=scale, region=geometry, file_per_band=False, crs="EPSG:4326"
    )
    return out_tif

# Wire UI
def _run_clicked(_):
    status.value = "<b>Running…</b>"
    try:
        cfg = build_cost_surface(
            periodSel=month.value,
            maxCost=float(max_cost.value),
            bitSel=bit_depth.value,
            queryConv=use_water_attraction.value,
            outrr_override=(None if (out_res.value in [None, ""]) else float(out_res.value))
        )
        costs, vis = cfg["image"], cfg["vis"]
        add_preview_layer(costs, vis)
        snap = snapshot_png(costs, vis, fname=f"{export_name.value}_preview.png")

        # Export
        if export_target.value == "drive":
            tid = export_drive(costs, f"{export_name.value}_{cfg['periodName']}_{int(cfg['scale'])}m", cfg["scale"])
            status.value = f"✅ Done. Snapshot shown. Drive export task id: <code>{tid}</code>."
        else:
            status.value = "✅ Done. Snapshot shown. (No export requested.)"

    except Exception as e:
        status.value = f"<span style='color:red'>❌ Error: {e}</span>"

run_btn.on_click(_run_clicked)


HBox(children=(VBox(children=(Dropdown(description='Period:', layout=Layout(width='260px'), options=(('Whole y…

Map(center=[37.566, 67.601], controls=(WidgetControl(options=['position', 'transparent_bg'], position='toprigh…

Snapshot failed: 400 Client Error: Bad Request for url: https://earthengine.googleapis.com/v1/projects/873546200438/thumbnails/afc7bd3b3f47b9728a71545003e323fc-6ac0cfe79d111aa3337a96b84d4f54b0:getPixels


## GRASS GIS-based raster analysis for the generation of probabilistic corridors
**Behavior:** The following cells need GRASS GIS to be installed in a local machine to work.

**How to run**
1. Run **Cell 1** to set up paramenters and environment providing:
  - A path to the cost surface raster generated by the execution of the previous cell (donloaded from Google Drive).
  - The number of origins/destinations for the corridors.
  - The coordinates (longitude,latitute) of origins/destinations use the same CRS of the cost surface raster WGS84 (EPSG 4326).
2. Run **Cell 2** to provide the path to GRASS' bat file and to check its version.
3. Run **Cell 3** to generate the cost distance rasters.
4. Run **Cell 4** to define the pairs of cost distance rasters (generated during the execution of the preivous cell) taht will form the corridors.
5. Run **Cell 5** to compute the corridors.
6. Run **Cell 6** to make sure the corridors created in Step 5 are avialable.
7. Run **Cell 7** to normalise corridors to 0–100 as Float64.
8. Run **Cell 8** to create a minimum raster composite over all normalized corridors producing the final output of the code.

**Notes**
- Please make sure all data needed is provided within the code.
- The code paralellizes the generation of outputs. Make sure you have plenty of computational resources available: as many cores as origin/destinations and large RAM memory. If this is not the case consider to run the algorithm in a server or run the processes sequentially.



In [32]:
# Step 1: Parameters (run first)

from pathlib import Path
import json
import math
import rasterio as rio

# 1) Path to your cost surface raster (e.g., r"C:\data\cost_surface.tif")
COST_RASTER_PATH = r"C:\CostSurf\costsurf_SilkRoute_yr_216m.tif"  # <-- fill in

# 2) How many cost-distance rasters to compute?
N_OUTPUTS = 3  # <-- set an integer >= 1

# 3) Start coordinates (Easting/X, Northing/Y) in the SAME CRS as COST_RASTER_PATH.
#    Provide exactly N_OUTPUTS pairs. Example:
#    START_COORDS = [(508450.0, 4641230.0), (512000.0, 4639000.0), (515000.0, 4635000.0)]
START_COORDS = [(29.019785,41.022583),(108.947162,34.264356),(66.900399,36.768930)]  # <-- fill in with a list of (x, y) tuples

# --------------------------------------------------------------------------
# Basic validation
# --------------------------------------------------------------------------
assert COST_RASTER_PATH, "Please provide COST_RASTER_PATH."
cost_path = Path(COST_RASTER_PATH)
assert cost_path.exists(), f"Cost raster not found: {cost_path}"

assert isinstance(N_OUTPUTS, int) and N_OUTPUTS >= 1, "N_OUTPUTS must be an integer >= 1."
assert isinstance(START_COORDS, (list, tuple)) and len(START_COORDS) == N_OUTPUTS, \
    "START_COORDS must be a list/tuple with exactly N_OUTPUTS (x, y) pairs."

# Read CRS and stats we will use to set null_cost
with rio.open(cost_path) as src:
    crs = src.crs
    epsg = crs.to_epsg() if crs is not None else None

# Compute a conservative null_cost (max of valid pixels) so NULLs are traversable but expensive
# If the raster is large, this single pass is still cheaper than doing it in each process.
import numpy as np
data = None
with rio.open(cost_path) as src:
    # Read in manageable chunks if needed; here we try a single read
    data = src.read(1, masked=True)
    # If *all* values are masked (unlikely), fallback to 1.0
    if np.ma.count(data) == 0:
        NULL_COST = 1.0
    else:
        NULL_COST = float(0.004)  # Use 0.004 if a 16bit cost surface is used and 255 if using a 8bit

print("✓ Inputs OK")
print(f"Cost raster: {cost_path}")
print(f"CRS: {crs}  |  EPSG: {epsg if epsg else 'unknown (XY tmp-location will be used)'}")
print(f"Number of outputs: {N_OUTPUTS}")
print(f"Start coords: {START_COORDS}")
print(f"null_cost to use: {NULL_COST}")


✓ Inputs OK
Cost raster: C:\CostSurf\costsurf_SilkRoute_yr_216m.tif
CRS: EPSG:4326  |  EPSG: 4326
Number of outputs: 3
Start coords: [(29.019785, 41.022583), (108.947162, 34.264356), (66.900399, 36.76893)]
null_cost to use: 0.004


In [33]:
# Step 2: Checking GRASS availability and version

from pathlib import Path
import shutil, subprocess, sys

# EDIT this to your actual GRASS .bat path (examples below)
# GRASS_BIN = r"C:\Program Files\GRASS GIS 8.4\grass84.bat"
# GRASS_BIN = r"C:\OSGeo4W\bin\grass.bat"
GRASS_BIN = r"C:\Program Files\GRASS GIS 8.3\grass83.bat"  # <-- fill in

assert GRASS_BIN, "Set GRASS_BIN to your GRASS launcher .bat file."
print("Using GRASS_BIN:", GRASS_BIN)

# Version check
try:
    out = subprocess.run([GRASS_BIN, "--version"], capture_output=True, text=True, check=True)
    print(out.stdout.strip() or out.stderr.strip())
except Exception as e:
    print("GRASS check failed:", e, file=sys.stderr)
    raise



Using GRASS_BIN: C:\Program Files\GRASS GIS 8.3\grass83.bat
GRASS GIS 8.3.0
Geographic Resources Analysis Support System (GRASS) is Copyright,
1999-2023 by the GRASS Development Team, and licensed under terms of the
GNU General Public License (GPL) version >=2.

This GRASS GIS 8.3.0 release is coordinated and produced by
the GRASS Development Team with contributions from all over the world.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
General Public License for more details.


In [34]:
# Step 3: r.cost execution

import os
import shlex
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

out_dir = cost_path.parent
base = cost_path.stem

def q_path(p):
    """OS-aware quoting for file paths used inside shell commands."""
    p = str(p)
    if os.name == "nt":
        return f'"{p}"' if (" " in p or "(" in p or ")" in p) else p
    else:
        return shlex.quote(p)

def build_grass_pipeline(x, y, out_tif, epsg_code, null_cost, in_path):
    """
    Build command to run GRASS modules in a temporary location, with OS-specific chaining.
    """
    in_q  = q_path(in_path)
    out_q = q_path(out_tif)

    cmds = [
        f"r.in.gdal input={in_q} output=cost --o",
        "g.region raster=cost",
        f"r.cost -k input=cost start_coordinates={x},{y} output=cd_out null_cost={null_cost}",
        (
            "r.out.gdal "
            f"input=cd_out output={out_q} format=GTiff type=Float64 "
            'createopt="COMPRESS=LZW,TILED=YES,BIGTIFF=IF_SAFER" --o'
        ),
    ]
    tmp_loc = f"EPSG:{epsg_code}" if epsg_code else "XY"

    base_cmd = [GRASS_BIN, "--tmp-location", tmp_loc, "--exec"]

    if os.name == "nt":  # Windows
        chain = " && ".join(cmds)
        shell_part = ["cmd", "/c", chain]
    else:                # Linux/macOS
        chain = " ; ".join(cmds)
        shell_part = ["bash", "-lc", chain]

    return base_cmd + shell_part

def run_one(idx_coord):
    idx, (x, y) = idx_coord
    out_name = f"{base}_costdist_{idx+1}.tif"
    out_tif = out_dir / out_name
    cmd = build_grass_pipeline(x, y, out_tif, epsg, NULL_COST, cost_path)
    try:
        res = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
        )
        return (out_tif, res.stdout.strip(), res.stderr.strip(), True)
    except subprocess.CalledProcessError as e:
        return (out_tif, e.stdout, e.stderr, False)
    except Exception as e:
        return (out_tif, "", str(e), False)

max_workers = min(N_OUTPUTS, (os.cpu_count() or 1))
print(f"Launching {N_OUTPUTS} r.cost jobs with max_workers={max_workers} (ThreadPool)...")

results = []
with ThreadPoolExecutor(max_workers=max_workers) as ex:
    futures = [ex.submit(run_one, (idx, xy)) for idx, xy in enumerate(START_COORDS)]
    for fut in as_completed(futures):
        results.append(fut.result())

ok, failed = 0, 0
for out_tif, sout, serr, success in results:
    if success and out_tif.exists():
        ok += 1
        print(f"✓ Wrote {out_tif}")
    else:
        failed += 1
        print(f"✗ Failed for {out_tif.name}\nSTDOUT:\n{sout}\nSTDERR:\n{serr}\n")

print(f"\nDone. Successful: {ok} | Failed: {failed}")
print(f"Outputs directory: {out_dir}")


Launching 3 r.cost jobs with max_workers=3 (ThreadPool)...
✓ Wrote C:\CostSurf\costsurf_SilkRoute_yr_216m_costdist_2.tif
✓ Wrote C:\CostSurf\costsurf_SilkRoute_yr_216m_costdist_1.tif
✓ Wrote C:\CostSurf\costsurf_SilkRoute_yr_216m_costdist_3.tif

Done. Successful: 3 | Failed: 0
Outputs directory: C:\CostSurf


In [35]:
# Step 4: Corridor pairs selection

from pathlib import Path
import re

# If you ran Step 1, these already exist:
try:
    out_dir
    base
except NameError:
    # Fallback if running standalone: infer from COST_RASTER_PATH
    from pathlib import Path
    assert COST_RASTER_PATH, "Please set COST_RASTER_PATH as in Step 1."
    cost_path = Path(COST_RASTER_PATH)
    out_dir = cost_path.parent
    base = cost_path.stem

# Discover cost-distance rasters created in Step 1
cd_files = sorted(out_dir.glob(f"{base}_costdist_*.tif"))

assert cd_files, f"No cost-distance rasters found in {out_dir}. Expected files like '{base}_costdist_1.tif'."

print("Found cost-distance rasters:")
for i, p in enumerate(cd_files, 1):
    print(f"  {i:2d}: {p.name}")

# --------------------------------------------------------------------
# Define the PAIRS to average (by 1-based indices from the above list)
# Example: PAIRS = [(1,2), (1,3), (2,4)]
# --------------------------------------------------------------------
PAIRS = [(1,2), (1,3), (2,3)]  # <-- fill in your pairs, e.g., [(1,2), (3,5)]


Found cost-distance rasters:
   1: costsurf_SilkRoute_yr_216m_costdist_1.tif
   2: costsurf_SilkRoute_yr_216m_costdist_2.tif
   3: costsurf_SilkRoute_yr_216m_costdist_3.tif


In [36]:
# Step 5: Corridor computation

import os, subprocess, sys, platform, ctypes, tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# Expect base, out_dir, PAIRS from previous cells

assert cd_files, "No cost-distance rasters (cd_files) found."
assert isinstance(PAIRS, (list, tuple)) and all(isinstance(p, (list, tuple)) and len(p) == 2 for p in PAIRS), \
       "PAIRS must be a list of (i, j) tuples."

n = len(cd_files)
pairs_0 = []
for (i, j) in PAIRS:
    assert 1 <= i <= n and 1 <= j <= n and i != j, f"Invalid pair {(i, j)}. Indices must be 1..{n} and different."
    pairs_0.append((i-1, j-1))  # 0-based

# Prefer EPSG tmp-location if available; fallback to XY
try:
    tmp_loc = f"EPSG:{epsg}" if epsg else "XY"
except NameError:
    tmp_loc = "XY"

IS_WINDOWS = platform.system().lower().startswith("win")

# ---- Windows path handling: no quotes; use 8.3 if spaces/parentheses ----
def win_short_path(path_str: str) -> str:
    buf = ctypes.create_unicode_buffer(32768)
    GetShortPathNameW = ctypes.windll.kernel32.GetShortPathNameW
    ret = GetShortPathNameW(path_str, buf, len(buf))
    return buf.value if ret else path_str

def safe_arg_path(p: Path) -> str:
    s = str(p)
    if IS_WINDOWS:
        if any(c in s for c in " ()"):
            s = win_short_path(s)
        return s  # unquoted
    else:
        return "'" + s.replace("'", "'\\''") + "'"

def build_grass_command_chain(in_a: Path, in_b: Path, out_tif: Path):
    a = safe_arg_path(in_a.resolve())
    b = safe_arg_path(in_b.resolve())
    o = safe_arg_path(out_tif.resolve())

    # Build r.mapcalc step:
    # - On Windows: write expression to temp file and use file=<path>
    # - On Unix: pass expression= with single quotes
    if IS_WINDOWS:
        # Create temp expression file (we return both the GRASS command and the temp path to clean later)
        tf = tempfile.NamedTemporaryFile(prefix="rmapcalc_", suffix=".txt", delete=False)
        tf.write(b"corridor = (a + b) / 2.0")
        tf.flush(); tf.close()
        expr_path = safe_arg_path(Path(tf.name))
        cmd_calc = f"r.mapcalc file={expr_path} --o"
        temp_to_cleanup = tf.name
    else:
        cmd_calc = "r.mapcalc expression='corridor = (a + b) / 2.0' --o"
        temp_to_cleanup = None

    cmd_rin_a = f"r.in.gdal input={a} output=a --o"
    cmd_rin_b = f"r.in.gdal input={b} output=b --o"
    cmd_region = "g.region raster=a,b"
    cmd_out    = (
        f"r.out.gdal input=corridor output={o} format=GTiff type=Float64 "
        f'createopt="COMPRESS=LZW,TILED=YES,BIGTIFF=IF_SAFER" --o'
    )

    if IS_WINDOWS:
        chain = " && ".join([cmd_rin_a, cmd_rin_b, cmd_region, cmd_calc, cmd_out])
        cmd = [GRASS_BIN, "--tmp-location", tmp_loc, "--exec", "cmd", "/C", chain]
    else:
        chain = " ; ".join([cmd_rin_a, cmd_rin_b, cmd_region, cmd_calc, cmd_out])
        cmd = [GRASS_BIN, "--tmp-location", tmp_loc, "--exec", "bash", "-lc", chain]

    return cmd, temp_to_cleanup

def run_pair(i0, j0):
    a = cd_files[i0]
    b = cd_files[j0]
    out_name = f"{base}_corridor_{i0+1}_{j0+1}.tif"
    out_tif = out_dir / out_name
    cmd, tmp_expr = build_grass_command_chain(a, b, out_tif)
    try:
        res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
        ok = out_tif.exists()
        return (out_tif, ok, res.stdout.strip(), res.stderr.strip(), tmp_expr)
    except subprocess.CalledProcessError as e:
        return (out_tif, False, e.stdout, e.stderr, tmp_expr)
    except Exception as e:
        return (out_tif, False, "", f"Python-level error: {repr(e)}", tmp_expr)

# You can set MAX_WORKERS=1 if you prefer serial execution on Windows
MAX_WORKERS = min(len(pairs_0), os.cpu_count() or 1)
print(f"Building {len(pairs_0)} corridor raster(s) with max_workers={MAX_WORKERS} (threaded) ...")

results = []
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    futs = [ex.submit(run_pair, i0, j0) for (i0, j0) in pairs_0]
    for f in as_completed(futs):
        results.append(f.result())

# Cleanup temp files (Windows) and report
ok, failed = 0, 0
for out_tif, success, so, se, tmp_expr in results:
    if tmp_expr and os.path.exists(tmp_expr):
        try:
            os.remove(tmp_expr)
        except Exception:
            pass
    if success:
        ok += 1
        print(f"✓ Wrote {out_tif.name}")
    else:
        failed += 1
        print(f"✗ Failed: {out_tif.name}\nSTDOUT:\n{so}\nSTDERR:\n{se}\n")

print(f"\nCorridors done. Successful: {ok} | Failed: {failed}")
print(f"Directory: {out_dir}")


Building 3 corridor raster(s) with max_workers=3 (threaded) ...
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_2_3.tif
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_1_3.tif
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_1_2.tif

Corridors done. Successful: 3 | Failed: 0
Directory: C:\CostSurf


In [37]:
# Step 6: Find corridor rasters created in Step 3

from pathlib import Path

# Expect: base, out_dir from previous cells
try:
    base, out_dir
except NameError:
    raise RuntimeError("Run Step 1 first to set 'base' and 'out_dir'.")

corr_files = sorted(Path(out_dir).glob(f"{base}_corridor_*_*.tif"))
assert corr_files, f"No corridor rasters found in {out_dir} with pattern '{base}_corridor_*_*.tif'."

print("Found corridor rasters:")
for p in corr_files:
    print("  ", p.name)


Found corridor rasters:
   costsurf_SilkRoute_yr_216m_corridor_1_2.tif
   costsurf_SilkRoute_yr_216m_corridor_1_3.tif
   costsurf_SilkRoute_yr_216m_corridor_2_3.tif


In [38]:
# Step 7: ECDF, true percentile-rank (0–100, 0.001 steps) via rasterio

import os, platform, ctypes
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import numpy as np

try:
    import rasterio
except ImportError as e:
    raise RuntimeError("Please install rasterio: pip install rasterio") from e

# corr_files: list[Path]  | out_dir: Path  (expected from previous cells)
IS_WINDOWS = platform.system().lower().startswith("win")

def win_short_path(path_str: str) -> str:
    if not IS_WINDOWS:
        return path_str
    buf = ctypes.create_unicode_buffer(32768)
    ret = ctypes.windll.kernel32.GetShortPathNameW(path_str, buf, len(buf))
    return buf.value if ret else path_str

def _finite(vec: np.ndarray) -> np.ndarray:
    """Return finite-only 1D float64 array (no NaN/inf)."""
    if vec.size == 0:
        return vec
    vec = vec.astype(np.float64, copy=False)
    return vec[np.isfinite(vec)]

def ecdf_percentile_rank_rasterio(in_path: Path, out_path: Path, bins: int = 100_000) -> tuple[bool, str, str]:
    in_p = Path(in_path)
    out_p = Path(out_path)
    out_p.parent.mkdir(parents=True, exist_ok=True)

    try:
        with rasterio.Env():
            with rasterio.open(in_p, 'r') as src:
                profile = src.profile.copy()

                # Pass 0: Global min/max over valid pixels (masked=True)
                vmin, vmax = None, None
                for _, window in src.block_windows(1):
                    ma = src.read(1, window=window, masked=True)  # Mask combines nodata/alpha/overviews
                    if ma.mask.all():
                        continue
                    data = _finite(ma.compressed())
                    if data.size == 0:
                        continue
                    mn, mx = data.min(), data.max()
                    vmin = mn if vmin is None else min(vmin, mn)
                    vmax = mx if vmax is None else max(vmax, mx)

                if vmin is None or vmax is None:
                    return (False, "", f"No valid pixels in {in_p.name}")

                if vmax == vmin:
                    # Constant raster → neutral 50.000 everywhere valid, 0 elsewhere
                    profile.update(dtype="float64", compress="lzw", tiled=True, BIGTIFF="IF_SAFER")
                    with rasterio.open(out_p, 'w', **profile) as dst:
                        for _, window in dst.block_windows(1):
                            ma = src.read(1, window=window, masked=True)
                            tile = np.zeros(ma.shape, dtype=np.float64)
                            valid = ~ma.mask
                            tile[valid] = 50.0
                            dst.write(tile, 1, window=window)
                    return (True, f"Constant → wrote 50.000 to {out_p.name}", "")

                # Pass 1: Global histogram on [vmin, vmax] with 'bins' bins
                edges = np.linspace(vmin, vmax, bins + 1, dtype=np.float64)
                counts = np.zeros(bins, dtype=np.int64)

                for _, window in src.block_windows(1):
                    ma = src.read(1, window=window, masked=True)
                    if ma.mask.all():
                        continue
                    data = _finite(ma.compressed())
                    if data.size == 0:
                        continue
                    # Clip to edges to avoid right-edge spill
                    np.clip(data, vmin, vmax, out=data)
                    hist, _ = np.histogram(data, bins=edges)
                    counts += hist

                total = int(counts.sum())
                if total == 0:
                    return (False, "", f"No valid pixels counted in {in_p.name}")

                cum_counts = np.cumsum(counts, dtype=np.int64)
                # cum_before[k] = number of samples strictly below bin k
                cum_before = np.zeros_like(cum_counts)
                cum_before[1:] = cum_counts[:-1]

                # Pass 2: Map each pixel to percentile with sub-bin jitter
                profile.update(dtype="float64", compress="lzw", tiled=True, BIGTIFF="IF_SAFER", nodata=None)
                with rasterio.open(out_p, 'w', **profile) as dst:
                    for _, window in dst.block_windows(1):
                        ma = src.read(1, window=window, masked=True)
                        h, w = ma.shape
                        tile = np.zeros((h, w), dtype=np.float64)

                        if not ma.mask.all():
                            data = ma.filled(np.nan)  # keep shape; NaNs for masked
                            # Valid mask = not masked and finite
                            valid = (~ma.mask) & np.isfinite(data)
                            if valid.any():
                                vals = data[valid].astype(np.float64, copy=False)
                                np.clip(vals, vmin, vmax, out=vals)

                                # Bin index via searchsorted on edges
                                # side='right' ensures vmax lands in the last bin
                                idx = np.searchsorted(edges, vals, side='right') - 1
                                idx = np.clip(idx, 0, bins - 1)

                                # Deterministic jitter in [0,1)
                                rows = np.arange(window.row_off, window.row_off + h, dtype=np.int64)[:, None]
                                cols = np.arange(window.col_off, window.col_off + w, dtype=np.int64)[None, :]
                                jit = np.sin(rows * 12.9898 + cols * 78.233) * 43758.5453
                                jit = (jit - np.floor(jit))[valid]  # select only valid positions

                                cb = cum_before[idx]
                                cnt = counts[idx]
                                safe_cnt = np.where(cnt > 0, cnt, 1)

                                rank = 100.0 * (cb + jit * safe_cnt) / total
                                tile[valid] = rank

                        dst.write(tile, 1, window=window)

        return (True, f"✓ Wrote {out_p.name}", "")
    except Exception as e:
        return (False, "", str(e))

def normalize_all_to_percentile_rank(corr_files, bins=100_000):
    workers = min(len(corr_files), os.cpu_count() or 1)
    print(f"Generating percentile-rank (0–100) corridor raster(s) with max_workers={workers} ...")
    results = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futs = []
        for p in corr_files:
            p = Path(p)
            out_tif = p.with_name(p.stem + "_prank.tif")
            futs.append(ex.submit(ecdf_percentile_rank_rasterio, p, out_tif, bins))
        for f in as_completed(futs):
            success, so, se = f.result()
            results.append((success, so, se))
    ok = sum(1 for s,_,_ in results if s)
    failed = len(results) - ok
    for s, so, se in results:
        if s and so: print(so)
        if not s:
            print("✗ Failed\nSTDOUT:\n" + (so or "") + "\nSTDERR:\n" + (se or "") + "\n")
    print(f"\nPercentile-rank normalisation done. Successful: {ok} | Failed: {failed}")
    print(f"Directory: {out_dir}")

# Run it
normalize_all_to_percentile_rank(corr_files, bins=100_000)


Generating percentile-rank (0–100) corridor raster(s) with max_workers=3 ...
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_1_2_prank.tif
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_1_3_prank.tif
✓ Wrote costsurf_SilkRoute_yr_216m_corridor_2_3_prank.tif

Percentile-rank normalisation done. Successful: 3 | Failed: 0
Directory: C:\CostSurf


In [39]:
# Step 8: Minimum composite over all normalized corridors

import subprocess, platform, ctypes
from pathlib import Path

# Expect GRASS_BIN, base, out_dir
try:
    GRASS_BIN
except NameError:
    raise RuntimeError("GRASS_BIN is not defined.")

IS_WINDOWS = platform.system().lower().startswith("win")

def win_short_path(path_str: str) -> str:
    buf = ctypes.create_unicode_buffer(260)
    ret = ctypes.windll.kernel32.GetShortPathNameW(path_str, buf, 260)
    return buf.value if ret else path_str

def safe_arg_path(p: Path) -> str:
    s = str(p)
    if IS_WINDOWS:
        if any(ch in s for ch in (" ", "(", ")")):
            s = win_short_path(s)
        return s
    else:
        return "'" + s.replace("'", "'\\''") + "'"

# Collect normalized corridors
norm_files = sorted(Path(out_dir).glob(f"{base}_corridor_*_*_prank.tif"))
assert norm_files, f"No normalized corridors found in {out_dir}."

# Build import list and r.series call
imports = []
ras_names = []
for k, tif in enumerate(norm_files, 1):
    nm = f"n{k}"
    imports.append(f"r.in.gdal input={safe_arg_path(tif.resolve())} output={nm} --o")
    ras_names.append(nm)

series_input = ",".join(ras_names)
out_min_tif = Path(out_dir) / f"{base}_corridors_min.tif"

# Prefer EPSG location if available; fallback to XY
try:
    tmp_loc = f"EPSG:{epsg}" if epsg else "XY"
except NameError:
    tmp_loc = "XY"

cmds = []
cmds += imports
cmds.append(f"g.region raster={ras_names[0]}")  # align to first; all share grid if built consistently
cmds.append(f"r.series input={series_input} output=min_corridors method=minimum --o")
cmds.append(
    f"r.out.gdal input=min_corridors output={safe_arg_path(out_min_tif.resolve())} "
    f'format=GTiff type=Float64 createopt="COMPRESS=LZW,TILED=YES,BIGTIFF=IF_SAFER" --o'
)

chain = (" && " if IS_WINDOWS else " ; ").join(cmds)
runner = ["cmd", "/C", chain] if IS_WINDOWS else ["bash", "-lc", chain]

# Execute inside a fresh GRASS tmp-location
proc = subprocess.run(
    [GRASS_BIN, "--tmp-location", tmp_loc, "--exec"] + runner,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)

if proc.returncode == 0 and out_min_tif.exists():
    print(f"✓ Wrote minimum composite: {out_min_tif}")
else:
    print("✗ Failed to build minimum composite.")
    print("STDOUT:\n", proc.stdout)
    print("STDERR:\n", proc.stderr)


✓ Wrote minimum composite: C:\CostSurf\costsurf_SilkRoute_yr_216m_corridors_min.tif
