# 42177 — XGBoost (clean + degraded), manifest-driven
- Uses data/clean splits and manifest.csv label order
- Global streaming + StandardScaler + IncrementalPCA (NCOMP=64) for consistency
- Saves JSON results to artifacts/

In [1]:
# %% 0) Setup
# If needed: pip install xgboost
!pip install xgboost

import os, json, time, random, gc
import numpy as np
import pandas as pd
from PIL import Image

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import IncrementalPCA
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from xgboost import XGBClassifier

# Paths
MANIFEST   = r"D:\LocalUser\42177 Project\data\manifest.csv"
ROOT_CLEAN = r"D:\LocalUser\42177 Project\data\clean"
ROOT_DEG   = r"D:\LocalUser\42177 Project\data\degraded"
ART        = r"D:\LocalUser\42177 Project\artifacts"
os.makedirs(ART, exist_ok=True)

# Hyperparameters (low-RAM defaults)
TARGET_HW  = (256, 256)
SEED       = 42177
BATCH      = 64
NCOMP      = 64

random.seed(SEED); np.random.seed(SEED)
EXTS = ('.png','.jpg','.jpeg','.tif','.tiff','.bmp')




In [2]:
# %% 1) Labels from manifest (fixed order)
df = pd.read_csv(MANIFEST)  # expects: id, filepath, label, subset, mag
LABELS = sorted(df['label'].unique().tolist())
LABELS


['babesia',
 'leishmania',
 'plasmodium',
 'toxoplasma1000x',
 'toxoplasma400x',
 'trichomonad',
 'trypanosome']

In [3]:
# %% 2) Streaming helpers
def stream_clean_global(subset, batch=BATCH, target=TARGET_HW):
    paths = []
    for lab in LABELS:
        d = os.path.join(ROOT_CLEAN, subset, lab)
        if not os.path.isdir(d): 
            continue
        paths += [(os.path.join(d,f), lab) for f in os.listdir(d) if f.lower().endswith(EXTS)]
    for i in range(0, len(paths), batch):
        chunk = paths[i:i+batch]
        Xb, yb = [], []
        for p, lab in chunk:
            im = Image.open(p).convert("RGB").resize(target)
            arr = np.asarray(im, dtype=np.float32)/255.0
            Xb.append(arr.reshape(-1)); yb.append(lab)
        if Xb:
            yield np.stack(Xb), np.array(yb)

def stream_degraded_global(cond, subset="test", batch=BATCH, target=TARGET_HW):
    base = os.path.join(ROOT_DEG, cond, subset)
    if not os.path.isdir(base): 
        return
    paths = []
    for lab in LABELS:
        d = os.path.join(base, lab)
        if not os.path.isdir(d): 
            continue
        paths += [(os.path.join(d,f), lab) for f in os.listdir(d) if f.lower().endswith(EXTS)]
    for i in range(0, len(paths), batch):
        chunk = paths[i:i+batch]
        Xb, yb = [], []
        for p, lab in chunk:
            im = Image.open(p).convert("RGB").resize(target)
            arr = np.asarray(im, dtype=np.float32)/255.0
            Xb.append(arr.reshape(-1)); yb.append(lab)
        if Xb:
            yield np.stack(Xb), np.array(yb)


In [4]:
# %% 3) Metrics packer (aligned with evaluate_model.mlx)
def evaluate_and_pack(y_true, y_pred, labels=LABELS):
    y_true = np.asarray(y_true).astype(str)
    y_pred = np.asarray(y_pred).astype(str)
    labels = [str(x) for x in labels]
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, labels=labels, average="macro", zero_division=0
    )
    return {
        "Accuracy": float(acc),
        "Precision": float(prec),
        "Recall": float(rec),
        "F1": float(f1),
        "ConfusionMatrix": cm.tolist(),
        "Labels": labels
    }


In [5]:
# %% 4) Fit StandardScaler + IncrementalPCA on TRAIN (global streaming, low RAM)
scaler = StandardScaler(with_mean=True, with_std=True)
for Xb, _ in stream_clean_global("train"):
    scaler.partial_fit(Xb.astype(np.float32, copy=False))
    del Xb; gc.collect()
