In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.neighbors import KernelDensity
import re
import warnings

# =============================
# User-config (single run)
# =============================
# 1) Set your input CSV path (single file)
INPUT_CSV = r"D:\\path to your csv\\your csv.csv"  # <-- SET ME

# 2) Choose a single VI and a single fit type
VI_TYPE   = "NDVI"        # e.g., "NDVI", "SAVI", "SAVI_L050", "MSAVI"
FIT_TYPE  = "linear"      # one of: 'linear', 'polynomial', 'exponential'

# 3) Output directories
EXPORT_FOLDER        = r"your save path"
OUTPUT_FIGURE_FOLDER = r"your save path"
os.makedirs(EXPORT_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FIGURE_FOLDER, exist_ok=True)

# ------------------------------
# Utility / IO
# ------------------------------
def load_data(file_path, vi_type='NDVI'):
    """
    Load CSV and ensure numeric VI/STR; keep rows with VI > 0.
    """
    df = pd.read_csv(file_path)
    df[vi_type] = pd.to_numeric(df[vi_type], errors='coerce')
    df['STR'] = pd.to_numeric(df['STR'], errors='coerce')
    df = df[df[vi_type] > 0]
    df = df.dropna(subset=[vi_type, 'STR'])
    return df

def pretty_vi_label(vi_type: str) -> str:
    """
    Return a display-friendly label for the X-axis.
    - For SAVI and its variants (e.g., SAVI_L050/SAVI_L025/SAVI_L100), return 'SAVI'.
    - Otherwise, return the original vi_type.
    """
    vt = str(vi_type).upper()
    # Match 'SAVI' or 'SAVI_*' but NOT 'MSAVI'
    if re.match(r'^SAVI(?:$|_)', vt):
        return 'SAVI'
    return vi_type

# ------------------------------
# Boundary detection (Hardened)
# ------------------------------
def find_boundaries(
    df, vi_steps_dry, vi_steps_wet, 
    window_size=15, k=0.1, alpha=2, 
    min_threshold_abs=0.05,      # absolute floor for threshold
    q_iqr=0.2,                   # IQR component weight
    vi_threshold=0.1, alpha_low_vi=0.5, 
    vi_type='NDVI'
):
    import numpy as np
    import pandas as pd

    lower_boundary, upper_boundary = [], []


    df_local = df[[vi_type, 'STR']].copy()
    df_local[vi_type] = pd.to_numeric(df_local[vi_type], errors='coerce')
    df_local['STR'] = pd.to_numeric(df_local['STR'], errors='coerce')
    df_local = df_local.replace([np.inf, -np.inf], np.nan).dropna()

    if df_local.empty:
        return (pd.DataFrame(columns=['VI', 'STR_dry']),
                pd.DataFrame(columns=['VI', 'STR_wet']))


    str_p99 = df_local['STR'].quantile(0.99)

    def remove_outliers(s):
        if s.empty:
            return s
        Q1 = s.quantile(0.25)
        Q3 = s.quantile(0.75)
        IQR = Q3 - Q1
        lb = Q1 - 3 * IQR
        ub = Q3 + 3 * IQR
        return s[(s >= lb) & (s <= ub)]

    def filter_str_values(s, vi):
        if vi < vi_threshold:
            s = s[s <= str_p99]
        return s

    def compute_threshold(s_sorted, vi):
        vals = np.asarray(s_sorted, dtype=float)
        vals = vals[np.isfinite(vals)]
        if vals.size == 0:
            return min_threshold_abs
        std = np.nanstd(vals, ddof=1)
        q75 = np.nanpercentile(vals, 75)
        q25 = np.nanpercentile(vals, 25)
        iqr = q75 - q25
        current_alpha = alpha_low_vi if vi < vi_threshold else alpha
        base = (k * std * (1 + current_alpha * vi)) if np.isfinite(std) else 0.0
        adapt = (q_iqr * iqr) if np.isfinite(iqr) else 0.0
        return max(base, adapt, min_threshold_abs)


    for vi in vi_steps_dry:
        mask = df_local[vi_type].between(vi, vi + 0.01, inclusive='left')  
        s = df_local.loc[mask, 'STR']
        s = filter_str_values(remove_outliers(s), vi)
        if len(s) >= window_size:
            s_sorted = s.sort_values()
            thr = compute_threshold(s_sorted, vi)
            for i in range(len(s_sorted) - window_size + 1):
                w = s_sorted.iloc[i:i + window_size]
                if (w.max() - w.min() < thr) and w.diff().dropna().ge(0).all():
                    lower_boundary.append((vi, float(w.iloc[0])))
                    break

    for vi in vi_steps_wet:
        mask = df_local[vi_type].between(vi, vi + 0.01, inclusive='left')
        s = df_local.loc[mask, 'STR']
        s = filter_str_values(remove_outliers(s), vi)
        if len(s) >= window_size:
            s_sorted_desc = s.sort_values(ascending=False)
            thr = compute_threshold(s_sorted_desc, vi)
            for i in range(len(s_sorted_desc) - window_size + 1):
                w = s_sorted_desc.iloc[i:i + window_size]
                if (w.max() - w.min() < thr) and w.diff().dropna().le(0).all():
                    upper_boundary.append((vi, float(w.iloc[0])))
                    break

    lower_boundary_df = pd.DataFrame(lower_boundary, columns=['VI', 'STR_dry'])
    upper_boundary_df = pd.DataFrame(upper_boundary, columns=['VI', 'STR_wet'])

    if not lower_boundary_df.empty:
        lower_boundary_df = (
            lower_boundary_df.replace([np.inf, -np.inf], np.nan)
                             .dropna()
                             .astype(float)
                             .sort_values('VI')
        )
    if not upper_boundary_df.empty:
        upper_boundary_df = (
            upper_boundary_df.replace([np.inf, -np.inf], np.nan)
                             .dropna()
                             .astype(float)
                             .sort_values('VI')
        )
        if upper_boundary_df['STR_wet'].std(ddof=0) > 0:
            z = (upper_boundary_df['STR_wet'] - upper_boundary_df['STR_wet'].mean()) / upper_boundary_df['STR_wet'].std(ddof=0)
            upper_boundary_df = upper_boundary_df[np.abs(z) < 2.5]

    return lower_boundary_df, upper_boundary_df


