In [1]:
#!/usr/bin/env python3
# mask_rule_checker.py

import os
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
# --------------- CONFIG ---------------
# Input/output locations. All paths are relative to the working directory.
NPZ_DIR_IN   = Path("CAFO/sam_on_yolo_all")   # folder of .npz per image
DET_CSV      = Path("CAFO/yolo_img_preds_all/detections.csv")  # YOLO dets with labels
OUT_SUMMARY  = Path("CAFO/reports/mask_rule_summary.csv")  # per-mask summary CSV written by main()
SAVE_FILTERED_NPZ = True   # when True, main() also writes one filtered .npz per input file
NPZ_DIR_OUT  = Path("CAFO/sam_on_yolo_all_filtered")  # destination of the filtered .npz files

# Column names in detections.csv
IMG_COL  = "image"
BOX_COLS = ["x1","y1","x2","y2"]
LBL_COL  = "label"        # YOLO class name per bbox row

# Global thresholds / knobs
# Barn: all three conditions must hold (see decide_keep_for_label).
INSIDE_THRESH_BARN      = 0.90   # >=90% of mask inside bbox
RECTANGULARITY_MIN_BARN = 0.30   # mask_area / mask_tight_bbox_area
SIZE_RATIO_MIN_BARN     = 0.15   # mask_bbox_area / det_bbox_area
SIZE_RATIO_MAX_BARN     = 1.30

# Building: only the inside-fraction test is applied.
INSIDE_THRESH_BUILDING  = 0.90

COVER_THRESH_POND       = 0.50   # fraction of det box covered by mask
# NOTE(review): KEEP_TIES_EPS is not referenced anywhere in the visible code.
KEEP_TIES_EPS           = 1e-6   # tie tolerance

OVERLAP_MIN_FEEDLOT     = 0.20   # overlap fraction of det box
SIZE_RATIO_MIN_FEEDLOT  = 0.15   # lower bound on mask_bbox_area / det_bbox_area

INSIDE_MIN_SILO         = 0.80   # minimum fraction of mask inside the det box
SIZE_RATIO_MAX_SILO     = 0.35   # small-ish objects typically

# Used for silage/storage labels and for any label without a dedicated rule.
OVERLAP_MIN_KEEPALL     = 0.20   # for "keep all but must overlap bbox at least a bit"
# --------------- END CONFIG -----------

# ---------- helpers ----------
def _np_load_safe(p: Path):
    try:
        return np.load(p, allow_pickle=False)
    except ValueError:
        return np.load(p, allow_pickle=True)

def _area_box(bx):
    x1,y1,x2,y2 = map(int, bx)
    return max(0, x2-x1) * max(0, y2-y1)

def _tight_bbox_of_mask(m):
    ys, xs = np.where(m > 0)
    if ys.size == 0:
        return None
    return [int(xs.min()), int(ys.min()), int(xs.max())+1, int(ys.max())+1]

def _frac_inside_mask(m, bx):
    """Fraction of mask pixels that lie inside the bbox."""
    x1,y1,x2,y2 = map(int, bx)
    H, W = m.shape
    x1 = max(0, min(x1, W)); x2 = max(0, min(x2, W))
    y1 = max(0, min(y1, H)); y2 = max(0, min(y2, H))
    if x2 <= x1 or y2 <= y1:
        return 0.0
    total = int((m > 0).sum())
    if total == 0:
        return 0.0
    inside = int((m[y1:y2, x1:x2] > 0).sum())
    return inside / float(total)

def _overlap_frac_of_box(m, bx):
    """Fraction of bbox area (~det box) covered by the mask."""
    x1,y1,x2,y2 = map(int, bx)
    H, W = m.shape
    x1 = max(0, min(x1, W)); x2 = max(0, min(x2, W))
    y1 = max(0, min(y1, H)); y2 = max(0, min(y2, H))
    det_area = (x2-x1)*(y2-y1)
    if det_area <= 0:
        return 0.0
    cov = int((m[y1:y2, x1:x2] > 0).sum())
    return cov / float(det_area)

def _rectangularity(m):
    """Return positive-pixel count divided by the area of the mask's tight box.

    Values near 1 mean the mask fills its tight bounding box. Returns 0.0 for
    an empty mask or a non-positive box area.
    """
    tight = _tight_bbox_of_mask(m)
    enclosing_area = _area_box(tight) if tight is not None else 0
    if enclosing_area <= 0:
        return 0.0
    filled = float((m > 0).sum())
    return filled / float(enclosing_area)