print("Scaler fitted (global stream).")

ipca = IncrementalPCA(n_components=NCOMP, batch_size=BATCH)
for Xb, _ in stream_clean_global("train"):
    Xb_std = scaler.transform(Xb.astype(np.float32, copy=False))
    if Xb_std.shape[0] >= NCOMP:
        ipca.partial_fit(Xb_std)
    del Xb, Xb_std; gc.collect()
print(f"IPCA fitted (n_components={NCOMP}).")


Scaler fitted (global stream).
IPCA fitted (n_components=64).


In [6]:
# %% 5) Transform TRAIN/VAL/TEST in streams → compact features
def transform_split(split):
    Xc, yc = [], []
    for Xb, yb in stream_clean_global(split):
        Xb_std = scaler.transform(Xb.astype(np.float32, copy=False))
        Xb_pca = ipca.transform(Xb_std).astype(np.float32, copy=False)
        Xc.append(Xb_pca); yc.append(yb)
    X = np.vstack(Xc); y = np.concatenate(yc)
    return X, y

Xtr, ytr = transform_split("train")
Xva, yva = transform_split("val")
Xte, yte = transform_split("test")

Xtr.shape, Xva.shape, Xte.shape


((14390, 64), (4810, 64), (4812, 64))

## Train XGBoost (multiclass) — with label encoding

In [7]:
# %% 6) Train XGBoost (multiclass) — with label encoding

from sklearn.preprocessing import LabelEncoder

# after LABELS is defined and Xtr,ytr,Xva,yva,Xte,yte are built (strings)
le = LabelEncoder()
le.classes_ = np.array(LABELS)  # keep consistent label order

ytr_i = le.transform(ytr)
yva_i = le.transform(yva)
yte_i = le.transform(yte)

# train
xgb = XGBClassifier(
    objective="multi:softmax",
    num_class=len(LABELS),
    n_estimators=400,
    max_depth=8,
    learning_rate=0.05,
    subsample=0.9,
    colsample_bytree=0.9,
    reg_lambda=1.0,
    reg_alpha=0.0,
    tree_method="hist",
    random_state=SEED,
    n_jobs=-1
)

print("Training XGBoost...")
t_train = time.time()
xgb.fit(Xtr, ytr_i, eval_set=[(Xva, yva_i)], verbose=False)
train_secs = time.time() - t_train
print(f"Training completed in {train_secs/60:.2f} minutes")

# Predict and decode labels back to strings for evaluation
t_eval = time.time()
yhat_i = xgb.predict(Xte)
test_secs = time.time() - t_eval
yhat = le.inverse_transform(yhat_i)
print(f"Clean test time: {test_secs:.1f} s")

Training XGBoost...
Training completed in 0.18 minutes
Clean test time: 0.0 s


In [8]:
# %% 7) EVALUATE — clean test set
# Predict (ints) then decode to strings
t_eval = time.time()
yhat_i = xgb.predict(Xte)
test_secs = time.time() - t_eval
yhat = le.inverse_transform(yhat_i)          # strings

# --- sanity checks
print("Xte shape:", Xte.shape)
print("len(yte):", len(yte), "len(yhat):", len(yhat))
print("labels:", LABELS)

yte  = np.asarray(yte).astype(str)
yhat = np.asarray(yhat).astype(str)
LABELS = [str(x) for x in LABELS]

# show mismatches if any
uniq_yte  = sorted(set(yte))
uniq_yhat = sorted(set(yhat))
missing_from_LABELS_yte  = sorted(set(uniq_yte)  - set(LABELS))
missing_from_LABELS_yhat = sorted(set(uniq_yhat) - set(LABELS))
print("unique yte :", uniq_yte)
print("unique yhat:", uniq_yhat)
print("missing_from_LABELS_yte :", missing_from_LABELS_yte)
print("missing_from_LABELS_yhat:", missing_from_LABELS_yhat)

assert len(yte) == len(yhat) and len(yte) > 0
assert set(uniq_yte).issubset(set(LABELS))
assert set(uniq_yhat).issubset(set(LABELS))

