In [7]:
import os
import re
import sys
import subprocess
from typing import Tuple, List, Dict

# Run configuration: the tunables of the experiment, read by the functions below.
# NOTE(review): absolute, user-specific path; load_and_tidy raises FileNotFoundError on any
# machine where this file does not exist. Consider a path relative to a configurable data dir.
EXCEL_PATH = "C:/Users/chinw/Downloads/K-FOLD_Chinesetallow_AceticAcid.xlsx"
SHEET_NAME = "Sheet1"         # worksheet passed to load_and_tidy
TIMESTEPS = 6                 # window length; shorter windows yield more samples on small data
THRESHOLD_FIXED = 470.0       # Fertility above this at a window's end-point is labelled positive
REQUESTED_FOLDS = 5           # upper bound on k; main() lowers it if the minority class is smaller
EPOCHS = 50                   # maximum training epochs per fold
PATIENCE = 10                 # EarlyStopping patience, in epochs
UNITS = 50                    # intended LSTM hidden units
DROPOUT = 0.3                 # intended dropout rate after the LSTM layer
LR = 1e-3                     # Adam learning rate
BATCH = 16                    # batch_size passed to model.fit
SEED = 47                     # seed for NumPy, TensorFlow and the StratifiedKFold shuffle

def _ensure_openpyxl():
    """Make sure ``openpyxl`` is importable, installing it with pip when it is missing.

    ``load_and_tidy`` reads the workbook with ``engine="openpyxl"``, so the package
    must be present before that call.

    Only ``ImportError`` triggers the install: a missing package is the one failure
    ``pip install`` can repair. Any other exception raised while importing an
    already-installed openpyxl is left to propagate instead of being masked by a
    pointless reinstall attempt.

    NOTE(review): installing packages as an import-time side effect is fragile and
    unpinned; prefer declaring openpyxl in the environment's requirements.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` exits with a non-zero status.
        ImportError: if openpyxl still cannot be imported after the install.
    """
    import importlib

    try:
        importlib.import_module("openpyxl")
    except ImportError:
        # Use the running interpreter's pip so the package lands in this kernel's environment.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "openpyxl"])
        # Let the import system notice the freshly written package directory.
        importlib.invalidate_caches()
        importlib.import_module("openpyxl")


_ensure_openpyxl()

import numpy as np
import pandas as pd

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    accuracy_score, balanced_accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix
)

import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import backend as K

