In [7]:
import json
import warnings
from pathlib import Path

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import StratifiedKFold, TimeSeriesSplit, train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.tree import DecisionTreeClassifier
import joblib

In [8]:
# Optional XGBoost
try:
    from xgboost import XGBClassifier

    XGBOOST_AVAILABLE = True
except Exception:
    XGBOOST_AVAILABLE = False

In [9]:
#Configuration

CSV_FILE = "UGANDA_COFFEE_ULTIMATE_DATASET_1961_2024.csv"  # expected CSV file path (same dir)
DATE_COL = "Year"
TARGET_CONT = "Yield_kgha"  # continuous target used to derive classes
# Features to include in modeling. These are domain-relevant (climate, production, NDVI, incidence).
FEATURES_KEEP = [
    "Production_1000t",
    "Area_1000ha",
    "Rainfall_mm",
    "Temp_C",
    "NDVI",
    "Bearing_Trees_Millions",
    "ONI_lagged9",
    "Incidence_Index_pct",
]
RANDOM_STATE = 42

In [10]:
# Output directories
RESULTS_DIR = Path("classification_results")
MODELS_DIR = RESULTS_DIR / "models"
RESULTS_DIR.mkdir(exist_ok=True)
MODELS_DIR.mkdir(exist_ok=True)

In [11]:
#Utility functions 
def create_yield_classes(series: pd.Series, method: str = "tertile"):
    # Convert continuous yield values into categorical classes using quantile based approach
    if method == "tertile":
        labels = ["Low", "Medium", "High"]
        # pd.qcut ensures approximately equal counts per class 
        return pd.qcut(series, q=3, labels=labels)
    elif method == "custom":
        # Example thresholds — edit to match domain knowledge if available:
        bins = [-float("inf"), 500, 1500, float("inf")]
        labels = ["Low", "Medium", "High"]
        return pd.cut(series, bins=bins, labels=labels)
    else:
        raise ValueError("unknown method for create_yield_classes")


def get_classifiers(random_state=RANDOM_STATE):
  
    clfs = {
        "LogisticRegression": Pipeline(
            [
                # scale numeric features before LR to improve numerical stability and interpretability
                ("scaler", StandardScaler()),
                ("clf", LogisticRegression(max_iter=2000, random_state=random_state)),
            ]
        ),
        "DecisionTree": Pipeline(
            [
                # DecisionTree doesn't require scaling so we omit scaler to keep a minimal pipeline
                ("clf", DecisionTreeClassifier(random_state=random_state)),
            ]
        ),
        "RandomForest": Pipeline(
            [
                ("clf", RandomForestClassifier(n_estimators=200, random_state=random_state)),
            ]
        ),
    }
    if XGBOOST_AVAILABLE:
        clfs["XGBoost"] = Pipeline(
            [
                ("clf", XGBClassifier(use_label_encoder=False, eval_metric="mlogloss", random_state=random_state)),
            ]
        )
    return clfs


def evaluate_stratified_cv(clf, X, y_enc, cv=5):
    
   # Evaluate a classifier using stratified K-fold CV with both accuracy and F1_macro.We use StratifiedKFold to preserve class balance in each fold.
    
    skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=RANDOM_STATE)
    # cross_val_score runs fit/predict under the hood and returns per-fold scores
    acc_scores = cross_val_score(clf, X, y_enc, cv=skf, scoring="accuracy", n_jobs=-1)
    f1_scores = cross_val_score(clf, X, y_enc, cv=skf, scoring="f1_macro", n_jobs=-1)
    return {"cv_accuracy_mean": float(acc_scores.mean()), "cv_f1_macro_mean": float(f1_scores.mean())}


def evaluate_holdout(clf, X_train, X_test, y_train_enc, y_test_enc):
    
    #Train clf on X_train and evaluate on X_test.Returns a small dict of metrics and the predicted labels (ypred).
    clf.fit(X_train, y_train_enc)
    ypred = clf.predict(X_test)
    return {
        "test_accuracy": float(accuracy_score(y_test_enc, ypred)),
        "test_f1_macro": float(f1_score(y_test_enc, ypred, average="macro")),
    }, ypred


