<a href="https://colab.research.google.com/github/brotheramin/MachineLearning/blob/main/SahandProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# ==== RP Failure Classifier (auto-detect failure column, safe for single-class) ====
import pandas as pd, numpy as np, joblib
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix

# Workbook to load and the worksheet inside it that holds the data.
EXCEL_PATH = "XF.xlsx"
SHEET_NAME = "Sheet2"
# Expected share of failing RP rows. Used to rank candidate failure columns
# during auto-detection and to pick the alternative decision threshold.
TARGET_POSITIVE_RATE = 0.35        # roughly 35% of RP segments are expected to fail
# Column to use as the failure label. None means auto-detect.
USER_FAILURE_COL = None            # set to a specific column to skip auto-detect (e.g., "HydroTest")

# --- version-safe OneHot ---
def make_onehot():
    """Build a sparse, unknown-tolerant OneHotEncoder across library versions.

    The encoder is first requested with the ``sparse_output`` keyword. If the
    installed scikit-learn rejects that keyword with a TypeError, the same
    encoder is requested with the ``sparse`` keyword instead.
    """
    try:
        encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=True)
    except TypeError:
        encoder = OneHotEncoder(handle_unknown="ignore", sparse=True)
    return encoder

# --- helpers to map columns to binary ---
# Lower-case tokens that are searched for as substrings of normalized cell text.
# A cell containing any POS_TOKENS entry is labelled as a failure (1).
# NEG_TOKENS only help decide whether a column looks like a pass/fail flag at all.
POS_TOKENS = {"fail","failed","ng","reject","leak","leakage","bad","not pass","nopass","np","no pass"}
NEG_TOKENS = {"pass","ok","good","normal","success"}

def normalize_str_series(s: pd.Series) -> pd.Series:
    """Return ``s`` cast to text, with surrounding whitespace removed and lower-cased."""
    as_text = s.astype(str)
    trimmed = as_text.str.strip()
    return trimmed.str.lower()

def to_binary(series: pd.Series) -> "pd.Series | None":
    """Map a series to 0/1 if it looks like a failure flag.

    Numeric columns qualify when they hold at most three distinct non-missing
    values (which covers plain 0/1 flags). Any value > 0 becomes 1 and missing
    values become 0.

    All other columns are normalized to lower-case text and searched for the
    POS_TOKENS / NEG_TOKENS substrings. A cell becomes 1 when it contains a
    positive token. The column qualifies when at least one cell contains a
    positive or a negative token.

    Returns:
        An int Series (1 = failure) aligned with ``series``, or None when the
        column cannot be read as a failure flag.
    """
    # pandas' dtype helpers understand extension dtypes (nullable Int64,
    # string, category) that np.issubdtype rejects with a TypeError.
    # Booleans are deliberately left to the text path.
    is_number = (
        pd.api.types.is_numeric_dtype(series)
        and not pd.api.types.is_bool_dtype(series)
    )
    if is_number:
        x = pd.to_numeric(series, errors="coerce")
        if x.notna().sum() == 0:
            return None
        # must be (mostly) binary-like: 0/1 flags or a very small code set
        vals = set(x.dropna().unique())
        if len(vals) <= 3:
            return (x.fillna(0) > 0).astype(int)
        return None

    # strings
    xs = normalize_str_series(series)

    def contains_any(text, tokens):
        # Non-string cells (e.g. a preserved missing marker) never match.
        return isinstance(text, str) and any(tok in text for tok in tokens)

    # A single token-based mapping handles every case. When exactly one of two
    # distinct values carries a positive token this equals "that value -> 1",
    # and when both do, the result no longer depends on set iteration order.
    pos_mask = xs.apply(lambda u: contains_any(u, POS_TOKENS)).astype(bool)
    neg_mask = xs.apply(lambda u: contains_any(u, NEG_TOKENS)).astype(bool)
    if pos_mask.any() or neg_mask.any():
        return pos_mask.astype(int)
    return None

def auto_detect_failure_col(df_rp: pd.DataFrame, exclude_cols=None):
    """Pick the column that best looks like a binary failure flag.

    Every column not listed in ``exclude_cols`` is passed through
    ``to_binary``. Columns whose mapped positive rate lies within 1%-99% are
    candidates. The winner is the candidate whose positive rate is closest to
    TARGET_POSITIVE_RATE. Ties go to the longer column name, then to the
    smaller string form of the label.

    Args:
        df_rp: frame restricted to the RP rows.
        exclude_cols: optional collection of column labels to skip.

    Returns:
        ``(column_label, positive_rate)``, or ``(None, None)`` when no column
        qualifies.
    """
    excluded = set() if exclude_cols is None else exclude_cols
    candidates = []
    for col in reversed(df_rp.columns.tolist()):
        if col in excluded:
            continue
        y_bin = to_binary(df_rp[col])
        if y_bin is None:
            continue
        pos_rate = y_bin.mean()
        # keep columns with 1%–99% positives
        if 0.01 <= pos_rate <= 0.99:
            # smaller score = closer to the expected failure rate
            score = abs(pos_rate - TARGET_POSITIVE_RATE)
            candidates.append((score, -len(str(col)), col, pos_rate))
    if not candidates:
        return None, None
    # Rank on the label's string form instead of the raw label: comparing raw
    # labels raises TypeError when headers mix types (e.g. int and str).
    _, _, chosen, pr = min(candidates, key=lambda c: (c[0], c[1], str(c[2])))
    return chosen, pr

