In [None]:
# --- A) IMPORT LIBRARIES ---
import os
import json
import geopandas as gpd
import pandas as pd
import rasterio
import numpy as np
from rasterio.mask import mask
from rasterio.features import rasterize
from shapely.geometry import mapping

------------------------------------------------

In [None]:
# --- B) IMPORT INPUT DATA ---
'''
add info
'''

# b.1 - Load vector mask (GeoPackage, GeoJSON, or Shapefile)
vector_mask_path = "./vector_pct.geojson"
if vector_mask_path.endswith(".gpkg"):
    vector_mask_layer = "layer_name"  # specify layer name for GPKG
    gdf_mask = gpd.read_file(vector_mask_path, layer=vector_mask_layer)
elif vector_mask_path.endswith(".geojson") or vector_mask_path.endswith(".json"):
    gdf_mask = gpd.read_file(vector_mask_path)
elif vector_mask_path.endswith(".shp"):
    gdf_mask = gpd.read_file(vector_mask_path)
else:
    raise ValueError("Unsupported vector format. Use GPKG, GeoJSON, or Shapefile.")
print("Mask:", gdf_mask)

# b.2 - Covariate layer folders
ucp_folder = "./ucps"
fractions_folder = "./lc_fractions"

# b.3 - Load JSON rules
rules_path = "./operation_rules_C3.json"
with open(rules_path) as f:
    rules = json.load(f)
    print("Rules:", rules)

# b.4 - Set output folder
output_folder = "./output"

------------------------------------------------

In [None]:
# --- C) PARSE VECTOR MASK ATTRIBUTES AND CHECK RULES ---

def check_crs_match(gdf, rules, ucp_folder, fractions_folder):
    """
    Check whether the Coordinate Reference Systems (CRS) of all raster layers 
    match the CRS of the input GeoDataFrame.
    """
    mismatched_layers = []
    for layer_with_ext in rules:
        if layer_with_ext.startswith("F_"):
            raster_path = os.path.join(fractions_folder, layer_with_ext)
        else:
            raster_path = os.path.join(ucp_folder, layer_with_ext)
        try:
            with rasterio.open(raster_path) as src:
                raster_crs = src.crs
                if raster_crs != gdf.crs:
                    mismatched_layers.append((layer_with_ext, str(raster_crs)))
        except Exception as e:
            print(f"Warning: Could not open raster for layer {layer_with_ext}: {e}")

    if mismatched_layers:
        print("Warning: CRS mismatch detected in the following layers:")
        for lyr, crs in mismatched_layers:
            print(f" - {lyr}: CRS = {crs}, differs from vector CRS = {gdf.crs}")
    else:
        print("All raster layers have the same CRS as the vector mask.")