def _aspect_ratio_from_mask(m):
    """Return width / height of the mask's tight bounding box.

    Both sides are floored at 1 pixel before dividing. Returns 0.0 for an
    empty mask.
    """
    tight = _tight_bbox_of_mask(m)
    if tight is None:
        return 0.0
    left, top, right, bottom = tight
    width = right - left
    height = bottom - top
    if width < 1:
        width = 1
    if height < 1:
        height = 1
    return width / float(height)

def _size_ratio_mask_vs_det(m, det_box):
    """Return (area of the mask's tight box) / (area of ``det_box``).

    The detection-box area is floored at 1 so the division is always defined.
    Returns 0.0 for an empty mask.
    """
    tight = _tight_bbox_of_mask(m)
    if tight is None:
        return 0.0
    mask_box_area = _area_box(tight)
    det_area = _area_box(det_box)
    denominator = det_area if det_area >= 1 else 1
    return mask_box_area / float(denominator)

# ---------- rule per label ----------
def decide_keep_for_label(label: str, m: np.ndarray, det_box) -> (bool, str, float):
    """
    Apply the per-class keep rule to one mask / detection-box pair.

    ``label`` is normalised with strip + lower-case (``None``/empty becomes "").
    Returns ``(keep, reason, score)`` where ``reason`` is a short text with the
    measured quantities and ``score`` is a ranking value. For manure_pond the
    caller later keeps only the best-scoring mask per detection.
    """
    tag = label.strip().lower() if label else ""

    if tag == "barn":
        # Mostly inside the box, reasonably filled, and of comparable size.
        inside = _frac_inside_mask(m, det_box)
        fill = _rectangularity(m)
        ratio = _size_ratio_mask_vs_det(m, det_box)
        passes = all((
            inside >= INSIDE_THRESH_BARN,
            fill >= RECTANGULARITY_MIN_BARN,
            SIZE_RATIO_MIN_BARN <= ratio <= SIZE_RATIO_MAX_BARN,
        ))
        rank = 0.6*inside + 0.2*fill + 0.2*min(1.0, ratio)
        return passes, f"barn: in={inside:.2f}, rect={fill:.2f}, r={ratio:.2f}", rank

    if tag == "building":
        inside = _frac_inside_mask(m, det_box)
        return (inside >= INSIDE_THRESH_BUILDING), f"building: in={inside:.2f}", inside

    if tag == "manure_pond":
        # Score is the box coverage; the caller ranks on it and keeps the best.
        coverage = _overlap_frac_of_box(m, det_box)
        return (coverage >= COVER_THRESH_POND), f"manure_pond: cover={coverage:.2f}", coverage

    if tag == "silo":
        # Fairly inside the box and small relative to it.
        inside = _frac_inside_mask(m, det_box)
        ratio = _size_ratio_mask_vs_det(m, det_box)
        passes = all((inside >= INSIDE_MIN_SILO, ratio <= SIZE_RATIO_MAX_SILO))
        rank = 0.7*inside + 0.3*(1.0 - min(ratio,1.0))
        return passes, f"silo: in={inside:.2f}, r={ratio:.2f}", rank

    if tag == "feedlot":
        # Must overlap a chunk of the box and must not be tiny.
        covered = _overlap_frac_of_box(m, det_box)
        ratio = _size_ratio_mask_vs_det(m, det_box)
        passes = all((covered >= OVERLAP_MIN_FEEDLOT, ratio >= SIZE_RATIO_MIN_FEEDLOT))
        rank = 0.7*covered + 0.3*min(1.0, ratio)
        return passes, f"feedlot: ov={covered:.2f}, r={ratio:.2f}", rank

    # Silage/storage variants and every other label share one rule: require
    # some overlap with the box. Only the reason prefix differs.
    covered = _overlap_frac_of_box(m, det_box)
    prefix = tag if ("silage" in tag or "storage" in tag) else "default"
    return (covered >= OVERLAP_MIN_KEEPALL), f"{prefix}: ov={covered:.2f}", covered