In [9]:
def load_and_tidy(path: str, sheet: str) -> pd.DataFrame:
    """Read the wide sensor sheet and return a long table of moisture/fertility readings.

    Layout the parser relies on (as implied by how the sheet is sliced below):
      * column 0 holds row labels; empty label cells inherit the label above (forward fill);
      * row 0 of the parsed frame holds the dates for columns 1..n;
      * a row whose label contains "sapium" (any case) starts a new plant block;
      * rows labelled LIGHT / MOISTURE / TEMPERATURE / FERTILITY carry that plant's readings.
        Labels are matched after stripping whitespace and ignoring case, consistent with
        the case-insensitive plant match.

    Only plant blocks that have both a MOISTURE and a FERTILITY row are kept.

    Args:
        path: Location of the Excel workbook.
        sheet: Worksheet name.

    Returns:
        DataFrame with columns ``Plant``, ``Date``, ``Moisture``, ``Fertility``; rows with a
        missing Moisture or Fertility value are dropped and the rest are ordered by ``Date``.
        Rows sharing a date keep the order in which their plant blocks appear in the sheet.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if no plant block with both MOISTURE and FERTILITY is found.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Excel not found: {path}")
    raw = pd.read_excel(path, sheet_name=sheet, engine="openpyxl")

    labels = raw.iloc[:, 0].ffill()
    # Unparseable date cells become NaT instead of raising.
    date_row = pd.to_datetime(raw.iloc[0, 1:].values, errors="coerce")
    # Copy first: the axes are reassigned below and must not be set on a slice of `raw`.
    values = raw.iloc[1:, 1:].copy()
    values.index = labels.iloc[1:]
    values.columns = date_row

    plant_re = re.compile(r"sapium", re.IGNORECASE)
    metrics = ("LIGHT", "MOISTURE", "TEMPERATURE", "FERTILITY")

    def _new_block(plant):
        # One accumulator per plant; a metric stays None until its row is seen.
        block = {"Plant": plant}
        block.update({metric: None for metric in metrics})
        return block

    def _is_complete(block):
        # A block is usable only if it names a plant and has both required series.
        return bool(block["Plant"]) and block["MOISTURE"] is not None and block["FERTILITY"] is not None

    blocks = []
    cur = _new_block(None)
    for label, row in values.iterrows():
        if not isinstance(label, str):
            continue  # e.g. NaN label that had nothing above it to inherit
        if plant_re.search(label):
            # A plant header closes the previous block and opens a new one.
            if _is_complete(cur):
                blocks.append(cur)
            cur = _new_block(label)
            continue
        key = label.strip().upper()
        if key in metrics:
            cur[key] = pd.to_numeric(row, errors="coerce")
    if _is_complete(cur):
        blocks.append(cur)

    if not blocks:
        raise ValueError("No plant blocks with MOISTURE and FERTILITY found.")

    frames = [
        pd.DataFrame({
            "Plant": b["Plant"],
            "Date": b["MOISTURE"].index,
            "Moisture": b["MOISTURE"].values,
            "Fertility": b["FERTILITY"].values,
        })
        for b in blocks
    ]
    ts = pd.concat(frames, ignore_index=True)
    ts = ts.dropna(subset=["Moisture", "Fertility"])
    # Stable sort: several plants share the same dates, and the default sort gives ties
    # no defined order, which would make the downstream windows depend on sort internals.
    ts = ts.sort_values("Date", kind="stable").reset_index(drop=True)
    return ts

In [11]:
def build_variation_scaled(df: pd.DataFrame) -> np.ndarray:
    """Return |Moisture - Fertility| per row, min-max scaled, as a flat 1-D array.

    The scaler is fitted on every row of ``df`` in one go.
    """
    moisture = df["Moisture"].values
    fertility = df["Fertility"].values
    gap = np.abs((moisture - fertility).astype(float))
    # MinMaxScaler expects a 2-D (samples, features) array, hence the single-column reshape.
    as_column = gap.reshape(-1, 1)
    scaler = MinMaxScaler()
    return scaler.fit_transform(as_column).ravel()

def build_windows(series_1d: np.ndarray, labels_1d: np.ndarray, T: int) -> Tuple[np.ndarray, np.ndarray]:
    """Slice a 1-D series into overlapping windows of length ``T`` with stride 1.

    Each window is paired with the entry of ``labels_1d`` at the window's last index.

    Returns:
        ``(X, y)`` where ``X`` is float32 with shape ``(n_windows, T, 1)`` and ``y`` holds
        the matching end-of-window labels.

    Raises:
        ValueError: if no window could be built (the series is shorter than ``T``).
    """
    end_positions = range(T - 1, len(series_1d))
    window_stack = [series_1d[end - T + 1: end + 1].reshape(T, 1) for end in end_positions]
    end_labels = [labels_1d[end] for end in end_positions]  # label at window end

    X = np.asarray(window_stack, np.float32)
    y = np.asarray(end_labels)
    if X.size == 0:
        raise ValueError("No windows built; reduce TIMESTEPS.")
    return X, y

In [13]:
def build_model(timesteps: int) -> Sequential:
    """Build and compile the single-layer LSTM binary classifier.

    Architecture: Input(timesteps, 1) -> LSTM(UNITS) -> Dropout(DROPOUT) -> Dense(1, sigmoid).
    Layer width, dropout rate and learning rate are all read from the module-level
    configuration (UNITS, DROPOUT, LR), so editing the config actually changes the model.

    Args:
        timesteps: Length of each input window.

    Returns:
        A model compiled with Adam, binary cross-entropy loss and an accuracy metric.
    """
    # Explicit Input layer avoids the Keras warning about passing input_shape to a layer.
    model = Sequential([
        Input(shape=(timesteps, 1)),
        LSTM(UNITS),
        Dropout(DROPOUT),  # why: regularization on small data
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=Adam(learning_rate=LR), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def class_weights(y: np.ndarray) -> Dict[int, float]:
    """Return inverse-frequency weights for classes 0 and 1: total / (2 * class_count).

    A class with no samples is treated as having one sample in the denominator, which
    avoids a division by zero.
    """
    per_class = {cls: int((y == cls).sum()) for cls in (0, 1)}
    total = per_class[0] + per_class[1]
    return {cls: total / (2 * max(count, 1)) for cls, count in per_class.items()}

In [15]:
def metrics_with_cm(y_true: np.ndarray, y_prob: np.ndarray) -> Dict[str, float]:
    """Score predicted probabilities against binary labels.

    Probabilities are turned into hard predictions at a 0.5 cut-off. The result holds
    accuracy, balanced accuracy, macro- and weighted-averaged precision/recall/F1,
    average precision, ROC AUC and, under ``"cm"``, the 2x2 confusion matrix for
    labels [0, 1]. ``"auc"`` is NaN when ``roc_auc_score`` raises ``ValueError``.
    """
    hard_pred = (y_prob >= 0.5).astype(int)

    report = {
        "acc": accuracy_score(y_true, hard_pred),
        "bacc": balanced_accuracy_score(y_true, hard_pred),
    }

    # (result key, scoring function, averaging mode), in the order the keys are emitted
    averaged_scores = (
        ("prec_macro", precision_score, "macro"),
        ("rec_macro", recall_score, "macro"),
        ("f1_macro", f1_score, "macro"),
        ("prec_weighted", precision_score, "weighted"),
        ("rec_weighted", recall_score, "weighted"),
        ("f1_weighted", f1_score, "weighted"),
    )
    for key, scorer, averaging in averaged_scores:
        report[key] = scorer(y_true, hard_pred, average=averaging, zero_division=0)

    # Ranking metrics use the raw probabilities, not the thresholded predictions.
    report["ap"] = average_precision_score(y_true, y_prob)
    try:
        auc_value = roc_auc_score(y_true, y_prob)
    except ValueError:
        auc_value = float("nan")
    report["auc"] = auc_value

    report["cm"] = confusion_matrix(y_true, hard_pred, labels=[0, 1])
    return report

In [17]:
def main() -> None:
    """Run the experiment end to end and print per-fold and averaged metrics.

    Steps:
      1. Load the workbook and build the scaled |Moisture - Fertility| series.
      2. Cut the series into windows of TIMESTEPS; label each window by whether Fertility
         at its end-point exceeds THRESHOLD_FIXED (falling back to the median of the
         end-point values if that yields a single class).
      3. Train a fresh LSTM per stratified fold and score it on the held-out fold.
      4. Print per-fold metrics, their means, and the confusion matrix summed over folds.

    Raises:
        ValueError: if either class has fewer than two windows after labelling.

    NOTE(review): the reported scores are optimistic. Windows overlap and the folds are
    shuffled, so a validation window can share points with training windows, and the
    same validation fold drives EarlyStopping and is then used for scoring.
    """
    import random  # local import: only needed here, to seed Python's own RNG

    # NOTE(review): TensorFlow is imported at module level before this line runs, so this
    # setting probably comes too late to quiet its start-up logs; confirm and, if so,
    # move it above the tensorflow import.
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
    random.seed(SEED)
    np.random.seed(SEED)
    tf.random.set_seed(SEED)

    # Data
    ts = load_and_tidy(EXCEL_PATH, SHEET_NAME)
    var = build_variation_scaled(ts)
    fert = ts["Fertility"].values.astype(float)

    # Windows over the whole data
    X_all, fert_end = build_windows(var, fert, TIMESTEPS)
    print(f"Windows built: X={X_all.shape}, end-point positives@{THRESHOLD_FIXED}={(fert_end>THRESHOLD_FIXED).sum()}, total={len(fert_end)}")

    # Classification target: try fixed threshold, else fallback to median
    y_all = (fert_end > THRESHOLD_FIXED).astype(int)
    if np.unique(y_all).size < 2:
        thr = float(np.median(fert_end))
        y_all = (fert_end > thr).astype(int)
        # Report the configured threshold rather than a hard-coded number.
        print(f"WARNING: {THRESHOLD_FIXED:g} produced a single class. Falling back to median threshold: {thr:.3f}")
    else:
        thr = THRESHOLD_FIXED
    print(f"Using threshold: {thr:.3f}; positives={int(y_all.sum())}, negatives={int((y_all==0).sum())}")

    # K-fold (stratified) with safe k: never more folds than minority-class samples
    min_class = int(np.bincount(y_all, minlength=2).min())
    if min_class < 2:
        raise ValueError("Not enough samples of one class after windowing. Reduce TIMESTEPS or collect more data.")
    n_splits = max(2, min(REQUESTED_FOLDS, min_class))
    print(f"Stratified K-fold k={n_splits}")

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)

    metrics_list: List[Dict] = []
    agg_cm = np.zeros((2, 2), dtype=int)

    for fold, (tr, va) in enumerate(skf.split(X_all, y_all), start=1):
        X_tr, y_tr = X_all[tr], y_all[tr]
        X_va, y_va = X_all[va], y_all[va]

        model = build_model(TIMESTEPS)
        es = EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True)
        model.fit(X_tr, y_tr,
                  validation_data=(X_va, y_va),
                  epochs=EPOCHS, batch_size=BATCH,
                  class_weight=class_weights(y_tr),
                  callbacks=[es], verbose=0)

        y_prob = model.predict(X_va, verbose=0).ravel()
        m = metrics_with_cm(y_va, y_prob)
        agg_cm += m["cm"]

        # Same three-decimal format as the other metrics; 'na' when AUC is undefined.
        auc_text = "na" if np.isnan(m["auc"]) else f"{m['auc']:.3f}"
        print(f"[Fold {fold}] Acc={m['acc']:.3f} BAcc={m['bacc']:.3f} "
              f"F1(macro)={m['f1_macro']:.3f} F1(wtd)={m['f1_weighted']:.3f} "
              f"AP={m['ap']:.3f} AUC={auc_text}")
        # Uncomment to see each fold's CM
        # print("CM:\n", m["cm"])

        metrics_list.append({name: value for name, value in m.items() if name != "cm"})
        # Release the finished model's graph/state before the next fold builds a new one.
        K.clear_session()

    # Averages across folds (ignore NaN AUC)
    keys = ["acc","bacc","prec_macro","rec_macro","f1_macro","prec_weighted","rec_weighted","f1_weighted","ap","auc"]
    means = {name: float(np.nanmean([fold_metrics[name] for fold_metrics in metrics_list])) for name in keys}
    print("\n=== Mean metrics across folds ===")
    for name, value in means.items():
        print(f"{name}: {value:.4f}" if not np.isnan(value) else f"{name}: na")

    print("\n=== Aggregate Confusion Matrix (sum over folds; labels=[0,1]) ===")
    print(agg_cm)

if __name__ == "__main__":
    main()

Windows built: X=(40, 6, 1), end-point positives@470.0=0, total=40
Using threshold: 231.604; positives=20, negatives=20
Stratified K-fold k=5
[Fold 1] Acc=0.750 BAcc=0.750 F1(macro)=0.733 F1(wtd)=0.733 AP=1.000 AUC=1.0

[Fold 2] Acc=0.750 BAcc=0.750 F1(macro)=0.750 F1(wtd)=0.750 AP=0.646 AUC=0.6875
[Fold 3] Acc=1.000 BAcc=1.000 F1(macro)=1.000 F1(wtd)=1.000 AP=1.000 AUC=1.0
[Fold 4] Acc=0.375 BAcc=0.375 F1(macro)=0.273 F1(wtd)=0.273 AP=0.729 AUC=0.625
[Fold 5] Acc=0.500 BAcc=0.500 F1(macro)=0.467 F1(wtd)=0.467 AP=0.747 AUC=0.6875

=== Mean metrics across folds ===
acc: 0.6750
bacc: 0.6750
prec_macro: 0.6595
rec_macro: 0.6750
f1_macro: 0.6445
prec_weighted: 0.6595
rec_weighted: 0.6750
f1_weighted: 0.6445
ap: 0.8244
auc: 0.8000

=== Aggregate Confusion Matrix (sum over folds; labels=[0,1]) ===
[[10 10]
 [ 3 17]]