# --- load & light clean ---
df = pd.read_excel(EXCEL_PATH, sheet_name=SHEET_NAME)

# Remove columns whose label starts with "unnamed" (case-insensitive).
drop_unnamed = [col for col in df.columns if str(col).lower().startswith("unnamed")]
df = df.drop(columns=drop_unnamed, errors="ignore")

# Remove columns in which more than 70% of the cells are missing.
missing_share = df.isna().mean()
df = df.drop(columns=df.columns[missing_share > 0.70], errors="ignore")

# Remove the "Counter" and "Weld NO." columns when they are present.
df = df.drop(columns=["Counter", "Weld NO."], errors="ignore")

# --- RP filter (RESULT / RESULT2) ---
def is_rp(s):
    """Return a boolean mask of entries equal to "RP" or "REPAIR", ignoring case and surrounding whitespace."""
    labels = s.astype(str).str.strip().str.upper()
    return labels.isin(["RP", "REPAIR"])
# Start with no rows selected, then OR in each result column that exists.
rp_mask = pd.Series(False, index=df.index)
for result_col in ("RESULT", "RESULT2"):
    if result_col in df.columns:
        rp_mask |= is_rp(df[result_col])

# Work on an independent copy of the RP rows only.
df_rp = df.loc[rp_mask].copy()
if df_rp.empty:
    raise ValueError("No RP rows found in RESULT/RESULT2. Please check your dataset.")

# --- choose failure column ---
exclude = {"RESULT","RESULT2"}
failure_col, pos_rate = None, None
if USER_FAILURE_COL is not None:
    if USER_FAILURE_COL in df_rp.columns:
        y_bin = to_binary(df_rp[USER_FAILURE_COL])
        user_rate = float(y_bin.mean()) if y_bin is not None else 0.0
        if (y_bin is not None) and user_rate not in (0.0, 1.0):
            failure_col, pos_rate = USER_FAILURE_COL, user_rate
        else:
            # Tell the user their explicit choice was overridden instead of
            # silently switching to another column.
            print(f"[Warn] USER_FAILURE_COL={USER_FAILURE_COL!r} is not a usable two-class "
                  "failure flag in the RP subset; falling back to auto-detect.")
    else:
        print(f"[Warn] USER_FAILURE_COL={USER_FAILURE_COL!r} was not found in the data; "
              "falling back to auto-detect.")

# No (usable) user choice: scan the RP subset for the best candidate column.
if failure_col is None:
    failure_col, pos_rate = auto_detect_failure_col(df_rp, exclude_cols=exclude)

# Final fallback: use the last column if still nothing
if failure_col is None:
    failure_col = df_rp.columns[-1]
    y_bin = to_binary(df_rp[failure_col])
    pos_rate = float(y_bin.mean()) if y_bin is not None else 0.0

print(f"Using failure column: {failure_col} | RP positive rate in this column: {pos_rate:.3f}")

# If still not binary-mappable or single-class, stop early with guidance
y_bin = to_binary(df_rp[failure_col])
if (y_bin is None) or (y_bin.nunique() < 2):
    raise SystemExit(
        "Could not find a usable failure column with both classes in the RP subset.\n"
        "Tips:\n"
        "  1) Inspect the rightmost columns and any that mention 'fail', 'reject', 'leak', etc.\n"
        "  2) Set USER_FAILURE_COL = '<ExactColumnName>' above and re-run.\n"
        "  3) If the failure flag is numeric, ensure 1/0 or >0 implies failure.\n"
    )

# --- build X/y for RP subset ---
# Keep only rows where the raw failure column is present; y is the 0/1 mapping.
mask = df_rp[failure_col].notna()
X = df_rp.loc[mask].drop(columns=[failure_col]).copy()
y = y_bin.loc[mask]

# Everything that is not numeric (text, dates, categories) is one-hot encoded.
# Testing for a numeric dtype rather than dtype == "object" keeps datetime,
# category and string-dtype columns away from the median imputer.
cat = [c for c in X.columns if not pd.api.types.is_numeric_dtype(X[c])]
num = [c for c in X.columns if c not in cat]
# Casting to str gives the encoder one uniform type per categorical column.
for c in cat: X[c] = X[c].astype(str)
X = X[cat + num]

# --- stratified split (guard if classes very rare) ---
if y.nunique() < 2:
    raise SystemExit("Target has only one class after NA filtering. Unable to train a classifier.")

