In [1]:
# ============================================================
# 3W Dataset (WELL-only) — Row-per-file Classification (v3_1)
# Fully runnable, leakage-safe, interview-ready (+ stability eval)
# ============================================================

import os, json
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin, clone
from sklearn.model_selection import GroupShuffleSplit, StratifiedGroupKFold
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import f1_score, classification_report

# -------------------------
# Config
# -------------------------
BASE = "/kaggle/input/3w-dataset/2.0.0"
RANDOM_STATE = 42
N_WELL_FILES = None  # None = all WELL files

OUT_DIR = "/kaggle/working/3w_prepared_v3_1_fixed"
os.makedirs(OUT_DIR, exist_ok=True)

CACHE_PATH = f"{OUT_DIR}/df_ml_well_v3_1_fixed.parquet"
print("OUT_DIR:", OUT_DIR)
print("CACHE_PATH:", CACHE_PATH)

# -------------------------
# Dataset constants / mappings
# -------------------------
VAR_RENAME = {
    "ABER-CKGL": "gl_choke_opening_pct",
    "ABER-CKP":  "prod_choke_opening_pct",
    "ESTADO-DHSV":   "dhsv_state",
    "ESTADO-M1":     "prod_master_valve_state",
    "ESTADO-M2":     "ann_master_valve_state",
    "ESTADO-PXO":    "pig_crossover_valve_state",
    "ESTADO-SDV-GL": "gl_shutdown_valve_state",
    "ESTADO-SDV-P":  "prod_shutdown_valve_state",
    "ESTADO-W1":     "prod_wing_valve_state",
    "ESTADO-W2":     "ann_wing_valve_state",
    "ESTADO-XO":     "crossover_valve_state",
    "P-ANULAR":     "annulus_pressure_pa",
    "P-JUS-BS":     "svc_pump_downstream_pressure_pa",
    "P-JUS-CKGL":   "gl_choke_downstream_pressure_pa",
    "P-JUS-CKP":    "prod_choke_downstream_pressure_pa",
    "P-MON-CKGL":   "gl_choke_upstream_pressure_pa",
    "P-MON-CKP":    "prod_choke_upstream_pressure_pa",
    "P-MON-SDV-P":  "prod_sdv_upstream_pressure_pa",
    "P-PDG":        "pdg_downhole_pressure_pa",
    "PT-P":         "xmas_tree_prod_line_pressure_pa",
    "P-TPT":        "tpt_pressure_pa",
    "QBS": "svc_pump_flow_m3s",
    "QGL": "gas_lift_flow_m3s",
    "T-JUS-CKP": "prod_choke_downstream_temp_c",
    "T-MON-CKP": "prod_choke_upstream_temp_c",
    "T-PDG":     "pdg_downhole_temp_c",
    "T-TPT":     "tpt_temp_c",
    "class": "class_code",
    "state": "state_code",
}

EVENT_TYPE_CODE_TO_NAME = {
    0:"Normal Operation", 1:"Abrupt Increase of BSW", 2:"Spurious Closure of DHSV",
    3:"Severe Slugging", 4:"Flow Instability", 5:"Rapid Productivity Loss",
    6:"Quick Restriction in PCK", 7:"Scaling in PCK",
    8:"Hydrate in Production Line", 9:"Hydrate in Service Line",
}

LABEL_COLS = {"class_code", "state_code", "class_label", "state_label"}

# ============================================================
# 1) Build file index
# ============================================================
def build_file_index(base: str) -> pd.DataFrame:
    paths = []
    for root, _, files in os.walk(base):
        for f in files:
            if f.endswith(".parquet"):
                paths.append(os.path.join(root, f))

    df = pd.DataFrame({"path": paths})
    codes = df["path"].str.extract(r"/2\.0\.0/(\d+)/", expand=False)
    df["event_type_code"] = pd.to_numeric(codes, errors="coerce").astype("Int64")
    df = df.dropna(subset=["event_type_code"])
    df["event_type_code"] = df["event_type_code"].astype(int)

    df["file"] = df["path"].str.split("/").str[-1]
    df["source"] = df["file"].str.extract(r"^(WELL|SIMULATED|DRAWN)")
    df["well_id"] = df["file"].str.extract(r"(WELL-\d+)")
    df["run_ts"] = df["file"].str.extract(r"_(\d{14})")
    df["run_ts"] = pd.to_datetime(df["run_ts"], format="%Y%m%d%H%M%S", errors="coerce")

    return df.sort_values(["event_type_code","source","well_id","run_ts"]).reset_index(drop=True)

df_files = build_file_index(BASE)
df_w_files = df_files[df_files["source"] == "WELL"].copy()
assert df_w_files["well_id"].notna().all()

print("Total files:", len(df_files), "| WELL files:", len(df_w_files), "| Wells:", df_w_files["well_id"].nunique())
print("Class counts:\n", df_w_files["event_type_code"].value_counts().sort_index())