def parse_rules_from_mask(gdf, rules):
    """
    Analyze and validate rule definitions for raster layer processing, 
    based on per-polygon vector attributes in a GeoDataFrame.

    This function evaluates the rules assigned to each raster layer (e.g., MASKING, PCT, NONE),
    determines which processing category applies (C0 to C5), and performs detailed checks 
    when the rule type is "mask", ensuring that each polygon meets the required consistency criteria:
    
    - C0: All layers set to 'none' → No processing required.
    - C1: All layers set to 'mask' → Validate each polygon against:
        - C1.1: All values must be within [0, 1].
        - C1.2: IMD ≥ BSF constraint.
        - C1.3: The sum of all fractional inputs (F_*) must equal 1.
    - C2: All layers set to 'pct' → Apply uniform percentage change.
    - C3: A mix of 'pct' and 'none' → Apply change where defined, treat 'none' as 0%.
    - C4: Invalid combination of 'mask' and 'none' → Raise error.
    - C5: Invalid combination of 'mask' and 'pct' → Raise error.

    Parameters:
        gdf (GeoDataFrame): Input polygons with attributes corresponding to raster layers (e.g., F_AC, IMD, BSF).
        rules (dict): Dictionary mapping raster layer names (with or without .tif) to one of the rule types:
                      {'mask', 'pct', 'none'}.

    Returns:
        tuple:
            - rule_code (str): One of {"C0", "C1", "C2", "C3"} indicating which rule is valid.
            - rule_info (dict): Currently unused but reserved for future metadata about the rule set.

    Raises:
        ValueError: If the rule combination is invalid (C4 or C5), or if polygon-level constraints fail
                    under the 'mask' rule (e.g., values out of bounds, invalid fractions, IMD < BSF).
    """
    normalized_keys = [k.replace(".tif", "") for k in rules]
    fractions_keys = [k for k in normalized_keys if k.startswith("F_")]
    ucp_keys = [k for k in normalized_keys if not k.startswith("F_")]
    rule_types = set(rules.values())

    if rule_types == {"none"}:
        print("Rule C0: All layers set to NONE. No processing will be done.")
        return "C0", {}

    if rule_types == {"mask"}:
        print("Rule C1: All layers set to MASKING. Checking per-polygon constraints...")

        for idx, row in gdf.iterrows():
            print(f"\nChecking Feature ID {idx}")

            # --- Preload fraction_values ---
            fraction_values = {}
            for k in fractions_keys:
                val = row.get(k)
                if pd.notna(val):
                    try:
                        fraction_values[k] = float(val)
                    except (ValueError, TypeError):
                        raise ValueError(f"C1.3 Violation in feature {idx}: '{k}' has non-convertible value: {val}")

            # --- C1.1: All fraction and UCP values must be in [0,1] ---
            ucp_values = {}
            for k in ucp_keys:
                val = row.get(k)
                if pd.notna(val):
                    try:
                        ucp_values[k] = float(val)
                    except (ValueError, TypeError):
                        raise ValueError(f"C1.1 Violation in feature {idx}: '{k}' has non-convertible value: {val}")

            combined_values = {**fraction_values, **ucp_values}
            out_of_bounds = {k: v for k, v in combined_values.items() if not (0.0 <= v <= 1.0)}

            if out_of_bounds:
                print("\nC1.1 Violation: Values outside [0,1] detected.")
                df_show = pd.DataFrame({
                    "Key": list(out_of_bounds.keys()),
                    "Value": list(out_of_bounds.values())
                })
                df_show.insert(0, "Feature ID", idx)
                print(df_show.to_string(index=False))
                raise ValueError(f"C1.1 Violation in feature {idx}: Some values not in [0,1].")

            # --- C1.2: IMD >= BSF is required ---
            imd = row.get("IMD")
            bsf = row.get("BSF")
            print(f"IMD: {imd}, BSF: {bsf}")

            if imd is None or bsf is None:
                raise ValueError(f"C1.2 Violation in feature {idx}: IMD or BSF is missing.")

            try:
                imd_val = float(imd)
                bsf_val = float(bsf)
            except (ValueError, TypeError):
                raise ValueError(f"C1.2 Violation in feature {idx}: IMD or BSF not convertible to float.")

            if imd_val < bsf_val:
                print("C1.2 Violation: IMD < BSF")
                df_show = pd.DataFrame([{
                    "Feature ID": idx,
                    "IMD": imd,
                    "BSF": bsf
                }])
                print(df_show.to_string(index=False))
                raise ValueError(f"C1.2 Violation in feature {idx}: IMD < BSF")

            # --- C1.3: Sum of fraction keys must be 1 ---
            fraction_sum = sum(fraction_values.values())

            df_show = pd.DataFrame({
                "Fraction Key": list(fraction_values.keys()),
                "Value": list(fraction_values.values())
            })
            df_show.insert(0, "Feature ID", idx)
            df_show.loc[len(df_show.index)] = [idx, "SUM", fraction_sum]

            print("Fraction values and sum:")
            print(df_show.to_string(index=False))

            if not np.isclose(fraction_sum, 1.0):
                print("C1.3 Violation: Sum of fractions != 1.0")
                raise ValueError(f"C1.3 Violation in feature {idx}: Sum = {fraction_sum} != 1.0")

        print("\nAll per-polygon constraints (C1.1, C1.2, C1.3) are satisfied.")
        return "C1", {}

    if rule_types == {"pct"}:
        print("Rule C2: All layers set to PCT. Proceeding with PCT logic.")
        return "C2", {}

    if rule_types == {"pct", "none"}:
        print("Rule C3: Layers set to PCT and NONE. Proceeding with PCT logic and treating NONE as 0.")
        return "C3", {}

    if rule_types == {"mask", "none"}:
        print("Rule C4 violation: MASKING + NONE is not allowed.")
        raise ValueError("Rule C4 violation: Please change NONE to MASKING and provide valid attributes.")

    if "mask" in rule_types and "pct" in rule_types:
        print("Rule C5 violation: MASKING + PCT is not allowed.")
        raise ValueError("Rule C5 violation: Please use either MASKING or PCT rules exclusively.")

    raise ValueError("Invalid rule combination detected in rule file.")

