In [None]:
# Step 1: 读取 TIF 并报告 nodata 与有效数值范围
import os
import glob
import numpy as np
import rasterio
from rasterio.errors import RasterioIOError
from tqdm import tqdm

def report_tifs(folder="Factor_aligned", pattern="*.tif"):
    """Print a nodata / valid-value summary for band 1 of every raster in a folder.

    For each file matching ``pattern`` inside ``folder`` the summary lists the
    band dtype, the raster shape, the nodata value stored in the metadata, the
    number of masked/NaN pixels, and the min / max / mean / median of the
    remaining valid pixels.

    Args:
        folder: Directory that is searched for rasters.
        pattern: Glob pattern applied inside ``folder``.

    Raises:
        FileNotFoundError: If no file in ``folder`` matches ``pattern``.

    Errors raised while opening or analysing a single file are printed and
    the loop continues with the next file.
    """
    tif_paths = sorted(glob.glob(os.path.join(folder, pattern)))
    if not tif_paths:
        raise FileNotFoundError(f"No tif files found in folder: {folder}")

    print(f"Found {len(tif_paths)} files in '{folder}'.\n")

    for p in tqdm(tif_paths, desc="Processing files", unit="file"):
        try:
            with rasterio.open(p) as src:
                # Only the first band is inspected; adapt this for multi-band rasters.
                data = src.read(1, masked=True)
                nodata_from_meta = src.nodata
                dtype = src.dtypes[0]
                height, width = src.height, src.width

            total_pixels = data.size

            # A pixel is invalid when it is masked OR (for float rasters) NaN.
            # Counting NaN explicitly keeps unmasked NaN pixels out of the
            # "valid" count and statistics.
            # NOTE(review): whether rasterio masks NaN itself depends on the
            # file's nodata setting and the rasterio version - confirm.
            invalid = np.ma.getmaskarray(data)
            values = np.ma.getdata(data)
            if np.issubdtype(values.dtype, np.floating):
                invalid = invalid | np.isnan(values)
            nodata_count = int(invalid.sum())
            valid_arr = values[~invalid]  # 1-D array of valid values

            # Describe the nodata situation, distinguishing "declared in the
            # metadata" from "only inferred from the mask / NaN pixels".
            if nodata_from_meta is None:
                meta_note = "nodata (metadata): None"
                if nodata_count == 0:
                    inferred_note = "inferred nodata: none detected"
                else:
                    inferred_note = f"inferred nodata pixels (from mask/NaN): {nodata_count}"
            else:
                meta_note = f"nodata (metadata): {nodata_from_meta}"
                inferred_note = f"masked/NaN pixels: {nodata_count}"

            # Statistics over the valid pixels only.
            if valid_arr.size > 0:
                vmin = float(valid_arr.min())
                vmax = float(valid_arr.max())
                mean = float(valid_arr.mean(dtype=np.float64))
                median = float(np.median(valid_arr))
                valid_count = valid_arr.size
                valid_pct = valid_count / total_pixels * 100.0
            else:
                vmin = vmax = mean = median = None
                valid_count = 0
                valid_pct = 0.0

            # Print the per-file summary.
            print(f"\nFile: {os.path.basename(p)}")
            print(f"  dtype: {dtype}, shape: {height}x{width} ({total_pixels} px)")
            print(f"  {meta_note}; {inferred_note}.")
            print(f"  Valid pixels: {valid_count} ({valid_pct:.2f}%)")
            if vmin is not None:
                print(f"  Valid value range: min = {vmin}, max = {vmax}")
                print(f"  Stats (valid): mean = {mean:.4f}, median = {median:.4f}")
            else:
                print("  No valid pixels found (all masked or nodata).")

        except RasterioIOError as e:
            print(f"\nFile: {os.path.basename(p)} - ERROR opening file: {e}")
        except Exception as e:
            # Deliberate best-effort: report the problem and keep scanning.
            print(f"\nFile: {os.path.basename(p)} - Unexpected error: {e}")

if __name__ == "__main__":
    # Change the folder argument to run the report on a different directory
    report_tifs(folder="Factor_aligned")


In [None]:
import os
import rasterio
import numpy as np
import pandas as pd

