## LEOHS Apply Harmonization Tool
This script applies harmonization equations to either multiband Landsat images or a directory containing multiband Landsat images.

https://doi.org/10.1080/10106049.2025.2538108

Coded by Galen Richardson with the assistance of ChatGPT 5 on 2/4/2026

In [None]:
#python imports
import re
from pathlib import Path
import numpy as np, rasterio, gc

In [10]:
def detect_conversion_direction(txt):
    """Return the harmonization direction encoded in the equation text.

    Only the first non-blank line of ``txt`` is inspected.

    Args:
        txt: Equation text, e.g. ``"B: LS8 = 0.5179LS7 + 0.0088"``.

    Returns:
        ``"2L8"`` if the first line assigns to ``LS8``, ``"2L7"`` if it
        assigns to ``LS7``, otherwise ``"Unknown"`` (including when ``txt``
        is empty or whitespace only).
    """
    lines = txt.strip().splitlines()
    if not lines:
        # Nothing to inspect; avoid an IndexError on empty input.
        return "Unknown"
    first = lines[0]
    # Allow any amount of whitespace before "=" so that the direction is
    # detected for every spelling the equation parser accepts.
    if re.search(r"LS8\s*=", first):
        return "2L8"
    if re.search(r"LS7\s*=", first):
        return "2L7"
    return "Unknown"
def parse_harmonization_text(txt):
    """Parse per-band linear harmonization equations from text.

    Each equation line is expected to look like
    ``<band>: LS8 = <slope>LS7 + <intercept>`` (or the LS7/LS8 mirror image).
    Whitespace inside the equation is ignored. Lines that do not contain both
    ``:`` and ``=``, or whose right-hand side does not match the expected
    form, are skipped.

    Args:
        txt: Multi-line string holding one equation per line.

    Returns:
        A tuple ``(bands, slopes, intercepts)`` where ``bands`` is a list of
        band labels in file order and ``slopes`` / ``intercepts`` are
        ``float32`` arrays of the same length.

    Raises:
        ValueError: If no line could be parsed as an equation.
    """
    # Decimal number with optional sign and optional exponent ("0.5", ".5",
    # "5.", "1e-3"). The intercept may carry its own sign so that output such
    # as "+ -0.0035" is parsed instead of being silently dropped.
    number = r"[+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?"
    equation = re.compile(rf"({number})LS[78]([+-])({number})")

    bands, slopes, intercepts = [], [], []
    for line in txt.strip().splitlines():
        if ":" not in line or "=" not in line:
            continue
        band, _, eq = line.partition(":")
        _, sep, rhs = eq.partition("=")
        if not sep:
            # The "=" sits before the ":"; this is not an equation line.
            continue
        m = equation.match("".join(rhs.split()))
        if not m:
            continue
        sign = 1.0 if m.group(2) == "+" else -1.0
        bands.append(band.strip())
        slopes.append(float(m.group(1)))
        intercepts.append(sign * float(m.group(3)))
    if not slopes:
        raise ValueError("No valid harmonization equations parsed.")
    return bands, np.asarray(slopes, np.float32), np.asarray(intercepts, np.float32)
def _harmonize_block(block, slopes, intercepts, src_nodata, out_nodata, eps_zero=1e-6):
    """Apply the per-band linear equations to one (bands, rows, cols) block.

    A pixel is treated as valid only when every band is finite, no band equals
    ``src_nodata`` and the bands are not all (near) zero. Invalid pixels are
    filled with ``out_nodata``; valid pixels are transformed and clipped to
    [0, 1].

    Returns:
        ``(harmonized, n_valid)``: a float32 array with the shape of ``block``
        and the number of valid pixels in it.
    """
    n_bands, n_rows, n_cols = block.shape
    pixels = block.reshape(n_bands, -1).T  # (pixels, bands)
    valid = np.isfinite(pixels).all(1) & (pixels != src_nodata).all(1)
    if eps_zero is not None:
        valid &= ~((np.abs(pixels) <= eps_zero).all(1))
    n_valid = int(valid.sum())
    harmonized = np.full((n_bands, pixels.shape[0]), out_nodata, np.float32)
    if n_valid:
        spectra = pixels[valid]
        # Values above 1000 cannot be reflectance in [0, 1]; they are rescaled
        # with 0.0000275 / -0.2 (presumably the Landsat Collection 2 surface
        # reflectance scale and offset -- confirm against the input product).
        if float(np.max(spectra)) > 1000:
            spectra = spectra * 0.0000275 - 0.2
        # Clip ONLY predicted pixels; nodata pixels keep out_nodata.
        harmonized[:, valid] = np.clip(spectra * slopes + intercepts, 0.0, 1.0).T
    return harmonized.reshape(n_bands, n_rows, n_cols), n_valid