def save_fit_statistics(fit_type, fit_dry, fit_wet, rmse_dry, rmse_wet, r2_dry, r2_wet, output_file_path):
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write(f"Fit Type: {fit_type}\n\n")
        f.write(f"Dry Edge Fit:\n  Equation: {fit_dry}\n  RMSE: {rmse_dry:.4f}\n  R²: {r2_dry:.4f}\n\n")
        f.write(f"Wet Edge Fit:\n  Equation: {fit_wet}\n  RMSE: {rmse_wet:.4f}\n  R²: {r2_wet:.4f}\n")

def exponential_func(x, a, b, c):
    return a * np.exp(b * x) + c


def fit_edges(lower_boundary_df, upper_boundary_df, fit_type):

    if lower_boundary_df is None or upper_boundary_df is None:
        return None
    if lower_boundary_df.empty or upper_boundary_df.empty:
        return None

    def select_top_percent(df, x_col, y_col, fit_func, *params, p=70):
        y_pred = fit_func(df[x_col], *params)
        residuals = np.abs(df[y_col] - y_pred)
        thr = np.percentile(residuals, p)
        return df[residuals <= thr]

    x_dry = lower_boundary_df['VI'].values
    y_dry = lower_boundary_df['STR_dry'].values
    x_wet = upper_boundary_df['VI'].values
    y_wet = upper_boundary_df['STR_wet'].values

    if fit_type == 'linear' and (len(x_dry) < 2 or len(x_wet) < 2):
        return None
    if fit_type == 'polynomial' and (len(x_dry) < 3 or len(x_wet) < 3):
        return None
    if fit_type == 'exponential' and (len(x_dry) < 3 or len(x_wet) < 3):
        return None

    if fit_type == 'linear':
        slope_dry, intercept_dry = np.polyfit(x_dry, y_dry, 1)
        slope_wet, intercept_wet = np.polyfit(x_wet, y_wet, 1)

        lower_trim = select_top_percent(
            lower_boundary_df, 'VI', 'STR_dry', lambda x, m, b: m * x + b, slope_dry, intercept_dry, p=70)
        upper_trim = select_top_percent(
            upper_boundary_df, 'VI', 'STR_wet', lambda x, m, b: m * x + b, slope_wet, intercept_wet, p=70)

        if len(lower_trim) >= 2:
            slope_dry, intercept_dry = np.polyfit(lower_trim['VI'], lower_trim['STR_dry'], 1)
        if len(upper_trim) >= 2:
            slope_wet, intercept_wet = np.polyfit(upper_trim['VI'], upper_trim['STR_wet'], 1)

        return slope_dry, intercept_dry, slope_wet, intercept_wet

    elif fit_type == 'polynomial':
        coeffs_dry = np.polyfit(x_dry, y_dry, 2)
        coeffs_wet = np.polyfit(x_wet, y_wet, 2)

        lower_trim = select_top_percent(
            lower_boundary_df, 'VI', 'STR_dry', lambda x, c: np.polyval(c, x), coeffs_dry, p=60)
        upper_trim = select_top_percent(
            upper_boundary_df, 'VI', 'STR_wet', lambda x, c: np.polyval(c, x), coeffs_wet, p=60)

        if len(lower_trim) >= 3:
            coeffs_dry = np.polyfit(lower_trim['VI'], lower_trim['STR_dry'], 2)
        if len(upper_trim) >= 3:
            coeffs_wet = np.polyfit(upper_trim['VI'], upper_trim['STR_wet'], 2)

        return coeffs_dry, coeffs_wet

    elif fit_type == 'exponential':
        def guess_init(x, y):
            y_min, y_max = float(np.min(y)), float(np.max(y))
            a0 = max(y_max - y_min, 1e-3)  # amplitude guess
            b0 = 1.0                       # growth/decay rate guess
            c0 = y_min                      # offset guess
            return (a0, b0, c0)

        bounds = ([-1e3, -10.0, -1e3], [1e3, 10.0, 1e3])  # broad but finite

        p0_dry = guess_init(x_dry, y_dry)
        p0_wet = guess_init(x_wet, y_wet)

        try:
            popt_dry, _ = curve_fit(exponential_func, x_dry, y_dry, p0=p0_dry, bounds=bounds, maxfev=20000)
            popt_wet, _ = curve_fit(exponential_func, x_wet, y_wet, p0=p0_wet, bounds=bounds, maxfev=20000)
        except Exception as e:
            warnings.warn(f"Exponential fit failed (initial pass): {e}")
            return None

        lower_trim = select_top_percent(
            lower_boundary_df, 'VI', 'STR_dry', exponential_func, *popt_dry, p=70)
        upper_trim = select_top_percent(
            upper_boundary_df, 'VI', 'STR_wet', exponential_func, *popt_wet, p=70)
        if len(lower_trim) >= 3:
            try:
                popt_dry, _ = curve_fit(exponential_func, lower_trim['VI'], lower_trim['STR_dry'],
                                        p0=popt_dry, bounds=bounds, maxfev=20000)
            except Exception as e:
                warnings.warn(f"Exponential dry refit failed: {e}")
                return None
        if len(upper_trim) >= 3:
            try:
                popt_wet, _ = curve_fit(exponential_func, upper_trim['VI'], upper_trim['STR_wet'],
                                        p0=popt_wet, bounds=bounds, maxfev=20000)
            except Exception as e:
                warnings.warn(f"Exponential wet refit failed: {e}")
                return None

        return popt_dry, popt_wet

    return None