# ---------- main ----------
def _pair_masks_with_detections(n, det_img_rows, row_idx_arr, det_boxes_arr):
    """Return ``(det_boxes, det_labels)`` with exactly ``n`` entries, one per mask.

    Pairing preference:
      1. ``row_idx_arr`` (det_row_id per mask) looked up in ``det_img_rows``;
      2. ``det_boxes_arr`` paired positionally, labelled only when the image
         has a single detection row;
      3. otherwise an unknown pairing (box ``[0, 0, 0, 0]``, label ``""``).

    Either list is padded to ``n`` so a short mapping array cannot cause an
    ``IndexError`` when the caller indexes by mask position.
    """
    have_rows = det_img_rows is not None and len(det_img_rows) > 0
    det_boxes = []
    det_labels = []
    pad_label = ""

    if row_idx_arr is not None and have_rows:
        by_id = det_img_rows.set_index("det_row_id")
        # Only the first n ids can correspond to a mask.
        for rid in map(int, np.asarray(row_idx_arr).reshape(-1)[:n]):
            if rid in by_id.index:
                r = by_id.loc[rid]
                det_boxes.append([int(r[c]) for c in BOX_COLS])
                det_labels.append(str(r[LBL_COL]))
            else:
                # Row id unknown for this image: fall back to "unknown".
                det_boxes.append([0, 0, 0, 0])
                det_labels.append("")
    elif det_boxes_arr is not None and have_rows:
        boxes = np.asarray(det_boxes_arr)
        # A label is only unambiguous when the image has exactly one detection.
        if len(det_img_rows) == 1:
            pad_label = str(det_img_rows.iloc[0][LBL_COL])
        for i in range(min(n, boxes.shape[0])):
            det_boxes.append([int(v) for v in boxes[i]])
            det_labels.append(pad_label)

    shortfall = n - len(det_boxes)
    if shortfall > 0:
        det_boxes.extend([0, 0, 0, 0] for _ in range(shortfall))
        det_labels.extend([pad_label] * shortfall)
    return det_boxes, det_labels