try:
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.20, random_state=42, stratify=y)
except ValueError:
    # If class counts are tiny, fall back to unstratified split
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.20, random_state=42)

print(f"[Info] RP rows used: {len(X)} | Fail rate: {y.mean():.3f}")

# --- pipeline & model ---
# Numeric columns: fill gaps with the column median.
numeric_steps = SimpleImputer(strategy="median")
# Categorical columns: fill gaps with the most frequent value, then one-hot encode.
categorical_steps = Pipeline(steps=[
    ("imp", SimpleImputer(strategy="most_frequent")),
    ("ohe", make_onehot()),
])
pre = ColumnTransformer(transformers=[
    ("num", numeric_steps, num),
    ("cat", categorical_steps, cat),
])

forest_params = dict(
    n_estimators=200,
    max_depth=20,
    min_samples_leaf=5,
    max_features=0.7,
    class_weight="balanced",
    n_jobs=-1,
    random_state=42,
)
clf = RandomForestClassifier(**forest_params)

# Preprocessing and model are fitted together on the training split.
pipe = Pipeline(steps=[("preprocess", pre), ("clf", clf)])
pipe.fit(X_tr, y_tr)

# --- evaluate (safe proba handling) ---
def report(y_true, proba, t):
    """Score thresholded probabilities against the true labels.

    Args:
        y_true: true 0/1 labels.
        proba: predicted probability of the positive class, aligned with ``y_true``.
        t: decision threshold; entries with ``proba >= t`` are predicted as 1.

    Returns:
        dict with keys ``t``, ``pos_rate`` (share of rows predicted positive),
        ``acc``, ``prec``, ``rec``, ``f1``, ``auc`` (NaN when it cannot be
        computed) and ``cm`` (2x2 confusion matrix in label order [0, 1]).
    """
    pred = (proba >= t).astype(int)
    acc = accuracy_score(y_true, pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, pred, average="binary", zero_division=0
    )
    # AUC is undefined for some inputs (e.g. a single class in y_true). Catch
    # only that failure so interrupts and real bugs still surface.
    try:
        auc = roc_auc_score(y_true, proba)
    except ValueError:
        auc = float("nan")
    # Fixing the label order keeps the matrix 2x2 even if one class is absent.
    cm = confusion_matrix(y_true, pred, labels=[0, 1])
    return dict(t=float(t), pos_rate=float(pred.mean()), acc=acc, prec=prec, rec=rec, f1=f1, auc=auc, cm=cm)

# Use the fitted classifier's class-1 probabilities when it learned two
# classes; otherwise score every test row as 0.0 (not expected after the checks above).
fitted_clf = pipe.named_steps["clf"]
has_two_classes = hasattr(fitted_clf, "classes_") and len(fitted_clf.classes_) == 2
if has_two_classes:
    proba = pipe.predict_proba(X_te)[:, 1]
else:
    proba = np.zeros(len(X_te), dtype=float)

summary_keys = ["t","pos_rate","acc","prec","rec","f1","auc"]

# Metrics at the default 0.50 threshold.
r50 = report(y_te, proba, 0.50)
print("\n@ t=0.50:", {key: r50[key] for key in summary_keys})
print("Confusion matrix:\n", r50["cm"])

# Among 17 evenly spaced thresholds in [0.1, 0.9], keep the first one whose
# predicted-positive rate is closest to TARGET_POSITIVE_RATE.
cands = np.linspace(0.1, 0.9, 17)
best, best_diff = None, 1.0
for threshold in cands:
    metrics = report(y_te, proba, threshold)
    gap = abs(metrics["pos_rate"] - TARGET_POSITIVE_RATE)
    if gap < best_diff:
        best, best_diff = metrics, gap

print("\n@ t≈target 35%:", {key: best[key] for key in summary_keys})
print("Confusion matrix:\n", best["cm"])

# --- save model ---
# Persist the whole fitted pipeline (preprocessing + classifier) in one file.
model_file = "RP_failure_classifier.pkl"
joblib.dump(pipe, model_file)
print("\nSaved: RP_failure_classifier.pkl")


Using failure column: HydroTest | RP positive rate in this column: 0.053
[Info] RP rows used: 2794 | Fail rate: 0.053

@ t=0.50: {'t': 0.5, 'pos_rate': 0.057245080500894455, 'acc': 0.9767441860465116, 'prec': 0.75, 'rec': 0.8275862068965517, 'f1': 0.7868852459016393, 'auc': np.float64(0.9469746258945998)}
Confusion matrix:
 [[522   8]
 [  5  24]]

@ t≈target 35%: {'t': 0.1, 'pos_rate': 0.2826475849731664, 'acc': 0.7620751341681574, 'prec': 0.17088607594936708, 'rec': 0.9310344827586207, 'f1': 0.2887700534759358, 'auc': np.float64(0.9469746258945998)}
Confusion matrix:
 [[399 131]
 [  2  27]]

Saved: RP_failure_classifier.pkl