input_dir = "Factor_aligned"
output_dir = "Scaled_1_100"
os.makedirs(output_dir, exist_ok=True)

NODATA = -32768

# Fuzzy matching table: factor key -> (scaling method, keywords).
# A file is assigned to a factor as soon as its name contains one of the keywords.
method_map = {
    "E1_DEM": ("log_p2p98", ["DEM"]),
    "E2_Slope": ("p2p98", ["Slope"]),
    "E3_Orientation": ("minmax", ["Orientation"]),
    "E4_NDVI": ("minmax", ["NDVI"]),
    "E5_RHU": ("p2p98", ["RHU"]),
    "E6_SSD": ("p2p98", ["SSD"]),
    "E8_WIN": ("p2p98", ["WIN"]),
    "E10_LST": ("minmax", ["LST"]),

    "S1_PublicService": ("log_p2p98", ["PublicService"]),
    "S2_DistanceFromUrbanArea": ("log_p2p98", ["Distance", "UrbanArea"]),
    "S4_Supply": ("p2p98", ["Supply"]),
    "S9_NighttimeLights": ("log_p2p98", ["Light", "Night", "VIIRS"]),

    "LULC_2020": ("skip", ["LULC"])
}

# 查找 factor key
def find_factor_key(filename):
    """Return ``(factor_key, method)`` for the first ``method_map`` entry matching a file name.

    Matching is a case-insensitive substring test of every keyword of an
    entry against ``filename``; entries are tried in ``method_map`` order.
    Returns ``(None, None)`` when no keyword matches.
    """
    lowered = filename.lower()
    for factor, (how, needles) in method_map.items():
        if any(needle.lower() in lowered for needle in needles):
            return factor, how
    return None, None


def scale_minmax(arr, vmin, vmax):
    """Linearly map values from ``[vmin, vmax]`` onto ``[1, 100]``.

    Args:
        arr: Array of values (NaN entries stay NaN).
        vmin: Value that maps to 1.
        vmax: Value that maps to 100.

    Returns:
        Array of scaled values. When ``vmax == vmin`` the range is degenerate
        and an all-NaN float array of the same shape is returned instead of
        dividing by zero.
    """
    span = vmax - vmin
    if span == 0:
        # Degenerate range: no meaningful scale exists, so report NaN
        # explicitly rather than relying on 0/0 and its RuntimeWarning.
        return np.full(np.shape(arr), np.nan, dtype=float)
    return (arr - vmin) / span * 99 + 1


def robust_p2p98(arr):
    """Return the 2nd and 98th percentiles of ``arr`` (NaN ignored) as ``(low, high)``."""
    low, high = np.nanpercentile(arr, [2, 98])
    return low, high


def log1p_safe(arr):
    """Return ``log(1 + x)`` element-wise after raising negative values to 0.

    The input array is left unmodified; NaN entries stay NaN.
    """
    return np.log1p(np.maximum(arr, 0))


records = []

# sorted() makes the processing order (and the row order of the mapping
# table) deterministic; os.listdir() returns entries in arbitrary order.
for fname in sorted(os.listdir(input_dir)):

    if not fname.endswith(".tif"):
        continue

    key, method = find_factor_key(fname)
    if key is None:
        print(f"[Skip] {fname}: method not defined by fuzzy match.")
        continue

    print(f"\n---- Processing {fname} | Key={key} | Method={method} ----")

    # Categorical rasters are not rescaled
    if method == "skip":
        print("Classification raster, skipping scaling.")
        continue

    fpath = os.path.join(input_dir, fname)

    # Read band 1 as float, turn nodata pixels into NaN, and keep the
    # source profile so the file only has to be opened once.
    with rasterio.open(fpath) as src:
        arr = src.read(1).astype(float)
        mask = (arr == src.nodata) if src.nodata is not None else np.isnan(arr)
        arr[mask] = np.nan
        profile = src.profile

    # Apply the scaling method selected for this factor
    if method == "minmax":
        vmin, vmax = np.nanmin(arr), np.nanmax(arr)
        scaled = scale_minmax(arr, vmin, vmax)

    elif method == "p2p98":
        p2, p98 = robust_p2p98(arr)
        clipped = np.clip(arr, p2, p98)
        scaled = scale_minmax(clipped, p2, p98)
        vmin, vmax = p2, p98

    elif method == "log_p2p98":
        arr2 = log1p_safe(arr)
        p2, p98 = robust_p2p98(arr2)
        clipped = np.clip(arr2, p2, p98)
        scaled = scale_minmax(clipped, p2, p98)
        vmin, vmax = p2, p98

    else:
        raise ValueError(f"Unknown method: {method}")

    # Convert to int16 with NaN -> NODATA. Scores are rounded to the nearest
    # integer first: a bare astype() would truncate (99.9 -> 99), which
    # disagrees with the rounding used when tabular values are mapped.
    scaled_int = np.where(np.isnan(scaled), NODATA, np.rint(scaled)).astype(np.int16)

    outpath = os.path.join(output_dir, fname)

    # Write the result; only band 1 is produced, so the output is single-band.
    profile.update(dtype=rasterio.int16, nodata=NODATA, count=1)
    with rasterio.open(outpath, "w", **profile) as dst:
        dst.write(scaled_int, 1)

    # Record the parameters needed to reproduce the mapping elsewhere
    records.append({
        "factor_key": key,
        "file": fname,
        "method": method,
        "value_min_used": float(vmin),
        "value_max_used": float(vmax),
        "output_file": outpath,
        "nodata": NODATA
    })