# ---------- main ----------
def main():
    """Apply the per-label mask rules to every NPZ and write the results.

    For each ``*.npz`` in ``NPZ_DIR_IN`` the masks are paired with their YOLO
    detection (box + label), judged by ``decide_keep_for_label`` and, for
    manure_pond, reduced to the single best-scoring mask per detection box.
    One summary row per mask goes to ``OUT_SUMMARY``; when
    ``SAVE_FILTERED_NPZ`` is set, the kept masks are written to ``NPZ_DIR_OUT``.

    Raises:
        SystemExit: if detections.csv lacks a required column.
    """
    # Labels for which only the best-scoring mask per detection box is kept.
    best_only_labels = ("manure_pond",)

    OUT_SUMMARY.parent.mkdir(parents=True, exist_ok=True)
    if SAVE_FILTERED_NPZ:
        NPZ_DIR_OUT.mkdir(parents=True, exist_ok=True)

    det = pd.read_csv(DET_CSV)
    for c in [IMG_COL, *BOX_COLS, LBL_COL]:
        if c not in det.columns:
            raise SystemExit(f"detections.csv missing column: {c}")

    # The original CSV row number is the "det_row_id". It is assigned BEFORE
    # any row is dropped so ids stay aligned with row_indices stored in the NPZ.
    det = det.reset_index().rename(columns={"index": "det_row_id"})

    # Rows with unparseable coordinates would crash int() later; drop them so
    # the affected masks fall back to the "unknown" pairing instead.
    for c in BOX_COLS:
        det[c] = pd.to_numeric(det[c], errors="coerce")
    det = det.dropna(subset=BOX_COLS)

    # Group detection rows by image, plus a stem -> rows index built once so
    # the per-file lookup is O(1) instead of a scan over every image key.
    det_by_img = {img: grp for img, grp in det.groupby(IMG_COL)}
    det_by_stem = {}
    for img, grp in det_by_img.items():
        # setdefault keeps the first key per stem, as the linear scan did.
        det_by_stem.setdefault(Path(str(img)).stem, grp)

    rows_out = []  # for CSV summary

    npz_files = sorted(NPZ_DIR_IN.glob("*.npz"))

    for npz_path in tqdm(npz_files):
        # Read what we need and close the archive right away.
        with _np_load_safe(npz_path) as data:
            if "masks" not in data.files:
                continue
            masks = np.asarray(data["masks"]) > 0
            row_idx_arr = data["row_indices"] if "row_indices" in data.files else None
            det_boxes_arr = data["det_boxes_xyxy"] if "det_boxes_xyxy" in data.files else None

        if masks.ndim == 2:
            # A single 2-D mask: promote to a stack of one.
            masks = masks[None, ...]
        n = masks.shape[0]

        # Match the NPZ to detection rows: exact key first, then filename stem.
        stem = npz_path.stem
        det_img_rows = det_by_img.get(stem)
        if det_img_rows is None:
            det_img_rows = det_by_stem.get(stem)

        det_boxes, det_labels = _pair_masks_with_detections(
            n, det_img_rows, row_idx_arr, det_boxes_arr
        )

        # Apply rules per mask
        keeps = []
        scores = []
        file_rows = []
        for i in range(n):
            m = masks[i]
            bx = det_boxes[i]
            lb = det_labels[i]
            keep, reason, score = decide_keep_for_label(lb, m, bx)
            keeps.append(bool(keep))
            scores.append(float(score))
            file_rows.append({
                "npz_file": str(npz_path),
                "mask_idx": i,
                "det_box": bx,
                "det_label": lb,
                "keep": bool(keep),
                "score": float(score),
                "reason": reason,
                "mask_area": int(m.sum()),
                "mask_rectangularity": float(_rectangularity(m)),
                "mask_aspect_ratio": float(_aspect_ratio_from_mask(m)),
                "frac_inside": float(_frac_inside_mask(m, bx)),
                "cover_frac": float(_overlap_frac_of_box(m, bx)),
            })

        # Best-only labels: keep ONLY the best-scoring mask per detection box
        # ("same detection" is approximated by identical box coordinates).
        # Labels are normalised the same way decide_keep_for_label does.
        groups = {}
        for i, lab in enumerate(det_labels):
            lab_norm = (lab or "").strip().lower()
            if lab_norm in best_only_labels:
                groups.setdefault((lab_norm, tuple(det_boxes[i])), []).append(i)
        for members in groups.values():
            best_i = max(members, key=scores.__getitem__)
            for i in members:
                keeps[i] = (i == best_i)

        # Record the FINAL decision so the summary agrees with the saved NPZ.
        for row, final_keep in zip(file_rows, keeps):
            row["keep"] = bool(final_keep)
        rows_out.extend(file_rows)

        # Optionally save a filtered NPZ (written to a temp name, then renamed).
        if SAVE_FILTERED_NPZ:
            out_path = NPZ_DIR_OUT / npz_path.name
            tmp_path = out_path.with_suffix(".tmp.npz")
            keep_idx = [i for i, k in enumerate(keeps) if k]
            if keep_idx:
                kept_masks = masks[keep_idx].astype(np.uint8)
                kept_boxes = np.array([det_boxes[i] for i in keep_idx], dtype=np.int32)
            else:
                # Nothing kept: write empty arrays so the file still exists.
                kept_masks = np.zeros((0, 1, 1), dtype=np.uint8)
                kept_boxes = np.zeros((0, 4), dtype=np.int32)
            np.savez_compressed(
                tmp_path,
                masks=kept_masks,
                det_boxes_xyxy=kept_boxes,
                det_labels=np.array([det_labels[i] for i in keep_idx], dtype=object),
                keep_scores=np.array([scores[i] for i in keep_idx], dtype=np.float32),
            )
            os.replace(tmp_path, out_path)

    # Write the CSV summary (temp file + rename)
    out_df = pd.DataFrame(rows_out)
    tmp = OUT_SUMMARY.with_suffix(".tmp.csv")
    out_df.to_csv(tmp, index=False)
    os.replace(tmp, OUT_SUMMARY)

if __name__ == "__main__":
    main()


100%|██████████| 24049/24049 [1:33:25<00:00,  4.29it/s]  


In [1]:
#!/usr/bin/env python3
# label_masks_by_best_box.py

from pathlib import Path
import os
import numpy as np
import pandas as pd
from tqdm import tqdm

# ---------- CONFIG ----------
NPZ_DIR_IN   = Path("CAFO/sam_on_yolo_all_filtered")   # input: one .npz per image
NPZ_DIR_OUT  = Path("CAFO/sam_on_yolo_all_labeled")    # output: labeled .npz files
# NOTE: this runs at import/definition time, not only when main() is called.
NPZ_DIR_OUT.mkdir(parents=True, exist_ok=True)

# YOLO detections: one row per detection with image path + bbox + label
DET_CSV      = Path("CAFO/yolo_img_preds_all/detections.csv")
DET_IMG_COL  = "image"
DET_X1, DET_Y1, DET_X2, DET_Y2 = "x1", "y1", "x2", "y2"
DET_LABEL_COL = "label"  # required