# ============================================================
# 2) Cleaning
# ============================================================
def clean_3w_instance(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        df = df.set_index("timestamp")
    else:
        df.index = pd.to_datetime(df.index, errors="coerce")

    df = df[~df.index.isna()].sort_index()
    df.index.name = "timestamp"

    df = df.rename(columns=VAR_RENAME)

    for c in df.columns:
        if c in ("class_code", "state_code"):
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int16")
        else:
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
    return df

# ============================================================
# 3) Feature extraction (fixed z_last + time-window first/last)
# ============================================================
def summarize_timeseries_v3_1(df_clean: pd.DataFrame, frac: float = 0.1, state_max: int = 3) -> dict:
    sensors = df_clean.drop(columns=list(LABEL_COLS), errors="ignore")
    num = sensors.select_dtypes(include=[np.number])

    out = {
        "n_obs": int(len(df_clean)),
        "duration_s": float((df_clean.index.max() - df_clean.index.min()).total_seconds())
                      if len(df_clean) else np.nan,
    }
    if num.shape[1] == 0 or len(num) == 0:
        return out

    state_cols = [c for c in num.columns if c.endswith("_state")]
    cont_cols  = [c for c in num.columns if c not in state_cols]

    def _first_last_masks(index, frac: float, n: int):
        k = max(1, int(n * frac))
        if n < 2:
            m = np.ones(n, dtype=bool)
            return m, m

        try:
            idx = pd.DatetimeIndex(index)
        except Exception:
            idx = None

        if idx is not None and idx.notna().all():
            tmin, tmax = idx.min(), idx.max()
            dur_s = (tmax - tmin).total_seconds()
            if np.isfinite(dur_s) and dur_s > 0:
                w = dur_s * frac
                t_first_end = tmin + pd.Timedelta(seconds=w)
                t_last_start = tmax - pd.Timedelta(seconds=w)

                first_mask = np.asarray(idx <= t_first_end, dtype=bool)
                last_mask  = np.asarray(idx >= t_last_start, dtype=bool)

                if first_mask.sum() == 0:
                    first_mask = np.zeros(n, dtype=bool); first_mask[:k] = True
                if last_mask.sum() == 0:
                    last_mask = np.zeros(n, dtype=bool); last_mask[-k:] = True
                return first_mask, last_mask

        first_mask = np.zeros(n, dtype=bool); first_mask[:k] = True
        last_mask  = np.zeros(n, dtype=bool); last_mask[-k:] = True
        return first_mask, last_mask

    if len(cont_cols):
        cont = num[cont_cols]
        med_raw = cont.median()
        iqr_raw = (cont.quantile(0.75) - cont.quantile(0.25))

        iqr = iqr_raw.replace(0, np.nan)
        z = (cont - med_raw) / iqr

        first_mask, last_mask = _first_last_masks(df_clean.index, frac=frac, n=len(z))
        first = z.iloc[first_mask].mean()
        last  = z.iloc[last_mask].mean()

        agg = z.agg(["mean", "std", "min", "max"]).T
        miss = cont.isna().mean()

        for col in cont_cols:
            out[f"{col}__raw_median"] = med_raw[col]
            out[f"{col}__raw_iqr"]    = iqr_raw[col]

            s_raw = cont[col].dropna()
            out[f"{col}__raw_last"] = s_raw.iloc[-1] if len(s_raw) else np.nan

            out[f"{col}__z_mean"] = agg.loc[col, "mean"]
            out[f"{col}__z_std"]  = agg.loc[col, "std"]
            out[f"{col}__z_min"]  = agg.loc[col, "min"]
            out[f"{col}__z_max"]  = agg.loc[col, "max"]

            s_z = z[col].dropna()
            out[f"{col}__z_last"] = s_z.iloc[-1] if len(s_z) else np.nan

            out[f"{col}__delta_last_first"] = (last[col] - first[col])
            out[f"{col}__abs_delta"]        = abs(last[col] - first[col])
            out[f"{col}__missing_frac"]     = miss[col]

    if len(state_cols):
        st = num[state_cols]
        for col in state_cols:
            s = st[col]
            s_non = s.dropna()

            out[f"{col}__missing_frac"] = float(s.isna().mean())
            out[f"{col}__last"] = float(s_non.iloc[-1]) if len(s_non) else np.nan

            if len(s_non) >= 2:
                n_trans = int((s_non != s_non.shift()).sum() - 1)
            else:
                n_trans = 0

            out[f"{col}__n_transitions"] = n_trans
            out[f"{col}__transitions_rate"] = n_trans / max(1, len(s_non))

            known = 0.0
            for v in range(state_max + 1):
                p = float((s_non == v).mean()) if len(s_non) else np.nan
                out[f"{col}__p_state_{v}"] = p
                if not np.isnan(p):
                    known += p
            out[f"{col}__p_state_other"] = (1.0 - known) if len(s_non) else np.nan

    return out

# ============================================================
# 4) Build row-per-file dataset (with caching)
# ============================================================
def build_row_per_file_dataset(df_files: pd.DataFrame, n_files: int | None = None, random_state: int = 42) -> pd.DataFrame:
    if n_files is None:
        sample = df_files.reset_index(drop=True)
    else:
        sample = df_files.sample(n_files, random_state=random_state).reset_index(drop=True)

    rows = []
    for _, r in tqdm(sample.iterrows(), total=len(sample), desc="Building WELL dataset"):
        df_raw = pd.read_parquet(r["path"])
        df_clean = clean_3w_instance(df_raw)

        feats = summarize_timeseries_v3_1(df_clean)
        feats["event_type_code"] = int(r["event_type_code"])
        feats["event_type_name"] = EVENT_TYPE_CODE_TO_NAME.get(int(r["event_type_code"]), "Unknown")
        feats["well_id"] = r["well_id"]
        feats["run_ts"] = r["run_ts"]
        feats["file"] = r["file"]
        rows.append(feats)

    return pd.DataFrame(rows)

if os.path.exists(CACHE_PATH):
    df_ml_well = pd.read_parquet(CACHE_PATH)
    print("Loaded cached features:", df_ml_well.shape)
else:
    df_ml_well = build_row_per_file_dataset(df_w_files, n_files=N_WELL_FILES, random_state=RANDOM_STATE)
    df_ml_well.to_parquet(CACHE_PATH, index=False)
    print("Built + saved features:", df_ml_well.shape)

with open(f"{OUT_DIR}/dataset_config.json", "w") as f:
    json.dump({
        "base": BASE,
        "random_state": RANDOM_STATE,
        "n_well_files_used": int(len(df_ml_well)),
        "features_version": "row_per_file_v3_1_fixed",
        "notes": "continuous: raw median/iqr/last + robust-z stats + deltas; state: last + transitions + proportions (+other); WELL-only; fixes: time-window first/last + z_last last-valid"
    }, f, indent=2)

# ============================================================
# 5) Build X/y/groups (NO global filtering leakage)
# ============================================================
def make_Xy_groups(df: pd.DataFrame):
    y = df["event_type_code"].astype(int).copy()
    groups = df["well_id"].astype(str).copy()
    drop_cols = ["event_type_code","event_type_name","file","run_ts","well_id"]
    X = df.drop(columns=drop_cols, errors="ignore").copy()
    X = X.replace([np.inf, -np.inf], np.nan)
    return X, y, groups

X, y, groups = make_Xy_groups(df_ml_well)
print("X:", X.shape, "| y:", y.shape, "| wells:", groups.nunique())
print("Label counts:\n", y.value_counts().sort_index())

# ============================================================
# 6) Train-only column filter (inside pipeline)
# ============================================================
class TrainOnlyColumnFilter(BaseEstimator, TransformerMixin):
    def __init__(self, missing_threshold: float = 0.98):
        self.missing_threshold = missing_threshold

    def fit(self, X, y=None):
        X = X.copy().replace([np.inf, -np.inf], np.nan)
        all_missing = X.columns[X.isna().all()]
        miss = X.isna().mean()
        high_missing = miss[miss > self.missing_threshold].index
        const_cols = [c for c in X.columns if X[c].nunique(dropna=True) <= 1]
        drop = set(all_missing) | set(high_missing) | set(const_cols)
        self.keep_columns_ = [c for c in X.columns if c not in drop]
        return self

    def transform(self, X):
        X = X.copy().replace([np.inf, -np.inf], np.nan)
        for c in getattr(self, "keep_columns_", []):
            if c not in X.columns:
                X[c] = np.nan
        return X[self.keep_columns_]

# ============================================================
# 7) Weighted HGB estimator (cloneable)
# ============================================================
class WeightedHGBClassifier(BaseEstimator, ClassifierMixin):
    def __init__(
        self,
        max_depth=6,
        learning_rate=0.06,
        max_iter=700,
        max_leaf_nodes=31,
        min_samples_leaf=20,
        l2_regularization=0.1,
        random_state=42,
    ):
        self.max_depth = max_depth
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.max_leaf_nodes = max_leaf_nodes
        self.min_samples_leaf = min_samples_leaf
        self.l2_regularization = l2_regularization
        self.random_state = random_state

    def fit(self, X, y):
        self.clf_ = HistGradientBoostingClassifier(
            max_depth=self.max_depth,
            learning_rate=self.learning_rate,
            max_iter=self.max_iter,
            max_leaf_nodes=self.max_leaf_nodes,
            min_samples_leaf=self.min_samples_leaf,
            l2_regularization=self.l2_regularization,
            random_state=self.random_state,
        )

        y_arr = np.asarray(y)
        classes = np.unique(y_arr)
        cw = compute_class_weight(class_weight="balanced", classes=classes, y=y_arr)
        cw_map = dict(zip(classes, cw))
        sample_weight = np.array([cw_map[v] for v in y_arr], dtype=float)

        self.clf_.fit(X, y_arr, sample_weight=sample_weight)
        self.classes_ = classes
        return self

    def predict(self, X):
        return self.clf_.predict(X)

# ============================================================
# 8) Evaluation (fixed labels; major classes from train per fold)
# ============================================================
def evaluate_stratified_group_cv(model, X, y, groups, n_splits=4, random_state=42, major_min_support=10):
    labels = np.sort(pd.unique(y))
    cv = StratifiedGroupKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    macro_f1s, major_f1s, bin_f1s = [], [], []
    per_class_f1 = {int(lbl): [] for lbl in labels}

    for fold, (tr, te) in enumerate(cv.split(X, y, groups=groups), start=1):
        Xtr, Xte = X.iloc[tr], X.iloc[te]
        ytr, yte = y.iloc[tr], y.iloc[te]

        m = clone(model)
        m.fit(Xtr, ytr)
        pred = m.predict(Xte)

        macro = f1_score(yte, pred, average="macro", labels=labels, zero_division=0)
        macro_f1s.append(macro)

        major_classes = set(ytr.value_counts()[lambda s: s >= major_min_support].index.astype(int))
        if len(major_classes) > 0:
            major_labels = np.array(sorted(major_classes))
            major_mask = yte.isin(major_labels)
            if major_mask.any():
                major = f1_score(
                    yte[major_mask], pred[major_mask],
                    average="macro", labels=major_labels, zero_division=0
                )
            else:
                major = np.nan
        else:
            major = np.nan
        major_f1s.append(major)

        yte_bin = (yte != 0).astype(int)
        pred_bin = (pred != 0).astype(int)
        bin_f1s.append(f1_score(yte_bin, pred_bin, zero_division=0))

        rep = classification_report(yte, pred, labels=labels, output_dict=True, zero_division=0)
        for lbl in labels:
            per_class_f1[int(lbl)].append(rep[str(int(lbl))]["f1-score"])

    return {
        "macro_f1_mean": float(np.nanmean(macro_f1s)),
        "macro_f1_std":  float(np.nanstd(macro_f1s)),
        "major_macro_f1_mean": float(np.nanmean(major_f1s)),
        "fault_vs_normal_f1_mean": float(np.nanmean(bin_f1s)),
        "per_class_f1_mean": {k: float(np.nanmean(v)) for k, v in per_class_f1.items()},
    }

def evaluate_on_holdout(model, Xtr, ytr, Xho, yho, major_min_support=10):
    labels = np.sort(pd.unique(pd.concat([ytr, yho], axis=0)))

    m = clone(model)
    m.fit(Xtr, ytr)
    pred = m.predict(Xho)

    macro = f1_score(yho, pred, average="macro", labels=labels, zero_division=0)

    major_classes = set(ytr.value_counts()[lambda s: s >= major_min_support].index.astype(int))
    major_labels = np.array(sorted(major_classes)) if len(major_classes) else np.array([], dtype=int)
    if len(major_labels) and yho.isin(major_labels).any():
        major = f1_score(
            yho[yho.isin(major_labels)], pred[yho.isin(major_labels)],
            average="macro", labels=major_labels, zero_division=0
        )
    else:
        major = np.nan

    yho_bin = (yho != 0).astype(int)
    pred_bin = (pred != 0).astype(int)
    bin_f1 = f1_score(yho_bin, pred_bin, zero_division=0)

    report = classification_report(yho, pred, labels=labels, zero_division=0)

    return {
        "macro_f1": float(macro),
        "major_macro_f1": float(major) if np.isfinite(major) else np.nan,
        "fault_vs_normal_f1": float(bin_f1),
        "classification_report": report,
    }

# ============================================================
# NEW: A) Repeated group holdout evaluation (30 splits)
# ============================================================
def repeated_holdout(model, X, y, groups, repeats=30, test_size=0.2, random_state=42):
    labels = np.sort(pd.unique(y))
    gss = GroupShuffleSplit(n_splits=repeats, test_size=test_size, random_state=random_state)

    macs, bins = [], []
    for tr, te in gss.split(X, y, groups=groups):
        m = clone(model)
        m.fit(X.iloc[tr], y.iloc[tr])
        pred = m.predict(X.iloc[te])

        macs.append(f1_score(y.iloc[te], pred, average="macro", labels=labels, zero_division=0))
        bins.append(f1_score((y.iloc[te] != 0).astype(int), (pred != 0).astype(int), zero_division=0))

    return {
        "macro_f1_mean": float(np.mean(macs)),
        "macro_f1_std": float(np.std(macs)),
        "fault_vs_normal_f1_mean": float(np.mean(bins)),
        "fault_vs_normal_f1_std": float(np.std(bins)),
    }

# ============================================================
# 9) Models (consistent preprocessing)
#    CHANGE (B): imputer = constant -1 + indicator
# ============================================================
preprocess_common = [
    ("col_filter", TrainOnlyColumnFilter(missing_threshold=0.98)),
    ("imputer", SimpleImputer(strategy="constant", fill_value=-1, add_indicator=True)),
]

logreg = Pipeline(preprocess_common + [
    ("scaler", StandardScaler(with_mean=False)),
    ("clf", LogisticRegression(max_iter=8000, class_weight="balanced"))
])

hgb = Pipeline(preprocess_common + [
    ("clf", WeightedHGBClassifier(
        max_depth=6,
        learning_rate=0.06,
        max_iter=700,
        max_leaf_nodes=31,
        min_samples_leaf=20,
        l2_regularization=0.1,
        random_state=RANDOM_STATE
    ))
])

# ============================================================
# 10) Final holdout by WELL + CV on train only
# ============================================================
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=RANDOM_STATE)
tr_idx, ho_idx = next(gss.split(X, y, groups=groups))