# Save the mapping table
df = pd.DataFrame(records)
df.to_csv("scaling_mapping_table.csv", index=False)

print("\nAll done. Mapping table saved: scaling_mapping_table.csv")


In [None]:
import os
import math
import numpy as np
import pandas as pd

# Inputs: the Excel workbooks and the scaling mapping table (CSV)
mapping_csv = "scaling_mapping_table.csv"  # mapping table written earlier
excel_files = [
    "Expansion_RF_shap_values.xlsx",
    "Shrink_RF_shap_values.xlsx"
]

# Change mapping_csv above if the mapping table has a different name
if not os.path.exists(mapping_csv):
    raise FileNotFoundError(f"Mapping file not found: {mapping_csv}")

# Load the mapping table; expected columns: factor_key, method, value_min_used, value_max_used
mapping_df = pd.read_csv(mapping_csv, dtype={"factor_key": str})
# 确保列名兼容（兼容不同导出名）
# 支持的列名： factor_key / factor / method / min_used / max_used / value_min_used / value_max_used
def _col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None

col_key = _col(mapping_df, ["factor_key", "factor", "Factor"])
col_method = _col(mapping_df, ["method", "Method"])
col_min = _col(mapping_df, ["value_min_used", "min_used", "orig_min", "min_used"])
col_max = _col(mapping_df, ["value_max_used", "max_used", "orig_max", "max_used"])

# Every one of the four logical columns must have been resolved
if any(not name for name in (col_key, col_method, col_min, col_max)):
    raise ValueError("mapping CSV missing required columns. Found columns: " + ", ".join(mapping_df.columns))

# Lookup table: factor key -> {"method", "vmin", "vmax"}
mapping = {}
for _, row in mapping_df.iterrows():
    try:
        vmin, vmax = float(row[col_min]), float(row[col_max])
    except Exception:
        # Unparseable bounds are stored as NaN
        vmin, vmax = np.nan, np.nan
    key = str(row[col_key]).strip()
    method = str(row[col_method]).strip()
    mapping[key] = {"method": method, "vmin": vmin, "vmax": vmax}

# 模糊匹配：根据列名中包含的关键字匹配到 mapping 的 key
def find_mapping_for_col(colname):
    """Return ``(key, info)`` from ``mapping`` for a column name, or ``(None, None)``.

    Two passes over ``mapping`` (both case-insensitive substring tests):
    first the whole key must occur in the column name; failing that, any
    non-empty ``_``-separated part of a key occurring in the name counts.
    """
    lowered = colname.lower()

    # Pass 1: the complete key appears in the column name
    for factor, info in mapping.items():
        if factor.lower() in lowered:
            return factor, info

    # Pass 2: any single token of the key appears in the column name
    for factor, info in mapping.items():
        if any(token in lowered for token in factor.lower().split("_") if token):
            return factor, info

    return None, None