In [None]:
# --- C) TEST CHECK RULES
# Check CRS consistency between the vector mask and all covariate rasters
check_crs_match(gdf_mask, rules, ucp_folder, fractions_folder)

# Parse rules and validate them against the vector mask attributes
try:
    rule_id, parsed_info = parse_rules_from_mask(gdf_mask, rules)
except ValueError as e:
    print("Rule validation failed:", e)
    raise

------------------------------------------------

In [None]:
# --- D) APPLY MASK RULE --- TO BE ADJUSTED!!!
def apply_masking(gdf, raster_path, attr_name, output_path):
    """
    Apply masking values from vector attributes to a raster.
    For each polygon in the GeoDataFrame, assign the value of the given attribute
    to the corresponding area in the raster.

    Parameters:
        gdf: GeoDataFrame containing geometries and attribute values.
        raster_path: Input raster to be masked.
        attr_name: Attribute name containing values to assign in the mask.
        output_path: Output raster file.
    """
    #print(f"Applying masking for attribute '{attr_name}' on raster: {os.path.basename(raster_path)}")

    with rasterio.open(raster_path) as src:
        data = src.read(1)
        meta = src.meta.copy()
        nodata_val = src.nodata

        if nodata_val is not None:
            nodata_mask = (data == nodata_val)
            count_nodata = np.count_nonzero(nodata_mask)
            print(f"Detected {count_nodata} NoData pixels in input raster.")
        else:
            print("No explicit NoData value defined in the input raster.")

        for idx, row in gdf.iterrows():
            value = row[attr_name]
            mask_arr = rasterize(
                [(row.geometry, 1)],
                out_shape=src.shape,
                transform=src.transform,
                fill=0,
                dtype=np.uint8
            )

            data = np.where(mask_arr == 1, value, data)

        print(f"Applying masking for attribute '{attr_name}' on raster {os.path.basename(raster_path)} with value {value}")

        with rasterio.open(output_path, 'w', **meta) as dst:
            dst.write(data, 1)

    print(f"Masking complete: output saved to {os.path.basename(output_path)}.")

In [None]:
def apply_mask_rule_all(gdf, rules, ucp_folder, fractions_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)
    for layer, rule in rules.items():
        if rule != "mask":
            continue

        if layer.endswith(".tif"):
            layer = layer[:-4]

        if layer.startswith("F_"):
            raster_path = os.path.join(fractions_folder, layer + ".tif")
        else:
            raster_path = os.path.join(ucp_folder, layer + ".tif")

        output_path = os.path.join(output_folder, layer + "_mask.tif")
        apply_masking(gdf, raster_path, layer, output_path)

In [None]:
# --- D) TEST MASK RULE 
if rule_id in {"C1"}:
    print(f"Running masking (rule {rule_id})...")
    apply_mask_rule_all(gdf_mask, rules, ucp_folder, fractions_folder, output_folder)
    print("Masking applied. Output files saved in:", output_folder)
else:
    print("Masking not applied:", output_folder)

------------------------------------------------

In [None]:
# --- E) APPLY PCT RULE 
def is_fraction_layer(layer_name):
    """
    Check if a given layer name corresponds to a fraction layer or not (i.e. UCP).
    """
    return layer_name.startswith("F_")