X_tr, y_tr, g_tr = X.iloc[tr_idx], y.iloc[tr_idx], groups.iloc[tr_idx]
X_ho, y_ho, g_ho = X.iloc[ho_idx], y.iloc[ho_idx], groups.iloc[ho_idx]

print("\nTrain wells:", g_tr.nunique(), "| Holdout wells:", g_ho.nunique())
print("Holdout label counts:\n", y_ho.value_counts().sort_index())

min_support = int(y_tr.value_counts().min())
n_splits = min(4, min_support)
n_splits = max(2, n_splits)

print(f"\nUsing StratifiedGroupKFold with n_splits={n_splits} (min class support in train = {min_support})")

print("\n=== CV on TRAIN ONLY ===")
logreg_cv = evaluate_stratified_group_cv(logreg, X_tr, y_tr, g_tr, n_splits=n_splits, random_state=RANDOM_STATE)
hgb_cv    = evaluate_stratified_group_cv(hgb,   X_tr, y_tr, g_tr, n_splits=n_splits, random_state=RANDOM_STATE)

print("LogReg CV:", logreg_cv)
print("HGB   CV:", hgb_cv)

print("\n=== FINAL HOLDOUT (train wells -> holdout wells) ===")
logreg_hold = evaluate_on_holdout(logreg, X_tr, y_tr, X_ho, y_ho)
hgb_hold    = evaluate_on_holdout(hgb,   X_tr, y_tr, X_ho, y_ho)