def apply_harmonization_windowed(input_harmonization, input_path, out_nodata=-1.0, out_prefix=None):
    """Harmonize one multiband image, or every .tif/.tiff in a directory.

    Each input is read block by block, transformed with the parsed per-band
    equations and written as a float32, ZSTD-compressed, tiled GeoTIFF next to
    the input, named ``out_prefix + <input name>``.

    Args:
        input_harmonization: Equation text (one ``<band>: LS8 = aLS7 + b``
            style line per band, in image band order).
        input_path: Path to a single image or to a directory of images.
        out_nodata: Value written for invalid pixels and recorded as the
            output nodata value; also used as the source nodata when the
            source declares none.
        out_prefix: Output file-name prefix. Defaults to the detected
            direction followed by ``_`` (e.g. ``"2L8_"``).

    Raises:
        FileNotFoundError: If a directory holds no .tif/.tiff files, or only
            files that already carry ``out_prefix``.
        ValueError: If no equation can be parsed, or an image's band count
            differs from the number of equations.
    """
    eps_zero = 1e-6
    # NOTE(review): GDAL documents predictor=3 as the floating-point
    # predictor; 2 is kept here so the output encoding does not change.
    compress, zstd_level, predictor = "zstd", 9, 2
    fallback_tile = 256  # used when the source block shape cannot be reused

    in_path = Path(input_path)
    is_dir = in_path.is_dir()
    tifs = (sorted(in_path.glob("*.tif")) + sorted(in_path.glob("*.tiff"))) if is_dir else [in_path]
    if not tifs:
        raise FileNotFoundError(f"No .tif/.tiff found at: {in_path}")

    conv = detect_conversion_direction(input_harmonization)
    _, slopes, intercepts = parse_harmonization_text(input_harmonization)
    out_prefix = out_prefix or f"{conv}_"

    if is_dir:
        # Outputs are written into the same directory, so a second run would
        # otherwise pick up earlier results and harmonize them again.
        skipped = [fp for fp in tifs if fp.name.startswith(out_prefix)]
        if skipped:
            print(f"[skip] {len(skipped)} file(s) already prefixed with '{out_prefix}'")
            tifs = [fp for fp in tifs if not fp.name.startswith(out_prefix)]
        if not tifs:
            raise FileNotFoundError(f"No unprocessed .tif/.tiff found at: {in_path}")

    print(f"[inputs] {'dir' if is_dir else 'file'}: {in_path} | {len(tifs)} tif(s)")
    print(f"[mode] {conv} | equations={len(slopes)} | out_nodata={out_nodata}")

    for fp in tifs:
        out_fp = fp.with_name(out_prefix + fp.name)
        with rasterio.open(fp) as src:
            if src.count != len(slopes):
                raise ValueError(f"Band count mismatch: image has {src.count} bands but equations={len(slopes)}: {fp}")
            nod = src.nodata if src.nodata is not None else out_nodata

            by, bx = src.block_shapes[0]
            if bx % 16 or by % 16:
                # Tiled GeoTIFF blocks must be multiples of 16; striped or
                # oddly blocked sources cannot pass their block shape through.
                bx = by = fallback_tile

            prof = src.profile.copy()
            prof.update(dtype="float32", count=src.count, nodata=out_nodata, BIGTIFF="YES",
                        compress=compress, tiled=True, blockxsize=bx, blockysize=by,
                        predictor=predictor, NUM_THREADS="ALL_CPUS")
            if str(compress).lower() == "zstd":
                prof.update(zstd_level=zstd_level)
            print(f"\n[file] {fp.name} -> {out_fp.name} | {src.width}x{src.height} | src_nod={src.nodata} (use nod={nod})")

            # progress every 10%
            total_blocks = sum(1 for _ in src.block_windows(1))
            step = max(1, total_blocks // 10)
            next_mark, last_pct = step, 0
            tot_px = tot_good = 0

            with rasterio.open(out_fp, "w", **prof) as dst:
                for wi, (_, win) in enumerate(src.block_windows(1), 1):
                    block = src.read(window=win).astype(np.float32, copy=False)  # (b,h,w)
                    harmonized, n_valid = _harmonize_block(
                        block, slopes, intercepts, nod, out_nodata, eps_zero)
                    tot_px += block.shape[1] * block.shape[2]
                    tot_good += n_valid
                    dst.write(harmonized, window=win)
                    del block, harmonized
                    gc.collect()

                    if wi >= next_mark:
                        pct10 = min(100, int((wi / total_blocks) * 10) * 10)  # 10,20,...,100
                        if pct10 > last_pct:
                            print(f"[progress] {pct10}%  blocks={wi}/{total_blocks}")
                            last_pct = pct10
                        next_mark += step

            # Guard the ratio so an image without readable pixels cannot
            # raise ZeroDivisionError after the file has been written.
            good_ratio = tot_good / tot_px if tot_px else 0.0
            print(f"[Done] good={good_ratio:.2%} wrote={out_fp}")

In [1]:
# Path to a single multiband Landsat image or to a directory of .tif/.tiff
# images. Each image must have one band per equation below (6 bands here),
# in the same order as the equations.
Landsat_image_or_dir=r'G:\LEOHS_test'
# Harmonization equations, one "<band>: LS8 = <slope>LS7 + <intercept>" line
# per band, pasted from the LEOHS output text file into a triple-quoted string.
input_harmonization="""
B: LS8 = 0.5179LS7 + 0.0088
G: LS8 = 0.6970LS7 + 0.0129
R: LS8 = 0.7799LS7 + 0.0060
NIR: LS8 = 0.9585LS7 + 0.0064
SWIR1: LS8 = 0.8483LS7 + 0.0100
SWIR2: LS8 = 0.8412LS7 + 0.0073

""" # copy the equations from the LEOHS output text file into a triple-quoted string like this
print(input_harmonization) # show the example equations that will be applied


B: LS8 = 0.5179LS7 + 0.0088
G: LS8 = 0.6970LS7 + 0.0129
R: LS8 = 0.7799LS7 + 0.0060
NIR: LS8 = 0.9585LS7 + 0.0064
SWIR1: LS8 = 0.8483LS7 + 0.0100
SWIR2: LS8 = 0.8412LS7 + 0.0073




In [None]:
# Run the harmonization. Each output is written next to its input, with the
# file name prefixed by the detected direction (e.g. "2L8_").
apply_harmonization_windowed(input_harmonization, Landsat_image_or_dir)