# Evaluate and save
res_clean = evaluate_and_pack(yte, yhat, LABELS)
res_clean["TrainElapsedSeconds"] = round(train_secs, 2)
res_clean["TestElapsedSeconds"]  = round(test_secs, 2)

out_clean = os.path.join(ART, "results_xgb_clean.json")
with open(out_clean, "w") as f:
    json.dump(res_clean, f, indent=2)

print("Saved:", out_clean)
print("Accuracy: %.2f%%  F1: %.2f%%" % (100*res_clean["Accuracy"], 100*res_clean["F1"]))


Xte shape: (4812, 64)
len(yte): 4812 len(yhat): 4812
labels: ['babesia', 'leishmania', 'plasmodium', 'toxoplasma1000x', 'toxoplasma400x', 'trichomonad', 'trypanosome']
unique yte : ['babesia', 'leishmania', 'plasmodium', 'toxoplasma1000x', 'toxoplasma400x', 'trichomonad', 'trypanosome']
unique yhat: ['babesia', 'leishmania', 'plasmodium', 'toxoplasma1000x', 'toxoplasma400x', 'trichomonad', 'trypanosome']
missing_from_LABELS_yte : []
missing_from_LABELS_yhat: []
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_clean.json
Accuracy: 96.07%  F1: 95.45%


In [9]:
print(f"Precision: {100*res_clean['Precision']:.2f}%  Recall: {100*res_clean['Recall']:.2f}%")


Precision: 97.23%  Recall: 93.90%


In [10]:
import csv

# %% 8) Evaluate on DEGRADED test sets (CSV per condition)
def evaluate_degraded_condition(cond, subset="test"):
    y_true_all, y_pred_all = [], []
    t0 = time.time()

    for Xb, yb in stream_degraded_global(cond, subset=subset, batch=BATCH, target=TARGET_HW):
        Xb_std = scaler.transform(Xb.astype(np.float32, copy=False))
        Xb_pca = ipca.transform(Xb_std).astype(np.float32, copy=False)

        yhat_i = xgb.predict(Xb_pca)
        yhat = le.inverse_transform(yhat_i)

        y_true_all.append(yb.astype(str))
        y_pred_all.append(yhat.astype(str))

        del Xb, yb, Xb_std, Xb_pca, yhat_i, yhat
        gc.collect()

    if not y_true_all:
        print(f"[WARN] No samples found for degraded condition '{cond}'")
        return None

    y_true = np.concatenate(y_true_all)
    y_pred = np.concatenate(y_pred_all)

    metrics = evaluate_and_pack(y_true, y_pred, LABELS)
    metrics["TestElapsedSeconds"] = round(time.time() - t0, 2)
    return metrics

degraded_conditions = [
    c for c in os.listdir(ROOT_DEG)
    if os.path.isdir(os.path.join(ROOT_DEG, c))
]