def calculate_and_save_statistics(fit_type, fit_params, lower_boundary_df, upper_boundary_df, output_file_path):

    if fit_params is None:
        return  # nothing to do

    if fit_type == 'linear':
        slope_dry, intercept_dry, slope_wet, intercept_wet = fit_params
        fit_dry = f"y = {slope_dry:.2f}x + {intercept_dry:.2f}"
        fit_wet = f"y = {slope_wet:.2f}x + {intercept_wet:.2f}"
        y_pred_dry = slope_dry * lower_boundary_df['VI'] + intercept_dry
        y_pred_wet = slope_wet * upper_boundary_df['VI'] + intercept_wet

    elif fit_type == 'polynomial':
        coeffs_dry, coeffs_wet = fit_params
        fit_dry = f"y = {coeffs_dry[0]:.2f}x² + {coeffs_dry[1]:.2f}x + {coeffs_dry[2]:.2f}"
        fit_wet = f"y = {coeffs_wet[0]:.2f}x² + {coeffs_wet[1]:.2f}x + {coeffs_wet[2]:.2f}"
        y_pred_dry = np.polyval(coeffs_dry, lower_boundary_df['VI'])
        y_pred_wet = np.polyval(coeffs_wet, upper_boundary_df['VI'])

    elif fit_type == 'exponential':
        popt_dry, popt_wet = fit_params
        fit_dry = f"y = {popt_dry[0]:.2f}e^({popt_dry[1]:.2f}x) + {popt_dry[2]:.2f}"
        fit_wet = f"y = {popt_wet[0]:.2f}e^({popt_wet[1]:.2f}x) + {popt_wet[2]:.2f}"
        y_pred_dry = exponential_func(lower_boundary_df['VI'], *popt_dry)
        y_pred_wet = exponential_func(upper_boundary_df['VI'], *popt_wet)
    else:
        return

    rmse_dry = np.sqrt(mean_squared_error(lower_boundary_df['STR_dry'], y_pred_dry))
    rmse_wet = np.sqrt(mean_squared_error(upper_boundary_df['STR_wet'], y_pred_wet))
    r2_dry = r2_score(lower_boundary_df['STR_dry'], y_pred_dry)
    r2_wet = r2_score(upper_boundary_df['STR_wet'], y_pred_wet)

    save_fit_statistics(fit_type, fit_dry, fit_wet, rmse_dry, rmse_wet, r2_dry, r2_wet, output_file_path)

