In [6]:
import pandas as pd
df = pd.read_csv('tree_data.csv')

In [20]:
import re
import numpy as np
import cv2
import pandas as pd
import torch

# =========================
# 0) Utilities
# =========================

def _mask_from_polys(polys, H, W):
    """Fill polygon(s) (list of Nx2) into a HxW uint8 mask."""
    m = np.zeros((int(H), int(W)), dtype=np.uint8)
    good = []
    for p in polys:
        p = np.asarray(p, dtype=np.int32)
        if p.ndim == 2 and p.shape[1] == 2 and len(p) >= 3:
            good.append(p)
    if good:
        cv2.fillPoly(m, good, 1)
    return m

# =========================
# 1) Decode mask (handles: string repr, Ultralytics Masks, numpy, dict)
# =========================

def parse_mask_repr_to_binary(s: str):
    """
    Parse a string repr like:
      '... orig_shape: (720, 1024) ... xy: [array([[ 80, 0], ...]], dtype=float32)] ...'
    or with xyn: normalized coords in [0,1].
    Returns uint8 binary mask (H,W) with {0,1}.
    """
    # 1) orig_shape
    m = re.search(r"orig_shape:\s*\((\d+)\s*,\s*(\d+)\)", s)
    if not m:
        raise ValueError("orig_shape not found in mask string")
    H, W = map(int, m.groups())

    # Helper: extract the first array(...) block after a prefix (xy/xyn)
    def extract_first_array_block(prefix):
        # Look for: prefix: [array([[ ... ] , dtype=...)]  (non-greedy)
        pat = rf"{prefix}\s*:\s*\[\s*array\s*\(\s*\[\s*(.*?)\s*\]\s*\)"
        m2 = re.search(pat, s, flags=re.S)
        if m2:
            return m2.group(1)
        return None

    # 2) Prefer pixel-space 'xy'
    inner = extract_first_array_block("xy")
    if inner:
        nums = [float(x) for x in re.findall(r"-?\d+\.?\d*", inner)]
        if len(nums) >= 6:  # at least 3 points
            arr = np.array(nums, dtype=np.float32).reshape(-1, 2)
            pts = np.round(arr).astype(np.int32)
            return _mask_from_polys([pts], H, W)

    # 3) Fallback to normalized 'xyn'
    inner = extract_first_array_block("xyn")
    if inner:
        nums = [float(x) for x in re.findall(r"-?\d+\.?\d*", inner)]
        if len(nums) >= 6:
            arr = np.array(nums, dtype=np.float32).reshape(-1, 2)
            xs = np.clip(np.round(arr[:, 0] * W), 0, W-1).astype(np.int32)
            ys = np.clip(np.round(arr[:, 1] * H), 0, H-1).astype(np.int32)
            pts = np.stack([xs, ys], axis=1)
            return _mask_from_polys([pts], H, W)

    # 4) Optional: parse 'data: tensor([...])' (often coarse or empty); usually xy/xyn is better.
    # If needed, add a parser for the dense tensor block here.

    raise ValueError("Neither 'xy' nor 'xyn' polygon found in mask string")