all_deg_results = {}
for c in degraded_conditions:
    print(f"Evaluating degraded condition: {c}")
    res_deg = evaluate_degraded_condition(c, subset="test")
    if res_deg is None:
        print(f"Skipped: {c}")
        continue

    all_deg_results[c] = res_deg

    # flatten for CSV
    out = os.path.join(ART, f"results_xgb_{c}.csv")
    with open(out, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Metric", "Value"])
        writer.writerow(["Accuracy", res_deg["Accuracy"]])
        writer.writerow(["Precision", res_deg["Precision"]])
        writer.writerow(["Recall", res_deg["Recall"]])
        writer.writerow(["F1", res_deg["F1"]])
        writer.writerow(["TestElapsedSeconds", res_deg["TestElapsedSeconds"]])
        # confusion matrix as JSON string to keep 1-row format
        writer.writerow(["Labels", json.dumps(res_deg["Labels"])])
        writer.writerow(["ConfusionMatrix", json.dumps(res_deg["ConfusionMatrix"])])
    print(f"Saved: {out}")

# optional index (still JSON, per run)
idx_path = os.path.join(ART, "results_xgb_degraded_all.json")
with open(idx_path, "w") as f:
    json.dump(all_deg_results, f, indent=2)
print(f"Saved index of degraded results: {idx_path}")


Evaluating degraded condition: gaussian_blur_s1.0
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_gaussian_blur_s1.0.csv
Evaluating degraded condition: gaussian_blur_s2.0
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_gaussian_blur_s2.0.csv
Evaluating degraded condition: gaussian_noise_s15
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_gaussian_noise_s15.csv
Evaluating degraded condition: gaussian_noise_s5
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_gaussian_noise_s5.csv
Evaluating degraded condition: jpeg_q20
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_jpeg_q20.csv
Evaluating degraded condition: jpeg_q40
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_jpeg_q40.csv
Evaluating degraded condition: jpeg_q60
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_jpeg_q60.csv
Evaluating degraded condition: motion_blur_k5
Saved: D:\LocalUser\42177 Project\artifacts\results_xgb_motion_blur_k5.csv
Evaluating degraded condition: resolut

In [11]:
# # %% 8) Evaluate on degraded test sets (streaming, low RAM)
# conds = [d for d in os.listdir(ROOT_DEG) if os.path.isdir(os.path.join(ROOT_DEG, d))]

# for c in conds:
#     y_true, y_pred = [], []
#     any_batch = False
#     for Xb, yb in stream_degraded_global(c, "test", batch=BATCH):
#         any_batch = True
#         Xb_std = scaler.transform(Xb.astype(np.float32, copy=False))
#         Xb_pca = ipca.transform(Xb_std)
#         yhat_b = xgb.predict(Xb_pca)
#         y_true.extend(yb.tolist()); y_pred.extend(yhat_b.tolist())
#     if not any_batch:
#         print(f"Skip {c}: no files."); continue

#     resg = evaluate_and_pack(np.array(y_true), np.array(y_pred), LABELS)
#     out = os.path.join(ART, f"results_xgb_{c}.json")
#     with open(out, "w") as f:
#         json.dump(resg, f, indent=2)
#     print("Saved:", out)


In [12]:
# # %% 8) Evaluate on degraded test sets (streaming) - v2
# conds = [d for d in os.listdir(ROOT_DEG) if os.path.isdir(os.path.join(ROOT_DEG, d))]
# cls_names = np.array([str(c) for c in xgb.classes_])   # use model’s learned order

# for c in conds:
#     y_true, y_pred = [], []
#     any_batch = False

#     for Xb, yb in stream_degraded_global(c, "test", batch=BATCH):
#         any_batch = True
#         Xb_std = scaler.transform(Xb.astype(np.float32, copy=False))
#         Xb_pca = ipca.transform(Xb_std)

#         proba_b = xgb.predict_proba(Xb_pca)
#         idx_b   = proba_b.argmax(axis=1)
#         y_pred_b = cls_names[idx_b]

#         y_true.extend(np.asarray(yb).astype(str).tolist())
#         y_pred.extend(y_pred_b.astype(str).tolist())

#     if not any_batch:
#         print(f"Skip {c}: no files.")
#         continue

#     # quick visibility
#     print(f"[{c}] unique y_true: {sorted(set(y_true))[:3]}")
#     print(f"[{c}] unique y_pred: {sorted(set(y_pred))[:3]}")

#     ##patch start
#     # y_true/y_pred built above...
#     y_true = np.asarray(y_true).astype(str)

#     y_pred = np.asarray(y_pred)
#     # If predictions look like indices ('0','1',...), convert -> ints -> names
#     try:
#         idx = y_pred.astype(int)
#         y_pred_names = np.asarray(cls_names, dtype=str)[idx]
#     except ValueError:
#         # Not pure indices (already names or probs were mapped earlier)
#         y_pred_names = y_pred.astype(str)

#     print(f"[{c}] mapped y_pred sample: {sorted(set(y_pred_names))[:3]}")
#     ##patch end

#     resg = evaluate_and_pack(np.array(y_true), np.array(y_pred), cls_names)
#     out = os.path.join(ART, f"results_xgb_{c}.json")
#     with open(out, "w") as f:
#         json.dump(resg, f, indent=2)
#     print("Saved:", out)