def apply_pct_ucp(gdf, raster_path, attr_name, rule_type, output_path,
                  exceed_handling="clip", zero_handling="raise", zero_value=0.01):
    """
    Apply per-polygon percentage change to UCP raster pixels using vector attributes.
    This function modifies the values of UCP raster layers by applying percentage changes 
    defined in a vector GeoDataFrame. Each polygon in the vector mask carries an attribute 
    that specifies how much to increase or decrease the UCP raster values within its bounds.

    Parameters:
        gdf: GeoDataFrame with per-polygon percentage values.
        raster_path: Input raster (UCP).
        attr_name: Name of the attribute with percentage change.
        rule_type: 'pct' or 'none'.
        output_path: Path to save adjusted raster.
        exceed_handling: How to handle output values outside [0,1]. Options: 
                         'clip', 'normalize', 'ignore'.
        zero_handling: How to handle pixels with value 0. Options:
                       - 'preserve': leave 0s unchanged (default)
                       - 'raise': treat 0s as `zero_value` before applying change
        zero_value: Value to use in place of 0 if `zero_handling='raise'`.
    """
    with rasterio.open(raster_path) as src:
        data = src.read(1).astype(np.float32)
        meta = src.meta.copy()
        nodata_val = src.nodata

        # Preserve NoData value in output metadata
        if nodata_val is not None:
            meta['nodata'] = nodata_val
            nodata_mask = (data == nodata_val)
            count_nodata = np.count_nonzero(nodata_mask)
            if count_nodata > 0:
                print(f"Detected {count_nodata} NoData pixels in input raster: {os.path.basename(raster_path)}.")
        else:
            print("No explicit NoData value defined in the input raster.")
            nodata_mask = None

        for idx, row in gdf.iterrows():
            pct_value = 0.0
            if rule_type == "pct":
                val = row.get(attr_name)
                if val is not None and not np.isnan(val):
                    pct_value = val

            factor = 1 + (pct_value / 100.0)
            print(attr_name, rule_type, pct_value, factor)

            mask_arr = rasterize(
                [(row.geometry, 1)],
                out_shape=src.shape,
                transform=src.transform,
                fill=0,
                dtype=np.uint8
            )

            # Handle zero values before update
            if zero_handling == "raise":
                zero_mask = (data == 0.0) & (mask_arr == 1)
                if nodata_mask is not None:
                    zero_mask = zero_mask & (~nodata_mask)
                data = np.where(zero_mask, zero_value, data)

            # Update only non-nodata pixels within the mask
            if nodata_mask is not None:
                update_mask = (mask_arr == 1) & (~nodata_mask)
            else:
                update_mask = (mask_arr == 1)

            data = np.where(update_mask, data * factor, data)

        # Clip, normalize, or ignore values outside [0,1]
        if nodata_mask is not None:
            valid_data = data[~nodata_mask]
        else:
            valid_data = data

        min_val = np.min(valid_data)
        max_val = np.max(valid_data)

        if max_val > 1.0 or min_val < 0.0:
            print(f"Warning: values outside [0,1] detected in {os.path.basename(output_path)}. "
                  f"Handling method: {exceed_handling}.")

            if exceed_handling == "clip":
                if nodata_mask is not None:
                    data = np.where(~nodata_mask, np.clip(data, 0.0, 1.0), data)
                else:
                    data = np.clip(data, 0.0, 1.0)

            elif exceed_handling == "normalize":
                if max_val != min_val:
                    scaled = (valid_data - min_val) / (max_val - min_val)
                    if nodata_mask is not None:
                        data = np.where(~nodata_mask, scaled, data)
                    else:
                        data = scaled
                else:
                    data[:] = 0.0

            elif exceed_handling == "ignore":
                pass
            else:
                raise ValueError("Invalid exceed_handling value. Use 'clip', 'normalize', or 'ignore'.")

        # Re-apply NoData to output
        if nodata_mask is not None:
            data = np.where(nodata_mask, nodata_val, data)

        with rasterio.open(output_path, 'w', **meta) as dst:
            dst.write(data, 1)

    return data