def mask_to_binary_ultra(mask_obj, thresh=0.5):
    """
    Accepts:
      - Ultralytics Masks object with .data/.xy and .orig_shape
      - numpy array mask (H,W) or (H,W,1)
      - dict with {"xy":[polys], "orig_shape":(H,W)}
      - string repr (your current df['mask'])
    Returns uint8 mask {0,1} in (H,W) = orig shape, if available.
    """
    # numpy array already?
    if isinstance(mask_obj, np.ndarray):
        mb = mask_obj.squeeze()
        if mb.ndim != 2:
            raise TypeError(f"Unexpected mask ndarray shape: {mb.shape}")
        return (mb.astype(np.float32) >= thresh).astype(np.uint8)

    # dict with xy/orig_shape or data/orig_shape
    if isinstance(mask_obj, dict) and "orig_shape" in mask_obj:
        H, W = map(int, mask_obj["orig_shape"])
        if "xy" in mask_obj:
            return _mask_from_polys(mask_obj["xy"], H, W)
        if "data" in mask_obj:
            mb = mask_obj["data"]
            if isinstance(mb, torch.Tensor):
                mb = mb.detach().float().cpu().numpy()
            if mb.ndim == 3:
                mb = mb[0]
            if mb.shape != (H, W):
                mb = cv2.resize(mb, (W, H), interpolation=cv2.INTER_NEAREST)
            return (mb >= thresh).astype(np.uint8)

    # Ultralytics-like object (duck-typing)
    if hasattr(mask_obj, "orig_shape"):
        H, W = mask_obj.orig_shape
        # try dense data
        if hasattr(mask_obj, "data"):
            mb = mask_obj.data
            if isinstance(mb, torch.Tensor):
                mb = mb.detach().float().cpu().numpy()
            if mb.ndim == 3:
                mb = mb[0]
            if mb.shape != (H, W):
                mb = cv2.resize(mb, (W, H), interpolation=cv2.INTER_NEAREST)
            mb_bin = (mb >= thresh).astype(np.uint8)
            # if empty but we have polygons, try polygons
            if mb_bin.sum() == 0 and hasattr(mask_obj, "xy") and mask_obj.xy:
                try:
                    return _mask_from_polys(mask_obj.xy, H, W)
                except Exception:
                    pass
            return mb_bin
        # or polygons
        if hasattr(mask_obj, "xy") and mask_obj.xy:
            return _mask_from_polys(mask_obj.xy, H, W)

    # String repr (your case)
    if isinstance(mask_obj, str):
        return parse_mask_repr_to_binary(mask_obj)

    raise TypeError(f"Unsupported mask type: {type(mask_obj)}")

# =========================
# 2) Shape features (inside mask bbox)
# =========================