# 缩放函数：输入原始值和映射参数，输出 1-100 的整数或 NaN
def _linear_score_1_100(value, vmin, vmax):
    """Clip ``value`` to ``[vmin, vmax]`` and map it linearly to an int in 1..100.

    Returns NaN when ``value`` or a bound is NaN, or when ``vmax == vmin``.
    """
    if math.isnan(value) or math.isnan(vmin) or math.isnan(vmax) or vmax == vmin:
        return np.nan
    clipped = min(max(value, vmin), vmax)
    scaled = (clipped - vmin) / (vmax - vmin) * 99.0 + 1.0
    # Guard against floating-point overshoot before rounding
    return int(round(min(max(scaled, 1.0), 100.0)))


def map_value_to_1_100(x, method, vmin, vmax):
    """Map one raw value to a 1-100 score using stored scaling parameters.

    Args:
        x: Raw value; missing or non-numeric input yields NaN.
        method: Scaling method name. ``skip`` / ``categorical`` return the
            value unchanged as a float; the ``log_p2p98`` family clamps
            negatives to 0 and applies ``log1p`` before scaling; ``p2p98``,
            ``minmax`` and any unrecognised name scale the value directly.
        vmin: Lower bound that maps to 1 (in log space for log methods).
        vmax: Upper bound that maps to 100 (in log space for log methods).

    Returns:
        An ``int`` in 1..100, the unchanged float for skip methods, or NaN
        when the input or the bounds are unusable.
    """
    if pd.isna(x):
        return np.nan
    try:
        xv = float(x)
    except (TypeError, ValueError, OverflowError):
        return np.nan

    name = method.lower()

    # Categorical values (e.g. LULC classes) are passed through untouched
    if name in ("skip", "categorical"):
        return xv

    # Values such as the string "nan" survive pd.isna() but cannot be scored
    if math.isnan(xv):
        return np.nan

    if name in ("log_p2p98", "log-p2p98", "log_p2-p98", "log1p_p2p98", "log1p->p2p98"):
        # Same pre-transform as the raster step: negatives become 0, then log1p
        xv = math.log1p(max(xv, 0.0))

    # p2p98, minmax and the fallback all share the same clip + linear mapping;
    # for p2p98 the stored vmin/vmax already are the 2nd/98th percentiles.
    return _linear_score_1_100(xv, vmin, vmax)

# 主循环：处理每个 Excel，按列映射 Feature_* 到 Feature_*_mapped
for excel in excel_files:
    if not os.path.exists(excel):
        print(f"[Warning] file not found: {excel}; skip.")
        continue

    print(f"\nProcessing Excel: {excel}")
    df = pd.read_excel(excel)
    out_df = df.copy()

    # Only columns whose name starts with "Feature_" are candidates
    feature_cols = [c for c in df.columns if str(c).startswith("Feature_")]
    if len(feature_cols) == 0:
        print("  No Feature_ columns found. Skipping file.")
        continue

    for col in feature_cols:
        key, mapinfo = find_mapping_for_col(col)
        if key is None:
            print(f"  [Skip] Feature column '{col}' has no mapping key found. Leave unchanged.")
            continue

        method, vmin, vmax = mapinfo["method"], mapinfo["vmin"], mapinfo["vmax"]
        newcol = f"{col}_mapped"

        # Element-wise mapping; the scaling parameters are passed as extra arguments
        out_df[newcol] = df[col].apply(map_value_to_1_100, args=(method, vmin, vmax))

        print(f"  Mapped {col} -> {newcol} using key='{key}', method='{method}', vmin={vmin}, vmax={vmax}")

    # Write the augmented table next to the input workbook
    outname = os.path.splitext(excel)[0] + "_mapped.xlsx"
    out_df.to_excel(outname, index=False)
    print(f"Saved mapped Excel: {outname}")

print("\nAll Excel files processed.")


In [None]:
import os
import pandas as pd
import numpy as np

# 输入 Excel 文件列表
excel_files = [
    "Expansion_RF_shap_values.xlsx",
    "Shrink_RF_shap_values.xlsx"
]