def apply_pct_all_fractions(gdf, rules, fractions_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)

    # Filter only fraction layers
    fraction_rules = {k: v for k, v in rules.items() if is_fraction_layer(k.replace(".tif", ""))}
    fraction_layers = list(fraction_rules.keys())
    if not fraction_layers:
        return

    # Load all fraction layers into memory
    fraction_data = {}
    meta = None
    nodata_mask = None

    for fname in fraction_layers:
        path = os.path.join(fractions_folder, fname)
        with rasterio.open(path) as src:
            arr = src.read(1).astype(np.float32)
            if meta is None:
                meta = src.meta.copy()
                if src.nodata is not None:
                    nodata_mask = (arr == src.nodata)
                    meta["nodata"] = src.nodata
                else:
                    nodata_mask = None
                    print(f"No explicit NoData value defined in the input raster: {os.path.basename(path)}")
            fraction_data[fname] = arr

    shape = (meta["height"], meta["width"])
    transform = meta["transform"]

    # Process each polygon
    for _, row in gdf.iterrows():
        geom = row.geometry
        mask_arr = rasterize(
            [(geom, 1)],
            out_shape=shape,
            transform=transform,
            fill=0,
            dtype=np.uint8
        )
        polygon_pixels = mask_arr == 1
        if not np.any(polygon_pixels):
            continue

        # Extract current pixel values for all layers (shape: N_layers x N_pixels)
        pixel_stack = np.stack([fraction_data[fname] for fname in fraction_layers], axis=0)
        polygon_values = pixel_stack[:, polygon_pixels]

        # Apply % change per layer
        for i, fname in enumerate(fraction_layers):
            rule_type = fraction_rules[fname]
            attr_name = fname.replace(".tif", "")
            pct_value = 0.0
            if rule_type == "pct":
                val = row.get(attr_name)
                if val is not None and not np.isnan(val):
                    pct_value = val
            factor = 1 + (pct_value / 100.0)
            print(attr_name, rule_type, pct_value, factor)
            polygon_values[i, :] *= factor

        # Normalize per pixel (column-wise) so fractions sum to 1.0
        sums = np.sum(polygon_values, axis=0)
        sums_safe = np.where(sums == 0, 1.0, sums)
        polygon_values /= sums_safe[np.newaxis, :]

        # Optionally verify pixel sums ≈ 1
        check_sums = np.sum(polygon_values, axis=0)
        print("Pixel sum stats:", np.min(check_sums), np.max(check_sums), np.mean(check_sums))

        # Update modified pixel values back to full arrays
        for i, fname in enumerate(fraction_layers):
            arr = fraction_data[fname]
            arr[polygon_pixels] = polygon_values[i, :]
            fraction_data[fname] = arr

    # Re-apply nodata where needed
    if nodata_mask is not None:
        for fname in fraction_layers:
            arr = fraction_data[fname]
            arr[nodata_mask] = meta["nodata"]
            fraction_data[fname] = arr

    # Save results to output folder
    for fname in fraction_layers:
        out_path = os.path.join(output_folder, fname.replace(".tif", "_pct.tif"))
        with rasterio.open(out_path, 'w', **meta) as dst:
            dst.write(fraction_data[fname], 1)

    print("Finished applying percentage changes and polygon-wise normalization to fraction layers.")