def mask_shape_features(m_bin, bottom_band_frac=0.10):
    """
    m_bin: uint8 binary mask {0,1} in original image coords.
    Returns: dict of features.
    """
    m = (m_bin > 0).astype(np.uint8)
    ys, xs = np.where(m)
    if len(ys) == 0:
        return {"empty": 1}

    y0, y1 = ys.min(), ys.max()
    x0, x1 = xs.min(), xs.max()
    crop = m[y0:y1+1, x0:x1+1]
    hh, ww = crop.shape
    if hh < 3 or ww < 3:
        return {"empty": 1, "tiny_bbox": 1}

    # Area & aspect
    area = int(crop.sum())
    bbox_area = int(hh * ww)
    area_frac = area / bbox_area if bbox_area > 0 else 0.0
    aspect = hh / ww if ww > 0 else np.inf

    # Width profile
    row_counts = crop.sum(axis=1).astype(np.float32)  # per row
    width_frac = row_counts / float(max(ww, 1))

    band = max(1, int(bottom_band_frac * hh))
    bottom_band = width_frac[-band:]
    mid_lo, mid_hi = int(0.40*hh), int(0.60*hh)
    if mid_hi <= mid_lo:
        mid_hi = min(hh, mid_lo+1)
    mid_band = width_frac[mid_lo:mid_hi]

    bottom_mean = float(bottom_band.mean())
    bottom_min  = float(bottom_band.min())
    mid_mean    = float(mid_band.mean()) if len(mid_band) > 0 else float(width_frac[hh//2])
    widen       = float(mid_mean - bottom_mean)

    # Convex hull solidity
    pts = np.column_stack(np.where(crop > 0))[:, ::-1]  # (x,y)
    hull = cv2.convexHull(pts.astype(np.int32))
    hull_area = float(cv2.contourArea(hull))
    solidity = (area / hull_area) if hull_area > 1e-6 else 1.0

    # Vertical run-length in bottom band
    bb_rows = crop[-band:]
    col_runs = []
    for c in range(ww):
        col = bb_rows[:, c][::-1]  # bottom-up
        run = 0
        for v in col:
            if v:
                run += 1
            else:
                break
        col_runs.append(run)
    vertical_rl_max = int(max(col_runs)) if col_runs else 0

    touches_bottom_bbox = int(crop[-1, :].any())

    return {
        "empty": 0,
        "y0": int(y0), "y1": int(y1), "x0": int(x0), "x1": int(x1),
        "hh": int(hh), "ww": int(ww),
        "area": int(area), "bbox_area": int(bbox_area),
        "area_frac": float(area_frac),
        "aspect": float(aspect),
        "bottom_mean": float(bottom_mean),
        "bottom_min":  float(bottom_min),
        "mid_mean":    float(mid_mean),
        "widen":       float(widen),
        "solidity":    float(solidity),
        "vertical_rl_max": int(vertical_rl_max),
        "touches_bottom_bbox": touches_bottom_bbox,
    }

# =========================
# 3) Add features to DataFrame (with error logging)
# =========================

def add_tree_features(df, mask_col="mask", verbose=False):
    feats, errs = [], []
    for i, m in enumerate(df[mask_col].tolist()):
        try:
            m_bin = mask_to_binary_ultra(m)
            f = mask_shape_features(m_bin)
            feats.append(f)
            if verbose:
                print(i, f)
        except Exception as e:
            feats.append({"empty": 1, "error": str(e)})
            errs.append((i, type(m).__name__, str(e)))
    if errs:
        print("Decode errors (showing up to 5):")
        for row in errs[:5]:
            print("  idx:", row[0], "type:", row[1], "err:", row[2])
    fdf = pd.DataFrame(feats)
    out = pd.concat([df.reset_index(drop=True), fdf], axis=1)
    return out

# =========================
# 4) Rules & evaluation
# =========================

def keep_rule_strict(row,
                     area_frac_range=(0.01, 0.60),
                     aspect_min=1.2,
                     solidity_min=0.25,
                     bottom_mean_thresh=0.30,
                     bottom_min_thresh=0.15,
                     widen_thresh=0.10,
                     vertical_rl_min=6):
    if row.get("empty", 1) == 1:
        return 0
    if not (area_frac_range[0] <= row["area_frac"] <= area_frac_range[1]):
        return 0
    if row["aspect"] < aspect_min:
        return 0
    if row["solidity"] < solidity_min:
        return 0
    # Reject if base is wide AND doesn't widen AND no neck
    if (row["bottom_mean"] > bottom_mean_thresh) and (row["widen"] < widen_thresh) and (row["bottom_min"] > bottom_min_thresh):
        return 0
    if row["vertical_rl_max"] < vertical_rl_min:
        return 0
    return 1

def keep_rule_vote(row, K=3):
    """Less brittle: accumulate evidence; keep if score >= K."""
    if row.get("empty", 1) == 1:
        return 0
    score = 0
    # shape / structure
    if 0.005 <= row["area_frac"] <= 0.80: score += 1
    if row["aspect"] >= 1.05:             score += 1
    if row["solidity"] >= 0.30:           score += 1
    if row["vertical_rl_max"] >= 5:       score += 1
    if row.get("touches_bottom_bbox", 0): score += 1
    # trunk-ish cues
    if row["bottom_mean"] <= 0.35:        score += 1
    if row["widen"] >= 0.05:              score += 1
    if row["bottom_min"] <= 0.12:         score += 1
    # optional model conf if present
    if "conf" in row and str(row["conf"]) != "nan" and float(row["conf"]) >= 0.6:
        score += 1
    return 1 if score >= K else 0

def evaluate_rules(df, label_col="correct", rule=keep_rule_strict):
    # map labels to {0,1}
    y = df[label_col].astype(str).str.lower().map({"yes":1,"true":1,"1":1}).fillna(0).astype(int).to_numpy()
    y_pred = df.apply(rule, axis=1).to_numpy()
    tp = int(((y==1)&(y_pred==1)).sum()); tn = int(((y==0)&(y_pred==0)).sum())
    fp = int(((y==0)&(y_pred==1)).sum()); fn = int(((y==1)&(y_pred==0)).sum())
    acc = (tp+tn)/max(1,len(y))
    prec_keep = tp / max(1,(tp+fp))
    rec_keep  = tp / max(1,(tp+fn))
    prec_inc  = tn / max(1,(tn+fn))  # precision for eliminating incorrects
    rec_inc   = tn / max(1,(tn+fp))  # recall for incorrects
    return {
        "acc": round(acc,4), "tp": tp, "tn": tn, "fp": fp, "fn": fn,
        "precision_keep": round(prec_keep,4), "recall_keep": round(rec_keep,4),
        "precision_incorrect": round(prec_inc,4), "recall_incorrect": round(rec_inc,4)
    }

def audit_gates(df):
    out = {}
    N = len(df)
    def get(col, default):
        return df[col] if col in df.columns else pd.Series([default]*N, index=df.index)
    empty     = (get("empty", 1) == 1)
    area_bad  = ~get("area_frac", np.nan).between(0.01, 0.60)
    aspect_bad= get("aspect", 0) < 1.2
    sol_bad   = get("solidity", 0) < 0.25
    trunk_bad = (get("bottom_mean",1.0) > 0.30) & (get("widen",0.0) < 0.10) & (get("bottom_min",1.0) > 0.15)
    vert_bad  = get("vertical_rl_max", 0) < 6
    fail_any  = empty | area_bad | aspect_bad | sol_bad | trunk_bad | vert_bad
    out["empty"] = int(empty.sum())
    out["area_frac_bad"] = int(area_bad.fillna(True).sum())
    out["aspect_bad"] = int(aspect_bad.sum())
    out["solidity_bad"] = int(sol_bad.sum())
    out["trunk_rule_bad"] = int(trunk_bad.fillna(False).sum())
    out["vertical_bad"] = int(vert_bad.sum())
    out["fail_any"] = int(fail_any.fillna(True).sum())
    out["n"] = N
    return out

# =========================
# 5) HOW TO RUN
# =========================
# df must have: 'mask' (string repr or real object/np array), and 'correct' (yes/no or 1/0)

# Example usage:
# df_feats = add_tree_features(df, mask_col="mask")        # decode + features
# print(df_feats.filter(["empty","area_frac","aspect","solidity","bottom_mean","widen","vertical_rl_max"]).head())
# print(audit_gates(df_feats))
# print("Strict:", evaluate_rules(df_feats, label_col="correct", rule=keep_rule_strict))
# print("Vote K=3:", evaluate_rules(df_feats, label_col="correct", rule=lambda r: keep_rule_vote(r, K=3)))
# print("Vote K=4:", evaluate_rules(df_feats, label_col="correct", rule=lambda r: keep_rule_vote(r, K=4)))


In [21]:
# 1) Compute features
df_feats = add_tree_features(df, mask_col="mask")


Decode errors (showing up to 5):
  idx: 0 type: str err: Neither 'xy' nor 'xyn' polygon found in mask string
  idx: 1 type: str err: Neither 'xy' nor 'xyn' polygon found in mask string
  idx: 2 type: str err: Neither 'xy' nor 'xyn' polygon found in mask string
  idx: 3 type: str err: Neither 'xy' nor 'xyn' polygon found in mask string
  idx: 4 type: str err: Neither 'xy' nor 'xyn' polygon found in mask string


In [19]:
print(type(df.loc[df.index[0], "mask"]))
# If this prints <class 'str'>, you have strings, not mask objects.

<class 'str'>


In [9]:
def audit_gates(df):
    # expects the feature columns added earlier (area_frac, aspect, solidity, bottom_mean, bottom_min, widen, vertical_rl_max)
    checks = {}
    checks['empty'] = (df['empty'] == 1)

    checks['area_frac'] = ~df['area_frac'].between(0.01, 0.60)
    checks['aspect'] = df['aspect'] < 1.2
    checks['solidity'] = df['solidity'] < 0.25

    # trunk-wide base & no widening & no neck
    checks['trunk_rule'] = (df['bottom_mean'] > 0.30) & (df['widen'] < 0.10) & (df['bottom_min'] > 0.15)

    checks['vertical'] = df['vertical_rl_max'] < 6

    fail_any = checks['empty'] | checks['area_frac'] | checks['aspect'] | checks['solidity'] | checks['trunk_rule'] | checks['vertical']

    out = {name: int(mask.sum()) for name, mask in checks.items()}
    out['fail_any'] = int(fail_any.sum())
    out['n'] = len(df)
    return out

audit_gates(df_feats)


KeyError: 'area_frac'