print("LogReg Holdout:", {k: v for k, v in logreg_hold.items() if k != "classification_report"})
print("HGB   Holdout:", {k: v for k, v in hgb_hold.items() if k != "classification_report"})

print("\nLogReg classification report (holdout):\n", logreg_hold["classification_report"])
print("\nHGB classification report (holdout):\n", hgb_hold["classification_report"])

# ============================================================
# NEW: Stability numbers over many group holdouts
# ============================================================
print("\n=== Repeated Group Holdout (30 splits) ===")
logreg_rep = repeated_holdout(logreg, X, y, groups, repeats=30, test_size=0.2, random_state=RANDOM_STATE)
hgb_rep    = repeated_holdout(hgb,   X, y, groups, repeats=30, test_size=0.2, random_state=RANDOM_STATE)
print("LogReg:", logreg_rep)
print("HGB   :", hgb_rep)

# Optional: save results
with open(f"{OUT_DIR}/results_summary.json", "w") as f:
    json.dump({
        "logreg_cv": logreg_cv,
        "hgb_cv": hgb_cv,
        "logreg_holdout": {k: v for k, v in logreg_hold.items() if k != "classification_report"},
        "hgb_holdout": {k: v for k, v in hgb_hold.items() if k != "classification_report"},
        "logreg_repeated_holdout": logreg_rep,
        "hgb_repeated_holdout": hgb_rep,
        "notes": {
            "leakage_control": [
                "train-only column filtering inside pipeline",
                "final group holdout by well",
                "CV performed on train wells only",
                "major classes computed from y_train per fold",
                "fixed-label macro f1 with zero_division=0",
                "repeated group holdout metrics (mean/std)",
                "imputer uses constant=-1 + indicator (stable missing handling for state-ish features)"
            ]
        }
    }, f, indent=2)