In [None]:
def apply_pct_all(gdf, rules, ucp_folder, fractions_folder, output_folder):
    """
    Apply percentage-based changes to both UCP and fraction raster layers using vector attributes.

    - UCP layers are processed individually using `apply_pct_ucp()`.
    - Fraction layers are processed together in a batch using `apply_pct_all_fractions()`.
    """
    os.makedirs(output_folder, exist_ok=True)

    ucp_rules = {}
    fraction_rules = {}

    for layer_with_ext, rule_type in rules.items():
        if rule_type not in {"pct", "none"}:
            continue

        layer_name = layer_with_ext.replace(".tif", "")
        if is_fraction_layer(layer_name):
            fraction_rules[layer_with_ext] = rule_type
        else:
            ucp_rules[layer_with_ext] = rule_type

    # Process all fraction layers in batch
    if fraction_rules:
        apply_pct_all_fractions(gdf, fraction_rules, fractions_folder, output_folder)

    # Process each UCP layer individually
    for layer_with_ext, rule_type in ucp_rules.items():
        layer_name = layer_with_ext.replace(".tif", "")
        raster_path = os.path.join(ucp_folder, layer_with_ext)
        output_path = os.path.join(output_folder, layer_name + "_pct.tif")

        if not os.path.exists(raster_path):
            print(f"Warning: Missing raster file: {raster_path}")
            continue

        apply_pct_ucp(gdf, raster_path, layer_name, rule_type, output_path)

In [None]:
# --- E) TEST PCT RULE 
if rule_id in {"C2", "C3"}:
    print(f"Running percentage adjustment (rule {rule_id})...")
    
    # Run the unified function that handles both UCP and fraction layers
    apply_pct_all(gdf_mask, rules, ucp_folder, fractions_folder, output_folder)
    
    print("Percentage adjustment applied. Output files saved in:", output_folder)
else:
    print("Percentage adjustment not applied:", output_folder)

---------------------------------------------------------

In [None]:
# --- E) TEST PCT RULE C3.1
'''
def check_imd_bsf_consistency(output_folder, imd_filename="IMD_pct.tif", bsf_filename="BSF_pct.tif"):
    """
    Check whether IMD >= BSF for each pixel in the output rasters.
    
    Parameters:
        output_folder: Folder where adjusted raster outputs are saved.
        imd_filename: Filename of the adjusted IMD raster.
        bsf_filename: Filename of the adjusted BSF raster.
    """
    imd_path = os.path.join(output_folder, imd_filename)
    bsf_path = os.path.join(output_folder, bsf_filename)

    if not os.path.exists(imd_path) or not os.path.exists(bsf_path):
        print(f"Warning: Cannot perform IMD ≥ BSF check because one or both files are missing.")
        return

    with rasterio.open(imd_path) as src_imd, rasterio.open(bsf_path) as src_bsf:
        imd = src_imd.read(1)
        bsf = src_bsf.read(1)

        if imd.shape != bsf.shape:
            print("Warning: IMD and BSF rasters have different shapes. Cannot compare.")
            return

        invalid = (imd < bsf) & (~np.isnan(imd)) & (~np.isnan(bsf))
        if np.any(invalid):
            n_invalid = np.count_nonzero(invalid)
            total = imd.size
            pct = (n_invalid / total) * 100
            print(f"Warning: {n_invalid} pixels ({pct:.2f}%) have IMD < BSF. "
                  "Consider adjusting the 'pct' attributes in the vector mask.")
        else:
            print("Check passed: All pixels satisfy IMD ≥ BSF.")
'''

In [None]:
# --- E) Test new IMD and BSF consistency
# check_imd_bsf_consistency(output_folder)

Notes for PCT:

- Think about how to deal with 0 data -> **see "zero_handling" option**
- Test if normalisation [0-1] for each UCP layer is a good proxy to obtain a proportional increase of the UCP that is wanted to be modified by having a final UCP raster still in the range [0-1]? Maybe not, because this changes the whole range of values in the raster -> **Consider only "clip" option for the UCPs**
- Check consistency in IMD and BSF -> **Creates a separate functions "check_imd_bsf_consistency" for checking**
- Think about how to deal with no-data -> **Added additional masking in bot ucp and fractions processing**
- Test if a second normalisation [0-1] per pixel fractions array (other than a normalisation on each single raster) on the modified fractions raster is sufficient to preserve proportion by obeying the rules that sum per pixel = 1 and fractions value >=0. Maybe it's better first to normalise per pixel and then apply the second normalisation to each final raster -> **Applied a single normalization within each pixel in the mask as proportional scaling Fx/sum(all_F)**