In [8]:
import numpy as np
# import warnings
# warnings.filterwarnings("ignore")

from typing import Dict, Tuple
from dataclasses import dataclass

from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import make_scorer, roc_auc_score, f1_score, accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
import pandas as pd


In [9]:
@dataclass
class TrainResult:
    best_name: str
    best_model: object      # fitted sklearn estimator (Pipeline)
    cv_scores: pd.DataFrame # per-model cross-val summary

In [10]:
def _prepare_features(X0: np.ndarray, X1: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Flatten (n, t, c) -> (n, t*c) and build labels.
    """
    if X0.ndim != 3 or X1.ndim != 3:
        raise ValueError("X0 and X1 must be 3D arrays of shape (n_samples, t, c).")
    if X0.shape[1:] != X1.shape[1:]:
        raise ValueError("X0 and X1 must share the same (t, c) shape.")
    X = np.concatenate([X0.reshape(X0.shape[0], -1), X1.reshape(X1.shape[0], -1)], axis=0)
    y = np.concatenate([np.zeros(X0.shape[0], dtype=int), np.ones(X1.shape[0], dtype=int)], axis=0)
    return X, y

In [None]:
# import numpy as np
# import warnings
# warnings.filterwarnings("ignore")

# from typing import Dict, Tuple
# from dataclasses import dataclass

# from sklearn.model_selection import StratifiedKFold, cross_validate
# from sklearn.pipeline import Pipeline
# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA
# from sklearn.metrics import make_scorer, roc_auc_score, f1_score, accuracy_score
# from sklearn.linear_model import LogisticRegression
# from sklearn.svm import SVC
# from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier
# from sklearn.neighbors import KNeighborsClassifier
# import pandas as pd

@dataclass
class TrainResult:
    best_name: str
    best_model: object      # fitted sklearn estimator (Pipeline)
    cv_scores: pd.DataFrame # per-model cross-val summary


def _prepare_features(X0: np.ndarray, X1: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Flatten (n, t, c) -> (n, t*c) and build labels.
    """
    if X0.ndim != 3 or X1.ndim != 3:
        raise ValueError("X0 and X1 must be 3D arrays of shape (n_samples, t, c).")
    if X0.shape[1:] != X1.shape[1:]:
        raise ValueError("X0 and X1 must share the same (t, c) shape.")
    X = np.concatenate([X0.reshape(X0.shape[0], -1), X1.reshape(X1.shape[0], -1)], axis=0)
    y = np.concatenate([np.zeros(X0.shape[0], dtype=int), np.ones(X1.shape[0], dtype=int)], axis=0)
    return X, y


def train_models(
    X0: np.ndarray,
    X1: np.ndarray,
    random_state: int = 42,
    use_pca: bool = True,
    pca_max_components: int = 128,
    cv_splits: int = 5,
) -> TrainResult:
    """
    Trains several classifiers and selects the best by mean ROC-AUC via stratified CV.
    """

    X, y = _prepare_features(X0, X1)

    # Optional PCA (keeps min(pca_max_components, n_features, n_samples-1) comps)
    n_features = X.shape[1]
    n_samples = X.shape[0]
    n_pca = None
    if use_pca:
        n_pca = max(2, min(pca_max_components, n_features, max(2, n_samples - 1)))

    # Define candidate models (balanced where it makes sense)
    candidates: Dict[str, Pipeline] = {}

    def base_steps(needs_scaler=True, add_pca=(n_pca is not None)):
        steps = []
        if needs_scaler:
            steps.append(("scaler", StandardScaler(with_mean=True)))
        if add_pca:
            steps.append(("pca", PCA(n_components=n_pca, random_state=random_state)))
        return steps

    candidates["logreg"] = Pipeline([
        *base_steps(needs_scaler=True),
        ("clf", LogisticRegression(max_iter=2000, class_weight="balanced", random_state=random_state))
    ])

    candidates["svc_rbf"] = Pipeline([
        *base_steps(needs_scaler=True),
        ("clf", SVC(kernel="rbf", probability=True, class_weight="balanced", random_state=random_state))
    ])

    candidates["knn"] = Pipeline([
        *base_steps(needs_scaler=True),
        ("clf", KNeighborsClassifier(n_neighbors=5))
    ])

    # Tree-based models don't need scaling; PCA is optional but often not helpfulâ€”skip it.
    candidates["rf"] = Pipeline([
        ("clf", RandomForestClassifier(
            n_estimators=300, max_depth=None, n_jobs=-1, class_weight="balanced_subsample", random_state=random_state))
    ])

    candidates["gb"] = Pipeline([
        ("clf", GradientBoostingClassifier(random_state=random_state))
    ])

    try:
        candidates["hgb"] = Pipeline([
            ("clf", HistGradientBoostingClassifier(random_state=random_state))
        ])
    except Exception:
        pass  # older sklearn versions may not have HGB

    # Scoring set
    scoring = {
        "roc_auc": "roc_auc",
        "f1": make_scorer(f1_score),
        "acc": make_scorer(accuracy_score),
    }

    cv = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=random_state)

    rows = []
    fitted_best = None
    best_name = None
    best_auc = -np.inf

    for name, model in candidates.items():
        cv_res = cross_validate(model, X, y, cv=cv, scoring=scoring, n_jobs=-1, return_estimator=False)
        auc_mean = float(np.mean(cv_res["test_roc_auc"]))
        f1_mean = float(np.mean(cv_res["test_f1"]))
        acc_mean = float(np.mean(cv_res["test_acc"]))
        rows.append({"model": name, "roc_auc": auc_mean, "f1": f1_mean, "accuracy": acc_mean})

        if auc_mean > best_auc:
            best_auc = auc_mean
            best_name = name
        
        print(name, model, auc_mean, f1_mean, acc_mean)

    scores_df = pd.DataFrame(rows).sort_values("roc_auc", ascending=False).reset_index(drop=True)

    # Fit the best model on the full training set
    best_model = candidates[best_name]
    best_model.fit(X, y)

    return TrainResult(best_name=best_name, best_model=best_model, cv_scores=scores_df)


