In [3]:
# inspect_and_convert_to_onnx.py
import joblib
import numpy as np
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import os, sys, traceback
from sklearn.pipeline import Pipeline as SkPipeline

# Path of the serialized (joblib) model that main() loads for conversion.
# NOTE(review): "landsilde" looks like a typo of "landslide" (the ONNX output
# path written by main() spells it "landslide") - confirm the on-disk filename
# before changing this string.
model_path = "model/landsilde_rf_pipeline_best.joblib"
# ONNX opset version forwarded to convert_sklearn as target_opset.
target_opset = 11

def find_pipeline(est):
    """Return the estimator to export, unwrapping search wrappers.

    Objects that expose ``best_estimator_`` (e.g. RandomizedSearchCV or
    GridSearchCV) are replaced by that attribute; anything else is
    returned unchanged.
    """
    if not hasattr(est, "best_estimator_"):
        return est
    print("[INFO] Using .best_estimator_ from search object")
    return est.best_estimator_

def try_get_steps(est):
    """Print the pipeline steps of ``est`` and return its ``steps`` attribute.

    Returns whatever ``est.steps`` holds, or None when the attribute is
    missing. A missing or empty ``steps`` only triggers an informational
    message.
    """
    pipeline_steps = getattr(est, "steps", None)
    if not pipeline_steps:
        print("[INFO] No steps attribute on model (not a pipeline?)")
        return pipeline_steps

    print("[INFO] Pipeline steps:")
    for step_name, step_obj in pipeline_steps:
        print("  -", step_name, "->", type(step_obj))
    return pipeline_steps

def find_scaler_feature_count(est):
    """Infer how many input features ``est`` expects.

    Lookup order:
      1. A scaler inside ``est.named_steps``: the step named 'scaler', or
         otherwise the first StandardScaler instance. Its ``n_features_in_``
         is used, falling back to the length of its ``mean_``.
      2. ``est.n_features_in_`` on the estimator itself.

    Unlike the earlier version, step 2 is also tried when the pipeline has
    no usable scaler (none found, or the scaler exposes no feature count),
    instead of giving up with None.

    Returns:
        The feature count as an int, or None when it cannot be determined.
    """
    try:
        named = getattr(est, "named_steps", None)
        if named:
            # Look for a scaler-like step (named 'scaler' or a StandardScaler).
            if 'scaler' in named:
                sc = named['scaler']
            else:
                # Imported lazily: only needed when no step is called 'scaler'.
                from sklearn.preprocessing import StandardScaler
                sc = None
                for k, v in named.items():
                    if isinstance(v, StandardScaler):
                        sc = v
                        print(f"[INFO] Found StandardScaler at step '{k}'")
                        break
                if sc is None:
                    print("[INFO] No StandardScaler found in named_steps")
            if sc is not None:
                n = getattr(sc, "n_features_in_", None)
                if n is None:
                    # Fall back to the length of the fitted mean vector.
                    shape = getattr(getattr(sc, "mean_", None), "shape", None)
                    if shape:
                        n = shape[0]
                print("[INFO] Scaler expects n_features =", n)
                if n is not None:
                    return int(n)
    except Exception as e:
        # Best-effort inspection: report and continue with the fallback below.
        print("[WARN] Error while inspecting scaler:", e)
    # Fallback: try the estimator-level attribute.
    n = getattr(est, "n_features_in_", None)
    if n is not None:
        print("[INFO] Estimator n_features_in_ =", n)
        return int(n)
    return None

def remove_smote_if_any(est):
    """Return ``est`` without any step named 'smote' (case-insensitive).

    When ``est`` has no ``steps`` or none of them is named 'smote', the
    same object is returned. Otherwise a new SkPipeline is built from the
    remaining steps, so the resampling step is left out of the conversion.
    """
    original_steps = getattr(est, "steps", None)
    if not original_steps:
        return est

    kept = []
    for step_name, step in original_steps:
        if step_name.lower() == 'smote':
            continue
        kept.append((step_name, step))

    if len(kept) == len(original_steps):
        return est

    print("[INFO] Removed 'smote' step from pipeline for conversion")
    return SkPipeline(kept)