for excel in excel_files:
    if not os.path.exists(excel):
        print(f"[Warning] file not found: {excel}; skip.")
        continue

    print(f"\nProcessing SHAP values in: {excel}")
    df = pd.read_excel(excel)

    # Collect every column whose name starts with "SHAP_"
    shap_cols = [c for c in df.columns if str(c).startswith("SHAP_")]
    if not shap_cols:
        print("  No SHAP_ columns found. Skipping file.")
        continue

    # Pool all SHAP values and drop the missing ones
    all_shap_values = df[shap_cols].values.flatten()
    all_shap_values = all_shap_values[~pd.isna(all_shap_values)]

    print("  SHAP value distribution (before processing):")
    print(f"    Count: {len(all_shap_values)}")
    if len(all_shap_values) > 0:
        print(f"    Min: {np.min(all_shap_values):.6f}")
        print(f"    Max: {np.max(all_shap_values):.6f}")
        print(f"    Mean: {np.mean(all_shap_values):.6f}")
        print(f"    Median: {np.median(all_shap_values):.6f}")
        print(f"    25%: {np.percentile(all_shap_values, 25):.6f}")
        print(f"    75%: {np.percentile(all_shap_values, 75):.6f}")
    else:
        # np.min / np.percentile raise on an empty array, so skip the statistics
        print("    No non-missing SHAP values; statistics skipped.")

    # Clamp negative SHAP values to 0; missing values stay missing
    for col in shap_cols:
        newcol = f"{col}_processed"
        df[newcol] = df[col].clip(lower=0)

    # Save the augmented workbook
    outname = os.path.splitext(excel)[0] + "_shap_processed.xlsx"
    df.to_excel(outname, index=False)
    print(f"  Saved processed SHAP values to: {outname}")

print("\nAll SHAP values processed.")


In [None]:
import os
import pandas as pd
import numpy as np

# Input Excel workbooks
excel_files = [
    "Expansion_RF_shap_values.xlsx",
    "Shrink_RF_shap_values.xlsx"
]

# 特征映射 1–100（百分位线性映射，去掉极值2-98%）
def feature_map_1_100(series):
    """Map a feature column to integer scores 1-100 using its 2nd-98th percentile range.

    Values are clipped to the 2nd / 98th percentiles of the non-missing data
    and then mapped linearly so that p2 -> 1 and p98 -> 100.

    Args:
        series: Numeric pandas Series; NaN entries are allowed.

    Returns:
        A Series on the same index. It has a plain integer dtype when every
        value can be scored. When the input holds NaN, has no valid values,
        or has p2 == p98 (no spread to scale), the nullable ``Int64`` dtype
        is used and the unscorable entries are ``<NA>``.
    """
    valid = series.dropna()
    all_missing = pd.Series(
        pd.array([pd.NA] * len(series), dtype="Int64"),
        index=series.index,
        name=series.name,
    )
    if valid.empty:
        return all_missing

    p2, p98 = np.percentile(valid, [2, 98])
    if p98 == p2:
        # Degenerate range: the linear map would divide by zero
        return all_missing

    clipped = series.clip(lower=p2, upper=p98)
    scaled = ((clipped - p2) / (p98 - p2) * 99 + 1).round()

    # A plain int dtype cannot hold NaN; use the nullable integer dtype instead
    if scaled.isna().any():
        return scaled.astype("Int64")
    return scaled.astype(int)

# SHAP 映射 0–255
def shap_map_0_255(series):
    """Map SHAP values to integer scores 0-255 by rank of the positive values.

    Negative, zero and missing values score 0. Positive values are ranked
    (ties share the lowest rank) and the ranks are spread linearly over
    1..255, so the smallest positive value scores 1.

    Args:
        series: Numeric pandas Series of SHAP values.

    Returns:
        Integer Series on the same index as ``series``.
    """
    series_proc = series.copy()
    series_proc[series_proc < 0] = 0
    pos = series_proc[series_proc > 0]
    if len(pos) == 0:
        return pd.Series([0] * len(series_proc), index=series_proc.index)

    ranks = pos.rank(method='min')
    # With a single positive value the rank span is 0; use 1 as the divisor
    # so that value gets score 1 (same as an all-tied group) instead of 0/0.
    span = max(len(pos) - 1, 1)
    scaled = ((ranks - 1) / span * 254 + 1).round()

    series_mapped = pd.Series(0, index=series_proc.index)
    series_mapped.loc[pos.index] = scaled.astype(int)
    return series_mapped