# Category CSV: one row per image with image path + category (e.g., Swine/Dairy/…)
CAT_CSV          = Path("CAFO/csvs/predict_all_cafo_wNeg.csv")
# Accepted (lower-cased, stripped) names for the image-path column of CAT_CSV.
CAT_PATH_CAND    = {"patch file","patche_file","patch_file","patch","patchfile","image","img","path","filepath","file"}
CAT_CATEGORY_COL = "category"   # expected name for category
# ----------------------------

def _stem(p: str) -> str:
    return Path(str(p)).stem.lower()

def _frac_inside(mask_bool: np.ndarray, box_xyxy) -> float:
    """Fraction of mask pixels that lie inside the bbox."""
    H, W = mask_bool.shape
    x1,y1,x2,y2 = map(int, box_xyxy)
    # clip to image
    x1 = max(0, min(x1, W-1)); x2 = max(0, min(x2, W))
    y1 = max(0, min(y1, H-1)); y2 = max(0, min(y2, H))
    if x2 <= x1 or y2 <= y1:
        return 0.0
    total = int(mask_bool.sum())
    if total == 0:
        return 0.0
    inside = int(mask_bool[y1:y2, x1:x2].sum())
    return inside / total

def _index_detections_by_stem(det_df: pd.DataFrame) -> dict:
    """Bucket YOLO detection rows by the lower-cased stem of their image path.

    Returns a dict ``stem -> DataFrame`` where each frame holds that image's
    rows in their original order with a fresh 0..k-1 index.
    """
    row_labels_by_stem = {}
    for row_label, img_path in zip(det_df.index, det_df[DET_IMG_COL]):
        key = _stem(img_path)
        if key not in row_labels_by_stem:
            row_labels_by_stem[key] = []
        row_labels_by_stem[key].append(row_label)

    indexed = {}
    for key, row_labels in row_labels_by_stem.items():
        indexed[key] = det_df.loc[row_labels].reset_index(drop=True)
    return indexed

def _build_category_map(cat_df: pd.DataFrame) -> dict:
    """Map image filename stem -> category.

    The path column is the first column whose stripped, lower-cased name is in
    ``CAT_PATH_CAND``. The category column is ``CAT_CATEGORY_COL``, matched
    exactly first and then case-insensitively, so both columns are resolved
    with the same tolerance.

    Returns:
        ``{stem: category}``; an empty dict when either column is missing.
        Rows with a missing/blank path are skipped, and a missing category is
        stored as ``""`` rather than the literal string ``"nan"``.
    """
    path_col = next(
        (c for c in cat_df.columns if str(c).strip().lower() in CAT_PATH_CAND),
        None,
    )

    if CAT_CATEGORY_COL in cat_df.columns:
        cat_col = CAT_CATEGORY_COL
    else:
        wanted = CAT_CATEGORY_COL.strip().lower()
        cat_col = next(
            (c for c in cat_df.columns if str(c).strip().lower() == wanted),
            None,
        )

    if path_col is None or cat_col is None:
        return {}

    mp = {}
    # Column-wise iteration avoids building a Series per row (iterrows).
    for raw_path, raw_cat in zip(cat_df[path_col], cat_df[cat_col]):
        if pd.isna(raw_path):
            continue  # str(NaN) would otherwise create a bogus "nan" key
        p = str(raw_path).strip()
        if not p:
            continue
        # Later rows overwrite earlier ones for the same stem.
        mp[_stem(p)] = "" if pd.isna(raw_cat) else str(raw_cat).strip()
    return mp

def _subset_by_indices_like_masks(data_files, masks_shape0, keep_idx):
    """
    Return dict with arrays subset along first dim if they match masks' length.
    """
    out = {}
    keep_idx = np.array(keep_idx, dtype=np.int64)
    for k in data_files.files:
        arr = data_files[k]
        try:
            if isinstance(arr, np.ndarray) and arr.ndim >= 1 and arr.shape[0] == masks_shape0:
                out[k] = arr[keep_idx]
            else:
                out[k] = arr
        except Exception:
            out[k] = arr
    return out