def convert_with_inferred_dim(est, n_features):
    """Convert ``est`` to ONNX using a float input of width ``n_features``.

    The input is declared as a single tensor named 'float_input' with shape
    ``[None, n_features]``; the module-level ``target_opset`` is passed to
    convert_sklearn.

    Raises:
        ValueError: if ``n_features`` is None.
    """
    if n_features is None:
        raise ValueError("n_features is None - cannot set initial_type automatically")

    input_spec = FloatTensorType([None, int(n_features)])
    initial_type = [('float_input', input_spec)]
    print("[INFO] Using initial_type with n_features =", n_features)
    return convert_sklearn(
        est,
        initial_types=initial_type,
        target_opset=target_opset,
    )

def main():
    """Load the joblib model, convert it to ONNX and write it to disk.

    Steps: load ``model_path``; unwrap a search object via find_pipeline;
    print the pipeline steps; drop any 'smote' step; infer the input width;
    convert and save to "model/landslide_rf_pipeline.onnx".

    Exits the process with status 1 when the model cannot be loaded, and
    also when the feature count cannot be inferred or the conversion fails
    (previously the latter case printed the failure text but exited 0).
    """
    # NOTE(security): joblib files are pickle-based, so loading one executes
    # arbitrary code embedded in it. Only load model files from a trusted
    # source.
    try:
        model = joblib.load(model_path)
    except Exception as e:
        print("[ERROR] Could not load model:", e)
        traceback.print_exc()
        sys.exit(1)

    print("Loaded model type:", type(model))

    # Use best_estimator_ if the loaded object is a search wrapper.
    model = find_pipeline(model)

    # Show pipeline steps, if any.
    try_get_steps(model)

    # SMOTE is dropped (if present) before handing the pipeline to skl2onnx.
    model_for_export = remove_smote_if_any(model)

    # Try to infer n_features from the scaler or the estimator.
    n_features = find_scaler_feature_count(model_for_export)
    print("[INFO] Inferred n_features =", n_features)

    # If inferred n_features is small (e.g. 6) we proceed; if it's large
    # (e.g. 400) the user should double-check this is the expected model.
    if n_features is not None:
        print("[INFO] Attempting to convert using inferred n_features...")
        try:
            onx = convert_with_inferred_dim(model_for_export, n_features)
            out_path = "model/landslide_rf_pipeline.onnx"
            with open(out_path, "wb") as f:
                f.write(onx.SerializeToString())
            print("[OK] Saved ONNX to:", out_path)
            return
        except Exception as e:
            print("[ERROR] Conversion failed with inferred n_features:", e)
            traceback.print_exc()

    # If we get here, either we couldn't infer or conversion failed.
    print("""
[FAILED] Could not convert automatically.
Diagnostics you should check:
  * Are you converting the correct model file? (maybe the model was trained on many features)
  * Print model.named_steps and inspect which transformer expands features (OneHotEncoder/ColumnTransformer)
  * If scaler expects 400 features but you intend to use 6 features, you're converting the wrong model.
  * You can manually set initial_type = [('float_input', FloatTensorType([None, 6]))] but ensure pipeline expects 6 inputs.
""")
    # Signal failure to the caller/shell, consistent with the load-error path.
    sys.exit(1)

# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Loaded model type: <class 'sklearn.pipeline.Pipeline'>
[INFO] Pipeline steps:
  - scaler -> <class 'sklearn.preprocessing._data.StandardScaler'>
  - clf -> <class 'sklearn.ensemble._forest.RandomForestClassifier'>
[INFO] Scaler expects n_features = 5
[INFO] Inferred n_features = 5
[INFO] Attempting to convert using inferred n_features...
[INFO] Using initial_type with n_features = 5
[OK] Saved ONNX to: model/landslide_rf_pipeline.onnx