def evaluate_time_series(clf, X_df, y_enc, n_splits=4):
    
    #TimeSeriesSplit evaluation:Splits data by index order,trains on past folds and evaluates on subsequent folds.

    tscv = TimeSeriesSplit(n_splits=n_splits)
    accs = []
    f1s = []
    # TimeSeriesSplit yields indices that progress forward — ensure chronology
    for train_idx, test_idx in tscv.split(X_df):
        Xtr, Xte = X_df.iloc[train_idx], X_df.iloc[test_idx]
        ytr, yte = y_enc[train_idx], y_enc[test_idx]
        # Fit on only the training chunk (past) and evaluate on the test chunk (future)
        clf.fit(Xtr, ytr)
        ypred = clf.predict(Xte)
        accs.append(accuracy_score(yte, ypred))
        f1s.append(f1_score(yte, ypred, average="macro"))
    return {"ts_accuracy_mean": float(np.mean(accs)), "ts_f1_macro_mean": float(np.mean(f1s))}



In [12]:
def main():
    
    if not Path(CSV_FILE).exists():
        raise FileNotFoundError(
            f"{CSV_FILE} not found. Place the dataset CSV in the same directory as this script and re-run."
        )

    # Load data and ensure chronological order )
    df = pd.read_csv(CSV_FILE)
    df = df.sort_values(DATE_COL).reset_index(drop=True)

    # Convert non-year columns to numeric (coerce errors -> NaN). This ensures types are correct.
    for c in df.columns:
        if c != DATE_COL:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # Create categorical yield target
    df = df.copy()
    df["Yield_class"] = create_yield_classes(df[TARGET_CONT], method="tertile")

    # Pick features and drop rows with missing values for simplicity.
    
    features = [f for f in FEATURES_KEEP if f in df.columns]
    model_df = df[[DATE_COL] + features + ["Yield_class"]].dropna().reset_index(drop=True)

    # Warn if dataset becomes small after dropping missing values
    if model_df.shape[0] < 20:
        print("Warning: small dataset after dropping missing values:", model_df.shape)

    # Feature matrix and target (categorical)
    X = model_df[features].copy()
    y = model_df["Yield_class"].copy()

    # Label-encode target to integers for classifiers that require numeric labels
    le = LabelEncoder()
    y_enc = le.fit_transform(y)
    label_map = {int(i): str(c) for i, c in enumerate(le.classes_)}

    # Stratified hold-out split (random). This is a common evaluation but can leak time information.
    # If forecasting is the goal, prefer TimeSeriesSplit for final evaluation.
    X_train, X_test, y_train_enc, y_test_enc = train_test_split(
        X, y_enc, test_size=0.2, random_state=RANDOM_STATE, stratify=y_enc
    )

    # Prepare chronological data for time-series CV: keep model_df order (already sorted)
    X_time = model_df[features].reset_index(drop=True)
    y_time_enc = le.transform(model_df["Yield_class"].values)

    classifiers = get_classifiers()

    # Container for all results
    all_results = {}
    print("Running classifiers:", list(classifiers.keys()))

    for name, clf in classifiers.items():
        print(f"\n--- Evaluating {name} ---")
        res = {}

        # 1) Stratified CV (random folds) - quick check for discoverable patterns
        try:
            cv_res = evaluate_stratified_cv(clf, X, y_enc, cv=5)
            res.update(cv_res)
            print(
                "Stratified CV -> accuracy:",
                f"{cv_res['cv_accuracy_mean']:.4f}",
                "f1_macro:",
                f"{cv_res['cv_f1_macro_mean']:.4f}",
            )
        except Exception as e:
            # Catch exceptions (e.g., if classifier cannot be cross-validated for some reason)
            print("Stratified CV failed:", e)

        # 2) Hold-out test (train on X_train, evaluate on X_test)
        try:
            test_res, ypred = evaluate_holdout(clf, X_train, X_test, y_train_enc, y_test_enc)
            res.update(test_res)
            print(
                "Hold-out test -> accuracy:",
                f"{test_res['test_accuracy']:.4f}",
                "f1_macro:",
                f"{test_res['test_f1_macro']:.4f}",
            )
            # Save model trained on the entire feature set (X, y_enc) for convenience (production use)
            # Important: This model is trained on all data (including what was used for evaluation).
            # If you want to keep a strictly "only-trained-on-train-set" model, fit on X_train only.
            clf.fit(X, y_enc)
            joblib.dump(clf, MODELS_DIR / f"{name}_full.joblib")
        except Exception as e:
            print("Hold-out evaluation failed:", e)

        # 3) Time-series CV (no leakage) — recommended when forecasting into the future
        try:
            ts_res = evaluate_time_series(clf, X_time, y_time_enc, n_splits=4)
            res.update(ts_res)
            print(
                "Time-series CV -> accuracy:",
                f"{ts_res['ts_accuracy_mean']:.4f}",
                "f1_macro:",
                f"{ts_res['ts_f1_macro_mean']:.4f}",
            )
        except Exception as e:
            print("Time-series CV failed:", e)

        # Store results for this classifier
        all_results[name] = res

    # Summarize results into a DataFrame and identify best models per metric
    summary = []
    for name, metrics in all_results.items():
        entry = {"model": name}
        entry.update(metrics)
        summary.append(entry)
    summary_df = pd.DataFrame(summary).fillna(-1)

    # Sort by test_f1_macro when available (practical metric for balanced multiclass)
    if "test_f1_macro" in summary_df.columns:
        summary_df = summary_df.sort_values(by="test_f1_macro", ascending=False)
    else:
        summary_df = summary_df.sort_values(by=list(summary_df.columns[1:])[0], ascending=False)

    # Identify best model for each candidate metric
    best_by = {}
    metric_candidates = [
        "test_accuracy",
        "test_f1_macro",
        "cv_accuracy_mean",
        "cv_f1_macro_mean",
        "ts_accuracy_mean",
        "ts_f1_macro_mean",
    ]
    for metric in metric_candidates:
        if metric in summary_df.columns:
            # idxmax will raise if column is all -1/NaN, guard with try/except
            try:
                best_row = summary_df.loc[summary_df[metric].idxmax()]
                best_by[metric] = {"model": best_row["model"], metric: float(best_row[metric])}
            except Exception:
                # ignore metrics that could not be computed
                pass

    # Save metrics & best-by info
    out_metrics = {"label_mapping": label_map, "results": all_results, "best_by_metric": best_by}
    out_path = RESULTS_DIR / "yield_classification_results.json"
    with open(out_path, "w") as f:
        json.dump(out_metrics, f, indent=2)

    # Print a human-readable summary
    print("\n\nSummary (sorted by test_f1_macro if available):")
    if summary_df.empty:
        print("No results to display.")
    else:
        print(summary_df.to_string(index=False))

    print("\nBest models by metric:")
    if not best_by:
        print("No best-by metrics computed (columns missing or all metrics failed).")
    else:
        for m, v in best_by.items():
            print(f"  {m}: {v['model']} -> {v[m]:.4f}")

    print(f"\nSaved detailed results to {out_path}")
    print(f"Saved trained models (full-data fit) to {MODELS_DIR} (one file per classifier)")

    # Return results dictionary for programmatic consumption if needed
    return out_metrics