def evaluate_on_test(best_model, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
    """
    Evaluate a fitted model on separately supplied test data shaped (n_samples, t, c).
    """
    if X_test.ndim != 3:
        raise ValueError("X_test must be 3D (n_samples, t, c).")
    X_test_flat = X_test.reshape(X_test.shape[0], -1)

    y_pred = best_model.predict(X_test_flat)
    # Use predict_proba if available for AUC; otherwise decision_function
    if hasattr(best_model, "predict_proba"):
        y_proba = best_model.predict_proba(X_test_flat)[:, 1]
    else:
        # Some pipelines (e.g., SVC without prob) still have decision_function
        if hasattr(best_model, "decision_function"):
            scores = best_model.decision_function(X_test_flat)
            # Scale to [0,1] for AUC if decision_function is unbounded
            from sklearn.preprocessing import MinMaxScaler
            y_proba = MinMaxScaler().fit_transform(scores.reshape(-1, 1)).ravel()
        else:
            # Fallback: cast preds to proba-like 0/1
            y_proba = y_pred.astype(float)

    return {
        "roc_auc": float(roc_auc_score(y_test, y_proba)),
        "f1": float(f1_score(y_test, y_pred)),
        "accuracy": float(accuracy_score(y_test, y_pred)),
    }


# ------------------------------
# Example usage (remove/modify as needed)
if __name__ == "__main__":
    months = [1,2,3,4,5,6,7,11,12]
    mean = np.zeros((1,9,13)) 
    std  = np.zeros((1,9,13)) 
    for i,m in enumerate(months):
        a = np.load(f"Lebanon/preprocessed_data/meta/2020_{m}.npz")
        mean[0,i] = a["mean"]
        std[0,i] = a["std"]

    # rng = np.random.default_rng(0)

    # Dummy shapes: (n, t, c)
    # X0 = rng.normal(size=(120, 20, 4))  # class 0
    # X1 = rng.normal(loc=0.4, size=(160, 20, 4))  # class 1 (slightly shifted mean)
    count = 5
    X0 = np.concatenate([(np.load(r"split_processed_data\nonwheat\0.npy") - mean)/std for i in range(count)])
    X1 = np.concatenate([(np.load(r"split_processed_data\wheat\0.npy") - mean)/std for i in range(count)])

    result = train_models(X0, X1, random_state=313, use_pca=False, pca_max_components=64, cv_splits=5)
    # result = train_models(X0, X1, random_state=0, use_pca=True, pca_max_components=64, cv_splits=5)
    print("Best model:", result.best_name)
    print(result.cv_scores)

    # # Simulated separate test set
    # X0_te = rng.normal(size=(40, 20, 4))
    # X1_te = rng.normal(loc=0.4, size=(60, 20, 4))
    # X_test = np.concatenate([X0_te, X1_te], axis=0)
    # y_test = np.concatenate([np.zeros(len(X0_te)), np.ones(len(X1_te))]).astype(int)

    # test_metrics = evaluate_on_test(result.best_model, X_test, y_test)
    # print("Test metrics:", test_metrics)


logreg Pipeline(steps=[('scaler', StandardScaler()),
                ('clf',
                 LogisticRegression(class_weight='balanced', max_iter=2000,
                                    random_state=313))]) 0.993557118 0.9653878922092108 0.9655100000000001


In [17]:
import pickle
with open("results.pkl1", "wb") as f:
    pickle.dump(result,f)

In [18]:
import numpy as np
months = [1,2,3,4,5,6,7,11,12]
mean = np.zeros((1,9,13)) 
std  = np.zeros((1,9,13)) 
for i,m in enumerate(months):
    a = np.load(f"Lebanon/preprocessed_data/meta/2020_{m}.npz")
    mean[0,i] = a["mean"]
    std[0,i] = a["std"]

In [20]:
np.save("mean.npy", mean)
np.save("std.npy", std)

In [14]:
mean.shape

(1, 9, 13)

In [15]:
np.load(r"split_processed_data\nonwheat\0.npy").shape

(40000, 9, 13)

In [21]:
# Simulated separate test set
if __name__ == "__main__":
    X0_te = (np.load(r"split_processed_data\nonwheat\1.npy")- mean)/std
    X1_te = (np.load(r"split_processed_data\wheat\1.npy")- mean)/std
    # X0_te = rng.normal(size=(40, 20, 4))
    # X1_te = rng.normal(loc=0.4, size=(60, 20, 4))
    X_test = np.concatenate([X0_te, X1_te], axis=0)
    y_test = np.concatenate([np.zeros(len(X0_te)), np.ones(len(X1_te))]).astype(int)

    test_metrics = evaluate_on_test(result.best_model, X_test, y_test)
    print("Test metrics:", test_metrics)

Test metrics: {'roc_auc': 0.9984333709375, 'f1': 0.9831168504886566, 'accuracy': 0.9832}


In [32]:
from pathlib import Path
import os
from tqdm.auto import tqdm
split_path = Path("split_processed_data")
wpath = split_path/"wheat"
nwpath = split_path/"nonwheat"

In [31]:
confm = np.zeros((2,2))
for i,f in tqdm(enumerate(os.listdir(wpath))):
    if i==0:
        continue
    X1_te = (np.load(wpath/f)- mean)/std
    X1_te = X1_te.reshape(X1_te.shape[0], -1)
    # print(X1_te.shape)
    y_p = result.best_model.predict(X1_te)
    # print(y_p)
    confm[1,1] += y_p.sum()
    confm[1,0] += (y_p==0).sum()

for i,f in tqdm(enumerate(os.listdir(nwpath))):
    if i==0:
        continue
    X1_te = (np.load(nwpath/f)- mean)/std
    X1_te = X1_te.reshape(X1_te.shape[0], -1)
    y_p = result.best_model.predict(X1_te)
    confm[0,1] += y_p.sum()
    confm[0,0] += (y_p==0).sum()

45it [00:05,  7.62it/s]
298it [00:48,  6.16it/s]


In [33]:
confm

array([[11287665.,   554483.],
       [  317363.,  1408388.]])

In [34]:
f1 = 2*confm[1,1]/(2*confm[1,1] + confm[1,0] + confm[0,1])
f1

np.float64(0.7636391042508558)