from matplotlib.ticker import FuncFormatter

def apply_leading_zero_if_needed(ax):
    yticks = ax.get_yticks()
    if len(yticks) == 0:
        return
    y_max_display = np.nanmax(yticks)
    if y_max_display < 10:
        def _fmt(x, pos):
            try:
                v = int(round(x))
            except Exception:
                return ""
            label = f"{v:02d}"  # 两位数
            if label.startswith("0"):
                return "\u2007" + label[1]   # figure space
            return label
        ax.yaxis.set_major_formatter(FuncFormatter(_fmt))


def fit_and_plot_optimized(
    df, lower_boundary_df, upper_boundary_df, fit_type, fit_params, title, output_file_path,
    stats_output_path, additional_points=None, vi_type='NDVI', rng_seed=1337
):

    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

    fig, ax = plt.subplots(figsize=(2,2))

    x = df[vi_type].values
    y = df['STR'].values

    rng = np.random.default_rng(rng_seed)
    sample_size = min(len(x), 50000)
    if sample_size > 0:
        sampled_indices = rng.choice(len(x), sample_size, replace=False)
        x_sampled = x[sampled_indices]
        y_sampled = y[sampled_indices]
    else:
        x_sampled = x
        y_sampled = y

    if len(x_sampled) > 0:
        xy = np.vstack([x_sampled, y_sampled]).T
        kde = KernelDensity(bandwidth=0.05, kernel='gaussian')
        kde.fit(xy)
        log_density = kde.score_samples(xy)
        density = np.exp(log_density)
        ax.scatter(x_sampled, y_sampled, c=density, cmap='viridis', s=0.5, alpha=0.3, label='Point Density')

    # Optional measured points
    if additional_points is not None and not additional_points.empty:
        ax.scatter(
            additional_points[vi_type], additional_points['STR'],
            c='green', s=20, marker='x', label='Measured Points'
        )

    if fit_params is not None:
        if fit_type == 'linear':
            slope_dry, intercept_dry, slope_wet, intercept_wet = fit_params
            ax.plot(
                lower_boundary_df['VI'], slope_dry * lower_boundary_df['VI'] + intercept_dry,
                color='red', linestyle='--', linewidth=2, label='Dry Edge (Linear)'
            )
            ax.plot(
                upper_boundary_df['VI'], slope_wet * upper_boundary_df['VI'] + intercept_wet,
                color='blue', linestyle='--', linewidth=2, label='Wet Edge (Linear)'
            )
        elif fit_type == 'polynomial':
            coeffs_dry, coeffs_wet = fit_params
            ax.plot(
                lower_boundary_df['VI'], np.polyval(coeffs_dry, lower_boundary_df['VI']),
                color='red', linestyle='--', linewidth=2, label='Dry Edge (Polynomial)'
            )
            ax.plot(
                upper_boundary_df['VI'], np.polyval(coeffs_wet, upper_boundary_df['VI']),
                color='blue', linestyle='--', linewidth=2, label='Wet Edge (Polynomial)'
            )
        elif fit_type == 'exponential':
            popt_dry, popt_wet = fit_params
            ax.plot(
                lower_boundary_df['VI'], exponential_func(lower_boundary_df['VI'], *popt_dry),
                color='red', linestyle='--', linewidth=2, label='Dry Edge (Exponential)'
            )
            ax.plot(
                upper_boundary_df['VI'], exponential_func(upper_boundary_df['VI'], *popt_wet),
                color='blue', linestyle='--', linewidth=2, label='Wet Edge (Exponential)'
            )

    ax.set_xlim(0, 1)
    ax.set_ylim(-0.2, 10)
    ax.set_xticks(np.arange(0, 1.2, 0.2))
    ax.set_yticks(np.linspace(0, 10, 6))
    for lbl in ax.get_xticklabels() + ax.get_yticklabels():
        lbl.set_fontsize(6.5)
        lbl.set_fontweight('bold')
    apply_leading_zero_if_needed(ax)

    plt.tight_layout()
    plt.savefig(output_file_path, dpi=300, bbox_inches='tight')
    plt.close(fig)