for excel in excel_files:
    if not os.path.exists(excel):
        print(f"[Warning] file not found: {excel}; skip.")
        continue

    print(f"\nProcessing file: {excel}")
    df = pd.read_excel(excel)

    # Split the columns into raw features and SHAP values by name prefix
    feature_cols = [c for c in df.columns if str(c).startswith("Feature_")]
    shap_cols = [c for c in df.columns if str(c).startswith("SHAP_")]

    # Feature columns -> 1-100 scores
    for f in feature_cols:
        score_col = f"{f}_score"
        df[score_col] = feature_map_1_100(df[f])
        print(f"  Feature mapped: {f} -> {score_col}")

    # SHAP columns -> 0-255 scores
    for s in shap_cols:
        score_col = f"{s}_score"
        df[score_col] = shap_map_0_255(df[s])
        print(f"  SHAP mapped: {s} -> {score_col}")

    # Output layout: each raw feature followed by its score, then all SHAP scores
    selected_cols = [name for f in feature_cols for name in (f, f"{f}_score")]
    selected_cols.extend(f"{s}_score" for s in shap_cols)

    df_out = df[selected_cols]
    outname = os.path.splitext(excel)[0] + "_processed.xlsx"
    df_out.to_excel(outname, index=False)
    print(f"  Saved processed file to: {outname}")

print("\nAll files processed successfully.")


In [None]:
import pandas as pd
import numpy as np
import os
from scipy.interpolate import interp1d

# 文件列表
excel_files = {
    "Expansion": "SHAP赋分结果_Expan.xlsx",
    "Shrink": "SHAP赋分结果_Shrink.xlsx"
}

for mode, excel in excel_files.items():
    if not os.path.exists(excel):
        print(f"[Warning] file not found: {excel}; skip {mode}.")
        continue

    print(f"\nProcessing {mode} SHAP mapping table from: {excel}")
    df = pd.read_excel(excel)

    # Strip stray whitespace from the column names
    df.columns = df.columns.str.strip()

    # Raw feature columns, their score columns, and the SHAP score columns.
    # The raw-feature filter must EXCLUDE "_score" columns; the previous
    # `not c.endswith("_score") == False` parsed as `not (... == False)` and
    # selected the score columns instead.
    feature_cols = [c for c in df.columns if c.startswith("Feature_") and not c.endswith("_score")]
    feature_score_cols = [c for c in df.columns if c.startswith("Feature_") and c.endswith("_score")]
    shap_score_cols = [c for c in df.columns if c.startswith("SHAP_") and c.endswith("_score")]

    # The three lists are paired by position, so unequal lengths mean the
    # pairing below is truncated and possibly misaligned.
    if not (len(feature_cols) == len(feature_score_cols) == len(shap_score_cols)):
        print("  [Warning] Feature / Feature score / SHAP score column counts differ; "
              "positional pairing may be misaligned.")

    mapping_records = []

    for f_feat, f_feat_score, f_shap_score in zip(feature_cols, feature_score_cols, shap_score_cols):
        # Land use is categorical: average the SHAP score per class 1..13
        if f_feat == "Feature_LULC_2020":
            scores = np.arange(1, 14)
            shap_values = []
            remarks = []
            for s in scores:
                mask = df[f_feat] == s
                if mask.sum() > 0:
                    shap_values.append(round(df.loc[mask, f_shap_score].mean(),2))
                    remarks.append("")
                else:
                    shap_values.append(np.nan)  # filled by interpolation below
                    remarks.append("插值")
            # Fill classes without samples by linear inter-/extrapolation
            # over the class position
            nan_idx = [i for i,v in enumerate(shap_values) if np.isnan(v)]
            if nan_idx:
                x = np.array([i for i,v in enumerate(shap_values) if not np.isnan(v)])
                y = np.array([v for v in shap_values if not np.isnan(v)])
                f_interp = interp1d(x, y, kind='linear', fill_value='extrapolate')
                for idx in nan_idx:
                    shap_values[idx] = round(float(f_interp(idx)),2)
        else:
            # Regular features: one row per score 1..100
            scores = np.arange(1,101)
            temp = df[[f_feat_score, f_shap_score]].copy()
            group = temp.groupby(f_feat_score, as_index=False).agg({f_shap_score:'mean'})
            # Interpolate the per-score mean SHAP over scores that have no samples
            interp_func = interp1d(group[f_feat_score], group[f_shap_score], kind='linear', bounds_error=False, fill_value='extrapolate')
            shap_values = [round(float(interp_func(s)),2) for s in scores]
            # Mark the scores whose value was interpolated rather than observed
            existing_scores = set(group[f_feat_score].values)
            remarks = ["插值" if s not in existing_scores else "" for s in scores]

        # Collect one record per score
        for s, v, r in zip(scores, shap_values, remarks):
            mapping_records.append({
                "Feature": f_feat,
                "Score": int(s),
                "SHAP_mean": v,
                "Remark": r
            })

    # Save the mapping table
    df_mapping = pd.DataFrame(mapping_records)
    outname = f"{mode}_FeatureScore_SHAP_mapping.xlsx"
    df_mapping.to_excel(outname, index=False)
    print(f"  Saved {mode} mapping table to: {outname}")