def _best_label_per_mask(masks: np.ndarray, dets_img) -> list:
    """Return one label per mask: the detection whose box holds most of the mask.

    For each mask the detection with the largest ``_frac_inside`` wins; ties go
    to the earliest detection row. A mask with no positive overlap with any
    box (or an image without detections) gets ``""``.
    """
    n_masks = masks.shape[0]
    if dets_img is None or dets_img.empty:
        return [""] * n_masks

    # Convert boxes and labels once per image instead of once per mask.
    boxes = [
        (int(x1), int(y1), int(x2), int(y2))
        for x1, y1, x2, y2 in zip(
            dets_img[DET_X1], dets_img[DET_Y1], dets_img[DET_X2], dets_img[DET_Y2]
        )
    ]
    det_labels = [str(v) for v in dets_img[DET_LABEL_COL]]

    labels = []
    for m in masks:
        best_frac = 0.0
        best_label = ""
        for box, det_label in zip(boxes, det_labels):
            frac = _frac_inside(m, box)
            if frac > best_frac:  # strict: the first detection wins ties
                best_frac = frac
                best_label = det_label
        labels.append(best_label)
    return labels


def main():
    """Label every mask in ``NPZ_DIR_IN`` and write the result to ``NPZ_DIR_OUT``.

    Each output NPZ carries the input arrays plus ``mask_labels`` (one label
    per mask) and ``image_category``. For images whose category is in the
    negative set, only masks labeled "barn" are kept (relabeled "building");
    if none remain, no file is written for that image.

    Raises:
        SystemExit: if the detections CSV lacks a required column.
    """
    # Load detections
    det = pd.read_csv(DET_CSV)
    need = [DET_IMG_COL, DET_X1, DET_Y1, DET_X2, DET_Y2, DET_LABEL_COL]
    missing = [c for c in need if c not in det.columns]
    if missing:
        raise SystemExit(f"Detections CSV missing columns: {missing}")

    # keep only valid boxes
    for c in (DET_X1, DET_Y1, DET_X2, DET_Y2):
        det[c] = pd.to_numeric(det[c], errors="coerce")
    det = det.dropna(subset=[DET_X1, DET_Y1, DET_X2, DET_Y2, DET_IMG_COL]).reset_index(drop=True)

    det_by_stem = _index_detections_by_stem(det)

    # Load categories (optional)
    image_category = {}
    if CAT_CSV.exists():
        try:
            cat_df = pd.read_csv(CAT_CSV)
            image_category = _build_category_map(cat_df)
        except Exception as e:
            # Categories are optional: warn and continue without them.
            print(f"[warn] Could not load categories: {e}")

    npz_files = sorted(NPZ_DIR_IN.glob("*.npz"))

    if not npz_files:
        print(f"No NPZ files in {NPZ_DIR_IN}")
        return

    NEG_CATEGORIES = {"negative", "neg", "not_cafo", "not cafo", "non_cafo", "none"}

    for npz_path in tqdm(npz_files):
        # NOTE: allow_pickle=True is required for object arrays in these files;
        # it must only be used on trusted, locally produced NPZ files.
        # The context manager closes the archive once its arrays are read.
        with np.load(npz_path, allow_pickle=True) as data:
            if "masks" not in data.files:
                print(f"[skip] {npz_path.name}: no 'masks' array")
                continue

            masks = data["masks"]
            if masks.ndim == 2:
                masks = masks[None, ...]
            masks = (masks > 0)
            n_masks = masks.shape[0]

            # detections for this image
            stem = npz_path.stem.lower()
            labels = _best_label_per_mask(masks, det_by_stem.get(stem, None))

            # image category (single string, case-insensitive check)
            cat = image_category.get(stem, "")
            cat_norm = (cat or "").strip().lower()

            if cat_norm in NEG_CATEGORIES:
                # Negative image: keep only masks labeled "barn" and relabel
                # them "building"; everything else is dropped.
                keep_idx = [
                    i for i, lab in enumerate(labels)
                    if (lab or "").strip().lower() == "barn"
                ]
                if not keep_idx:
                    continue

                # subset all per-mask arrays in the NPZ so they stay aligned
                out_dict = _subset_by_indices_like_masks(data, n_masks, keep_idx)
                out_dict["mask_labels"] = np.array(["building"] * len(keep_idx), dtype=object)
            else:
                # non-negative: keep all labels as computed
                out_dict = {k: data[k] for k in data.files}
                out_dict["mask_labels"] = np.array(labels, dtype=object)

        out_dict["image_category"] = np.array(cat, dtype=object)  # single string

        # write via a temp file, then rename into place
        out_path = NPZ_DIR_OUT / npz_path.name
        tmp = out_path.with_suffix(".tmp.npz")
        np.savez_compressed(tmp, **out_dict)
        os.replace(tmp, out_path)

    print("Done.")

if __name__ == "__main__":
    main()


100%|██████████| 24049/24049 [35:53<00:00, 11.17it/s]  


Done.