# ------------------------------
# Single-run main
# ------------------------------

def main_single_run():
    file_path = INPUT_CSV
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Input CSV not found: {file_path}")

    base_name = os.path.splitext(os.path.basename(file_path))[0]


    df = load_data(file_path, VI_TYPE)
    if df.empty:
        print(f"[{base_name} | {VI_TYPE}] Skipped: empty or invalid data.")
        return

    vi_min = float(df[VI_TYPE].min())
    vi_max = float(df[VI_TYPE].max())
    if not np.isfinite(vi_min) or not np.isfinite(vi_max) or vi_max <= vi_min:
        print(f"[{base_name} | {VI_TYPE}] Skipped: invalid VI range.")
        return


    n_steps = max(1, int(np.ceil((vi_max - vi_min) / 0.01)))
    vi_steps_dry = np.linspace(vi_min, vi_max, n_steps, endpoint=False)
    vi_steps_wet = vi_steps_dry.copy()

    # Detect boundaries
    lower_boundary_df, upper_boundary_df = find_boundaries(
        df, vi_steps_dry, vi_steps_wet, vi_type=VI_TYPE
    )

    if lower_boundary_df.empty or upper_boundary_df.empty:
        print(f"[{base_name} | {VI_TYPE}] Skipped: empty boundary(s).")
        return


    fit_params = fit_edges(lower_boundary_df, upper_boundary_df, FIT_TYPE)
    if fit_params is None:
        print(f"[{FIT_TYPE}] Fitting failed or not enough points. Skipped.")
        return


    stats_output_path = os.path.join(
        EXPORT_FOLDER, f"{FIT_TYPE}_{base_name}_{VI_TYPE}.txt"
    )
    calculate_and_save_statistics(
        FIT_TYPE, fit_params, lower_boundary_df, upper_boundary_df, stats_output_path
    )

    title = f"{base_name}"
    output_file_path = os.path.join(
        OUTPUT_FIGURE_FOLDER, f"{base_name}-{VI_TYPE}-{FIT_TYPE}.png"
    )
    fit_and_plot_optimized(
        df, lower_boundary_df, upper_boundary_df, FIT_TYPE, fit_params, title,
        output_file_path, stats_output_path, vi_type=VI_TYPE, rng_seed=1337
    )

    print("Done!")


if __name__ == "__main__":
    main_single_run()


Done (single run)!