if __name__ == "__main__":
    main()

Running classifiers: ['LogisticRegression', 'DecisionTree', 'RandomForest', 'XGBoost']

--- Evaluating LogisticRegression ---
Stratified CV -> accuracy: 0.8897 f1_macro: 0.8807
Hold-out test -> accuracy: 0.9231 f1_macro: 0.9259
Time-series CV failed: This solver needs samples of at least 2 classes in the data, but the data contains only one class: np.int64(1)

--- Evaluating DecisionTree ---
Stratified CV -> accuracy: 0.8923 f1_macro: 0.8886
Hold-out test -> accuracy: 1.0000 f1_macro: 1.0000
Time-series CV -> accuracy: 0.6667 f1_macro: 0.4716

--- Evaluating RandomForest ---
Stratified CV -> accuracy: 0.9231 f1_macro: 0.9194
Hold-out test -> accuracy: 1.0000 f1_macro: 1.0000
Time-series CV -> accuracy: 0.6875 f1_macro: 0.5154

--- Evaluating XGBoost ---
Stratified CV -> accuracy: 0.9218 f1_macro: 0.9172
Hold-out test -> accuracy: 1.0000 f1_macro: 1.0000
Time-series CV failed: Invalid classes inferred from unique values of `y`.  Expected: [0], got [1]


Summary (sorted by test_f1_macro 