print("\nAll mapping tables generated successfully.")


In [None]:
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d
import os

# 文件名
excel_files = {
    "Expansion": "SHAP赋分结果_Expan.xlsx",
    "Shrink": "SHAP赋分结果_Shrink.xlsx"
}

for mode, excel in excel_files.items():
    if not os.path.exists(excel):
        print(f"[Warning] file not found: {excel}; skip {mode}.")
        continue

    print(f"\nProcessing LULC SHAP mapping for: {mode}")
    df = pd.read_excel(excel)
    df.columns = df.columns.str.strip()

    f_feat = "Feature_LULC_2020"
    f_shap_score = "SHAP_LULC_2020_score"

    # Without both columns there is nothing to build; report instead of a KeyError
    if f_feat not in df.columns or f_shap_score not in df.columns:
        print(f"  [Warning] columns '{f_feat}' / '{f_shap_score}' not found; skip {mode}.")
        continue

    # Land-use classes 1..13: mean SHAP score per class
    scores = np.arange(1,14)
    shap_values = []
    remarks = []

    for s in scores:
        mask = df[f_feat] == s
        if mask.sum() > 0:
            shap_values.append(round(df.loc[mask, f_shap_score].mean(),2))
            remarks.append("")
        else:
            shap_values.append(np.nan)
            remarks.append("插值")

    # Fill classes without samples by linear inter-/extrapolation over the
    # class position; interp1d needs at least two known points.
    nan_idx = [i for i,v in enumerate(shap_values) if np.isnan(v)]
    if nan_idx:
        x = np.array([i for i,v in enumerate(shap_values) if not np.isnan(v)])
        y = np.array([v for v in shap_values if not np.isnan(v)])
        if len(x) >= 2:
            f_interp = interp1d(x, y, kind='linear', fill_value='extrapolate')
            for idx in nan_idx:
                shap_values[idx] = round(float(f_interp(idx)),2)
        else:
            print("  [Warning] fewer than two classes with data; missing classes left empty.")

    # Collect one record per class
    mapping_records = []
    for s, v, r in zip(scores, shap_values, remarks):
        mapping_records.append({
            "Feature": f_feat,
            "Score": int(s),
            "SHAP_mean": v,
            "Remark": r
        })

    df_lulc_mapping = pd.DataFrame(mapping_records)

    # Append to an existing mapping table when there is one. Rows already
    # present for this feature are dropped first so the LULC block is not
    # duplicated.
    mapping_file = f"{mode}_FeatureScore_SHAP_mapping.xlsx"
    if os.path.exists(mapping_file):
        df_existing = pd.read_excel(mapping_file)
        if "Feature" in df_existing.columns:
            df_existing = df_existing[df_existing["Feature"] != f_feat]
        df_final = pd.concat([df_existing, df_lulc_mapping], ignore_index=True)
    else:
        df_final = df_lulc_mapping

    outname = f"{mode}_FeatureScore_SHAP_mapping_with_LULC.xlsx"
    df_final.to_excel(outname, index=False)
    print(f"  Saved {mode} mapping table with LULC to: {outname}")

print("\nAll LULC SHAP mapping tables generated successfully.")