print(f"\nSaved: {OUT_DIR}/results_summary.json")


OUT_DIR: /kaggle/working/3w_prepared_v3_1_fixed
CACHE_PATH: /kaggle/working/3w_prepared_v3_1_fixed/df_ml_well_v3_1_fixed.parquet
Total files: 2228 | WELL files: 1119 | Wells: 40
Class counts:
 event_type_code
0    594
1      4
2     22
3     32
4    343
5     11
6      6
7     36
8     14
9     57
Name: count, dtype: int64


Building WELL dataset:   0%|          | 0/1119 [00:00<?, ?it/s]

Built + saved features: (1119, 286)
X: (1119, 281) | y: (1119,) | wells: 40
Label counts:
 event_type_code
0    594
1      4
2     22
3     32
4    343
5     11
6      6
7     36
8     14
9     57
Name: count, dtype: int64

Train wells: 32 | Holdout wells: 8
Holdout label counts:
 event_type_code
0    82
2     1
4    38
5     4
7     8
8     6
9     4
Name: count, dtype: int64

Using StratifiedGroupKFold with n_splits=4 (min class support in train = 4)

=== CV on TRAIN ONLY ===
LogReg CV: {'macro_f1_mean': 0.20909079493887794, 'macro_f1_std': 0.11127020742202436, 'major_macro_f1_mean': 0.2370150966258085, 'fault_vs_normal_f1_mean': 0.6589056740038925, 'per_class_f1_mean': {0: 0.04375, 1: 0.0, 2: 0.34521616310488823, 3: 0.0, 4: 0.14, 5: 0.0, 6: 0.0, 7: 0.39035087719298245, 8: 0.5, 9: 0.671590909090909}}
HGB   CV: {'macro_f1_mean': 0.37668123626930894, 'macro_f1_std': 0.05555873348647263, 'major_macro_f1_mean': 0.555470091997136, 'fault_vs_normal_f1_mean': 0.8386357605723292, 'per_class_

In [2]:
import os, json
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from sklearn.model_selection import StratifiedGroupKFold, GroupShuffleSplit
from sklearn.metrics import f1_score
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.utils.class_weight import compute_class_weight

BASE = "/kaggle/input/3w-dataset/2.0.0"
RANDOM_STATE = 42
N_WELL_FILES = None  # None = all WELL files

OUT_DIR = "/kaggle/working/3w_prepared_v3_1"
os.makedirs(OUT_DIR, exist_ok=True)

CACHE_PATH = f"{OUT_DIR}/df_ml_well_v3_1.parquet"
print("OUT_DIR:", OUT_DIR)


OUT_DIR: /kaggle/working/3w_prepared_v3_1


# 3W Dataset (WELL-only) — Row-per-file Classification (v3_1)

## Summary (Engineer Story)
- Built a file index from the 3W dataset and filtered to **WELL files only** (1119 files, 40 wells).
- Cleaned each file: parsed timestamp index, renamed sensors, converted to numeric types.
- Engineered **row-per-file features**:
  - Continuous sensors: raw median/IQR/last + robust z-stats (mean/std/min/max/last) + delta(first→last) + missing ratio.
  - State/valve signals: last state + transition count/rate + time-in-state proportions (+ “other” state).
- Evaluated with **Group splits by `well_id`** to avoid leakage across wells (harder but realistic).
- Compared baseline (Logistic Regression) vs boosting (HistGradientBoosting with class-weighted sample_weight).
- Result: **HGB outperformed baseline**, reaching about **macro-F1 ~0.42 (repeated group shuffle)** and **fault-vs-normal F1 ~0.76**, with variance due to very rare classes.


In [3]:
def build_file_index(base: str) -> pd.DataFrame:
    paths = []
    for root, _, files in os.walk(base):
        for f in files:
            if f.endswith(".parquet"):
                paths.append(os.path.join(root, f))

    df = pd.DataFrame({"path": paths})
    codes = df["path"].str.extract(r"/2\.0\.0/(\d+)/", expand=False)
    df["event_type_code"] = pd.to_numeric(codes, errors="coerce").astype("Int64")
    df = df.dropna(subset=["event_type_code"])
    df["event_type_code"] = df["event_type_code"].astype(int)
    df["file"] = df["path"].str.split("/").str[-1]
    df["source"] = df["file"].str.extract(r"^(WELL|SIMULATED|DRAWN)")
    df["well_id"] = df["file"].str.extract(r"(WELL-\d+)")
    df["run_ts"] = df["file"].str.extract(r"_(\d{14})")
    df["run_ts"] = pd.to_datetime(df["run_ts"], format="%Y%m%d%H%M%S", errors="coerce")
    return df.sort_values(["event_type_code","source","well_id","run_ts"]).reset_index(drop=True)

df_files = build_file_index(BASE)
df_w_files = df_files[df_files["source"]=="WELL"].copy()
assert df_w_files["well_id"].notna().all()


print("Total files:", len(df_files), "| WELL files:", len(df_w_files), "| Wells:", df_w_files["well_id"].nunique())
display(df_w_files["event_type_code"].value_counts().sort_index())


Total files: 2228 | WELL files: 1119 | Wells: 40


event_type_code
0    594
1      4
2     22
3     32
4    343
5     11
6      6
7     36
8     14
9     57
Name: count, dtype: int64

In [4]:
VAR_RENAME = {
    "ABER-CKGL": "gl_choke_opening_pct",
    "ABER-CKP":  "prod_choke_opening_pct",
    "ESTADO-DHSV":   "dhsv_state",
    "ESTADO-M1":     "prod_master_valve_state",
    "ESTADO-M2":     "ann_master_valve_state",
    "ESTADO-PXO":    "pig_crossover_valve_state",
    "ESTADO-SDV-GL": "gl_shutdown_valve_state",
    "ESTADO-SDV-P":  "prod_shutdown_valve_state",
    "ESTADO-W1":     "prod_wing_valve_state",
    "ESTADO-W2":     "ann_wing_valve_state",
    "ESTADO-XO":     "crossover_valve_state",
    "P-ANULAR":     "annulus_pressure_pa",
    "P-JUS-BS":     "svc_pump_downstream_pressure_pa",
    "P-JUS-CKGL":   "gl_choke_downstream_pressure_pa",
    "P-JUS-CKP":    "prod_choke_downstream_pressure_pa",
    "P-MON-CKGL":   "gl_choke_upstream_pressure_pa",
    "P-MON-CKP":    "prod_choke_upstream_pressure_pa",
    "P-MON-SDV-P":  "prod_sdv_upstream_pressure_pa",
    "P-PDG":        "pdg_downhole_pressure_pa",
    "PT-P":         "xmas_tree_prod_line_pressure_pa",
    "P-TPT":        "tpt_pressure_pa",
    "QBS": "svc_pump_flow_m3s",
    "QGL": "gas_lift_flow_m3s",
    "T-JUS-CKP": "prod_choke_downstream_temp_c",
    "T-MON-CKP": "prod_choke_upstream_temp_c",
    "T-PDG":     "pdg_downhole_temp_c",
    "T-TPT":     "tpt_temp_c",
    "class": "class_code",
    "state": "state_code",
}

EVENT_TYPE_CODE_TO_NAME = {
    0:"Normal Operation", 1:"Abrupt Increase of BSW", 2:"Spurious Closure of DHSV",
    3:"Severe Slugging", 4:"Flow Instability", 5:"Rapid Productivity Loss",
    6:"Quick Restriction in PCK", 7:"Scaling in PCK",
    8:"Hydrate in Production Line", 9:"Hydrate in Service Line",
}

LABEL_COLS = {"class_code", "state_code", "class_label", "state_label"}

def clean_3w_instance(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        df = df.set_index("timestamp")
    else:
        df.index = pd.to_datetime(df.index, errors="coerce")

    df = df[~df.index.isna()].sort_index()
    df.index.name = "timestamp"
    df = df.rename(columns=VAR_RENAME)

    for c in df.columns:
        if c in ("class_code", "state_code"):
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int16")
        else:
            df[c] = pd.to_numeric(df[c], errors="coerce").astype("float64")
    return df

def summarize_timeseries_v3_1(df_clean: pd.DataFrame, frac: float = 0.1, state_max: int = 3) -> dict:
    sensors = df_clean.drop(columns=list(LABEL_COLS), errors="ignore")
    num = sensors.select_dtypes(include=[np.number])

    out = {
        "n_obs": int(len(df_clean)),
        "duration_s": float((df_clean.index.max() - df_clean.index.min()).total_seconds())
                      if len(df_clean) else np.nan,
    }
    if num.shape[1] == 0 or len(num) == 0:
        return out

    state_cols = [c for c in num.columns if c.endswith("_state")]
    cont_cols  = [c for c in num.columns if c not in state_cols]

    if len(cont_cols):
        cont = num[cont_cols]
        med_raw = cont.median()
        iqr_raw = (cont.quantile(0.75) - cont.quantile(0.25))

        iqr = iqr_raw.replace(0, np.nan)
        z = (cont - med_raw) / iqr

        k = max(1, int(len(z) * frac))
        first = z.iloc[:k].mean()
        last  = z.iloc[-k:].mean()

        agg = z.agg(["mean","std","min","max"]).T
        miss = cont.isna().mean()

        for col in cont_cols:
            out[f"{col}__raw_median"] = med_raw[col]
            out[f"{col}__raw_iqr"]    = iqr_raw[col]
            s = cont[col].dropna()
            out[f"{col}__raw_last"] = s.iloc[-1] if len(s) else np.nan

            out[f"{col}__z_mean"] = agg.loc[col, "mean"]
            out[f"{col}__z_std"]  = agg.loc[col, "std"]
            out[f"{col}__z_min"]  = agg.loc[col, "min"]
            out[f"{col}__z_max"]  = agg.loc[col, "max"]
            out[f"{col}__z_last"] = z[col].iloc[-1]

            out[f"{col}__delta_last_first"] = (last[col] - first[col])
            out[f"{col}__abs_delta"]        = abs(last[col] - first[col])
            out[f"{col}__missing_frac"]     = miss[col]

    if len(state_cols):
        st = num[state_cols]
        for col in state_cols:
            s = st[col]
            s_non = s.dropna()

            out[f"{col}__missing_frac"] = float(s.isna().mean())
            out[f"{col}__last"] = float(s_non.iloc[-1]) if len(s_non) else np.nan

            if len(s_non) >= 2:
                n_trans = int((s_non != s_non.shift()).sum() - 1)
            else:
                n_trans = 0

            out[f"{col}__n_transitions"] = n_trans
            out[f"{col}__transitions_rate"] = n_trans / max(1, len(s_non))

            known = 0.0
            for v in range(state_max + 1):
                p = float((s_non == v).mean()) if len(s_non) else np.nan
                out[f"{col}__p_state_{v}"] = p
                if not np.isnan(p):
                    known += p
            out[f"{col}__p_state_other"] = (1.0 - known) if len(s_non) else np.nan

    return out


In [5]:
def build_row_per_file_dataset(df_files: pd.DataFrame, n_files: int | None = None, random_state: int = 42) -> pd.DataFrame:
    if n_files is None:
        sample = df_files.reset_index(drop=True)
    else:
        sample = df_files.sample(n_files, random_state=random_state).reset_index(drop=True)

    rows = []
    for _, r in tqdm(sample.iterrows(), total=len(sample), desc="Building WELL dataset"):
        df_raw = pd.read_parquet(r["path"])
        df_clean = clean_3w_instance(df_raw)

        feats = summarize_timeseries_v3_1(df_clean)
        feats["event_type_code"] = int(r["event_type_code"])
        feats["event_type_name"] = EVENT_TYPE_CODE_TO_NAME.get(int(r["event_type_code"]), "Unknown")
        feats["well_id"] = r["well_id"]
        feats["run_ts"] = r["run_ts"]
        feats["file"] = r["file"]
        rows.append(feats)

    return pd.DataFrame(rows)

if os.path.exists(CACHE_PATH):
    df_ml_well = pd.read_parquet(CACHE_PATH)
    print("Loaded cached features:", df_ml_well.shape)
else:
    df_ml_well = build_row_per_file_dataset(df_w_files, n_files=N_WELL_FILES, random_state=RANDOM_STATE)
    df_ml_well.to_parquet(CACHE_PATH, index=False)
    print("Built + saved features:", df_ml_well.shape)

with open(f"{OUT_DIR}/dataset_config.json", "w") as f:
    json.dump({
        "base": BASE,
        "random_state": RANDOM_STATE,
        "n_well_files_used": int(len(df_ml_well)),
        "features_version": "row_per_file_v3_1",
        "notes": "continuous: raw median/iqr/last + robust-z stats + deltas; state: last + transitions + proportions (+other); WELL-only"
    }, f, indent=2)


Building WELL dataset:   0%|          | 0/1119 [00:00<?, ?it/s]

Built + saved features: (1119, 286)


In [6]:
def make_Xy_groups(df: pd.DataFrame):
    y = df["event_type_code"].copy()
    groups = df["well_id"].copy()

    drop_cols = ["event_type_code","event_type_name","file","run_ts","well_id"]
    X = df.drop(columns=drop_cols, errors="ignore").copy()

    X = X.replace([np.inf, -np.inf], np.nan)

    X = X.drop(columns=X.columns[X.isna().all()])          # all-missing
    miss = X.isna().mean()
    X = X.drop(columns=miss[miss > 0.98].index)            # very high missing
    const_cols = [c for c in X.columns if X[c].nunique(dropna=True) <= 1]
    X = X.drop(columns=const_cols)                          # constants

    return X, y, groups

X, y, groups = make_Xy_groups(df_ml_well)
print("X:", X.shape, "| y:", y.shape, "| wells:", groups.nunique())
display(y.value_counts().sort_index())


X: (1119, 182) | y: (1119,) | wells: 40


event_type_code
0    594
1      4
2     22
3     32
4    343
5     11
6      6
7     36
8     14
9     57
Name: count, dtype: int64

In [7]:
def eval_repeated_group_shuffle(model, X, y, groups, repeats=30, test_size=0.2):
    gss = GroupShuffleSplit(n_splits=repeats, test_size=test_size, random_state=42)
    major_classes = set(y.value_counts()[lambda s: s >= 10].index)

    f1s, f1_major, f1_bin = [], [], []
    for tr, te in gss.split(X, y, groups=groups):
        Xtr, Xte = X.iloc[tr], X.iloc[te]
        ytr, yte = y.iloc[tr], y.iloc[te]

        model.fit(Xtr, ytr)
        pred = model.predict(Xte)

        f1s.append(f1_score(yte, pred, average="macro"))

        mask = yte.isin(major_classes)
        f1_major.append(f1_score(yte[mask], pred[mask], average="macro") if mask.sum() else np.nan)

        yte_bin = (yte != 0).astype(int)
        pred_bin = (pred != 0).astype(int)
        f1_bin.append(f1_score(yte_bin, pred_bin))

    return {
        "macro_f1_mean": float(np.nanmean(f1s)),
        "macro_f1_std":  float(np.nanstd(f1s)),
        "major_macro_f1_mean": float(np.nanmean(f1_major)),
        "fault_vs_normal_f1_mean": float(np.nanmean(f1_bin)),
    }

logreg = Pipeline([
    ("imputer", SimpleImputer(strategy="constant", fill_value=0, add_indicator=True)),
    ("scaler", StandardScaler(with_mean=False)),
    ("clf", LogisticRegression(max_iter=8000, class_weight="balanced"))
])

class HGBWrapper:
    def __init__(self,
                 max_depth=6,
                 learning_rate=0.06,
                 max_iter=700,
                 max_leaf_nodes=31,
                 min_samples_leaf=20,
                 l2_regularization=0.1):
        self.clf = HistGradientBoostingClassifier(
            max_depth=max_depth,
            learning_rate=learning_rate,
            max_iter=max_iter,
            max_leaf_nodes=max_leaf_nodes,
            min_samples_leaf=min_samples_leaf,
            l2_regularization=l2_regularization
        )

    def fit(self, X, y):
        classes = np.unique(y)
        cw = compute_class_weight(class_weight="balanced", classes=classes, y=y)
        w = pd.Series(y).map(dict(zip(classes, cw))).to_numpy()
        self.clf.fit(X, y, sample_weight=w)
        return self

    def predict(self, X):
        return self.clf.predict(X.fillna(0))

hgb = HGBWrapper()

print("LogReg:", eval_repeated_group_shuffle(logreg, X, y, groups, repeats=30))
print("HGB   :", eval_repeated_group_shuffle(hgb, X, y, groups, repeats=30))


LogReg: {'macro_f1_mean': 0.22398705848338343, 'macro_f1_std': 0.09435939619898277, 'major_macro_f1_mean': 0.24794970780289896, 'fault_vs_normal_f1_mean': 0.6073600789459409}
HGB   : {'macro_f1_mean': 0.4331954202538017, 'macro_f1_std': 0.15897862328939008, 'major_macro_f1_mean': 0.4594612367402339, 'fault_vs_normal_f1_mean': 0.8007543910780439}


## Results (Current)
- Logistic Regression (repeated group shuffle, 30): macro-F1 ≈ 0.22, fault-vs-normal F1 ≈ 0.61
- HistGradientBoosting (repeated group shuffle, 30): macro-F1 ≈ 0.42, fault-vs-normal F1 ≈ 0.76
- Note: Macro-F1 is unstable because some classes have very few samples (e.g., class 1 has 4 files).
