In [1]:
import os
import pickle
import numpy as np
import pandas as pd
import networkx as nx
from pathlib import Path

In [2]:
REGION_CSV = r"F:\New Dissertation - Image Generation\POC\outputs\shape_region_features.csv"
IMAGE_CSV  = r"F:\New Dissertation - Image Generation\POC\outputs\shape_image_indices.csv"

OUT_FORM_DIR = r"outputs\form\form_outputs_csv_only"

os.makedirs(OUT_FORM_DIR, exist_ok=True)

df_regions = pd.read_csv(REGION_CSV)
df_images  = pd.read_csv(IMAGE_CSV)

print("df_regions:", df_regions.shape)
print("df_images :", df_images.shape)
display(df_regions.head())

df_regions: (366, 26)
df_images : (128, 10)


Unnamed: 0,split,image_id,H,W,region_index,area_px,area_ratio,skel_mass_px,n_endpoints,n_junctions,...,is_primary,is_secondary,is_whole,centeredness,smoothness,stability_mask_mean,stability_mask_min,stability_mask_max,stability_mask_n,stability_mask_range
0,real,real\images\10pm_feeding_around_the_clock.webp,749,750,0,2120,0.003774,159,0,71,...,0,1,0,0.94881,0.74496,0.855354,0.845755,0.864953,2.0,0.019198
1,real,real\images\10pm_feeding_around_the_clock.webp,749,750,1,1976,0.003518,165,0,79,...,0,1,0,0.9578,0.766552,0.840703,0.828947,0.852459,2.0,0.023512
2,real,real\images\10pm_feeding_around_the_clock.webp,749,750,2,2486,0.004425,63,0,14,...,0,1,0,0.857058,0.732789,0.907409,0.903862,0.910956,2.0,0.007095
3,real,real\images\10pm_feeding_around_the_clock.webp,749,750,3,2664,0.004742,188,1,78,...,0,1,0,0.863538,0.766856,0.864032,0.85548,0.872584,2.0,0.017104
4,real,real\images\10pm_feeding_around_the_clock.webp,749,750,4,1817,0.003235,178,1,57,...,0,1,0,0.928498,0.728474,0.825438,0.811227,0.839649,2.0,0.028422


### Helper Functions

In [3]:
def safe_cols(df: pd.DataFrame, cols):
    return [c for c in cols if c in df.columns]

def zscore_cols(X, eps=1e-9):
    X = np.asarray(X, dtype=float)
    mu = np.nanmean(X, axis=0)
    sd = np.nanstd(X, axis=0)
    return (X - mu) / (sd + eps)

def cosine_similarity_matrix(X):
    X = np.asarray(X, dtype=float)
    norms = np.linalg.norm(X, axis=1, keepdims=True) + 1e-9
    U = X / norms
    return U @ U.T

def ensure_dir(p):
    os.makedirs(p, exist_ok=True)
    return p

def sanitize_path_component(x: str) -> str:
    """
    Windows-safe folder name, because image_id has slashes/backslashes/colon etc.
    """
    x = str(x)
    bad = ['\\', '/', ':', '*', '?', '"', '<', '>', '|']
    for b in bad:
        x = x.replace(b, "_")
    return x

### Form-0 graph builder

In [4]:
DEFAULT_FEATS = [
    "area_ratio", "skel_mass_px",
    "pca_eig_ratio", "symmetry_pca", "saliency",
    "n_endpoints", "n_junctions", "n_edges",
    "centeredness", "smoothness",
    "stability_mask_mean", "stability_mask_min", "stability_mask_max", "stability_mask_range"
]

ROLE_COLS = ["is_primary", "is_secondary", "is_whole"]

def build_form_graph_for_image_csv_only(
    split, image_id, df_img: pd.DataFrame,
    feature_cols=None,
    repetition_threshold=0.85,
    max_rep_edges_per_node=3,
    add_role_links=True
):
    """
    CSV-only Form-0 graph.

    Nodes: region_index with all CSV attributes.
    Edges:
      - repetition edges from cosine similarity on selected feature columns
      - optional role-link edges if role columns exist (is_primary/is_secondary/is_whole)
    """
    feature_cols = feature_cols or DEFAULT_FEATS
    feat_cols = safe_cols(df_img, feature_cols)
    if len(feat_cols) == 0:
        raise ValueError("No usable feature columns found to build repetition edges.")

    # ---- init graph
    G = nx.Graph(split=str(split), image_id=str(image_id), form0_mode="csv_only")

    df_img = df_img.copy().reset_index(drop=True)

    # ---- add nodes
    if "region_index" not in df_img.columns:
        raise KeyError("df_img must contain 'region_index' column.")
    for _, row in df_img.iterrows():
        rid = int(row["region_index"])
        attrs = row.to_dict()
        attrs["node_type"] = "region"
        G.add_node(rid, **attrs)

    # ---- repetition edges
    # clean numeric matrix, fill NaNs with column medians (stable for cosine)
    X = df_img[feat_cols].apply(pd.to_numeric, errors="coerce").to_numpy(dtype=float)
    # median fill
    col_meds = np.nanmedian(X, axis=0)
    inds = np.where(~np.isfinite(X))
    if len(inds[0]) > 0:
        X[inds] = np.take(col_meds, inds[1])

    Xn = zscore_cols(X)
    S = cosine_similarity_matrix(Xn)

    ids = df_img["region_index"].astype(int).tolist()
    id_to_pos = {rid: i for i, rid in enumerate(ids)}

    for rid in ids:
        i = id_to_pos[rid]
        sims = [(ids[j], float(S[i, j])) for j in range(len(ids)) if j != i]
        sims.sort(key=lambda x: x[1], reverse=True)

        added = 0
        for other, sim in sims:
            if sim < repetition_threshold:
                break
            if added >= max_rep_edges_per_node:
                break

            if not G.has_edge(rid, other):
                G.add_edge(rid, other, rel_repetition=1, repetition_sim=float(sim))
            else:
                G.edges[rid, other]["rel_repetition"] = 1
                prev = G.edges[rid, other].get("repetition_sim", -1)
                G.edges[rid, other]["repetition_sim"] = float(max(sim, prev))
            added += 1

    # ---- optional role-link edges
    if add_role_links:
        role_cols_present = safe_cols(df_img, ROLE_COLS)
        for role in role_cols_present:
            # connect all regions with role==1
            members = df_img.loc[pd.to_numeric(df_img[role], errors="coerce").fillna(0).astype(int) == 1, "region_index"]
            members = members.astype(int).tolist()
            for a_i in range(len(members)):
                for b_i in range(a_i + 1, len(members)):
                    a, b = members[a_i], members[b_i]
                    if not G.has_edge(a, b):
                        G.add_edge(a, b, rel_rolelink=1, role=str(role))
                    else:
                        G.edges[a, b]["rel_rolelink"] = 1
                        G.edges[a, b]["role"] = str(role)

    # ---- export tables
    nodes_out = pd.DataFrame([{"node": n, **G.nodes[n]} for n in G.nodes])
    edge_rows = []
    for u, v, a in G.edges(data=True):
        row = {"u": u, "v": v}
        row.update(a)
        edge_rows.append(row)
    edges_out = pd.DataFrame(edge_rows)

    return G, nodes_out, edges_out, feat_cols

### Single-Image Runner

In [5]:
def build_and_save_one_form_graph(
    df_regions: pd.DataFrame,
    split: str,
    image_id: str,
    out_root: str = OUT_FORM_DIR,
    repetition_threshold: float = 0.85,
    max_rep_edges_per_node: int = 3,
    add_role_links: bool = True,
):
    gdf = df_regions[(df_regions["split"] == split) & (df_regions["image_id"] == image_id)].copy()
    if len(gdf) == 0:
        raise ValueError(f"No rows found for split={split}, image_id={image_id}")

    G, nodes_df, edges_df, used_feat_cols = build_form_graph_for_image_csv_only(
        split, image_id, gdf,
        repetition_threshold=repetition_threshold,
        max_rep_edges_per_node=max_rep_edges_per_node,
        add_role_links=add_role_links
    )

    # safer folder names
    od = os.path.join(out_root, sanitize_path_component(split), sanitize_path_component(image_id))
    ensure_dir(od)

    with open(os.path.join(od, "form_graph.pkl"), "wb") as f:
        pickle.dump(G, f, protocol=pickle.HIGHEST_PROTOCOL)

    nodes_df.to_csv(os.path.join(od, "nodes.csv"), index=False)
    edges_df.to_csv(os.path.join(od, "edges.csv"), index=False)

    meta = {
        "split": split,
        "image_id": image_id,
        "n_nodes": int(G.number_of_nodes()),
        "n_edges": int(G.number_of_edges()),
        "feature_cols_used": ",".join(used_feat_cols),
        "repetition_threshold": float(repetition_threshold),
        "max_rep_edges_per_node": int(max_rep_edges_per_node),
        "out_dir": od
    }
    return G, nodes_df, edges_df, meta

### Run for all batch

In [6]:
all_nodes = []
all_edges = []
meta_rows = []
graphs_built = 0

for (split, image_id), gdf in df_regions.groupby(["split", "image_id"], sort=False):

    G, nodes_df, edges_df, used_feat_cols = build_form_graph_for_image_csv_only(
        split, image_id, gdf,
        repetition_threshold=0.85,
        max_rep_edges_per_node=3,
        add_role_links=True
    )

    od = os.path.join(OUT_FORM_DIR, sanitize_path_component(split), sanitize_path_component(image_id))
    ensure_dir(od)

    with open(os.path.join(od, "form_graph.pkl"), "wb") as f:
        pickle.dump(G, f, protocol=pickle.HIGHEST_PROTOCOL)

    nodes_df.to_csv(os.path.join(od, "nodes.csv"), index=False)
    edges_df.to_csv(os.path.join(od, "edges.csv"), index=False)

    # collect
    nodes_df2 = nodes_df.copy()
    edges_df2 = edges_df.copy()
    nodes_df2["split"] = split
    nodes_df2["image_id"] = image_id
    edges_df2["split"] = split
    edges_df2["image_id"] = image_id

    all_nodes.append(nodes_df2)
    all_edges.append(edges_df2)

    meta_rows.append({
        "split": split,
        "image_id": image_id,
        "n_nodes": int(G.number_of_nodes()),
        "n_edges": int(G.number_of_edges()),
        "feature_cols_used": ",".join(used_feat_cols),
        "repetition_threshold": 0.85,
        "max_rep_edges_per_node": 3
    })

    graphs_built += 1

df_form_nodes = pd.concat(all_nodes, ignore_index=True) if all_nodes else pd.DataFrame()
df_form_edges = pd.concat(all_edges, ignore_index=True) if all_edges else pd.DataFrame()
df_form_meta  = pd.DataFrame(meta_rows)

df_form_nodes.to_csv(os.path.join(OUT_FORM_DIR, "form_nodes_all.csv"), index=False)
df_form_edges.to_csv(os.path.join(OUT_FORM_DIR, "form_edges_all.csv"), index=False)
df_form_meta.to_csv(os.path.join(OUT_FORM_DIR, "form_graph_meta.csv"), index=False)

print("graphs_built:", graphs_built)
print("nodes_all:", df_form_nodes.shape)
print("edges_all:", df_form_edges.shape)
display(df_form_meta.head())

  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  col_meds = np.nanmedian(X, axis=0)
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dty

graphs_built: 128
nodes_all: (366, 28)
edges_all: (417, 8)


Unnamed: 0,split,image_id,n_nodes,n_edges,feature_cols_used,repetition_threshold,max_rep_edges_per_node
0,real,real\images\10pm_feeding_around_the_clock.webp,19,108,"area_ratio,skel_mass_px,pca_eig_ratio,symmetry...",0.85,3
1,real,real\images\11pm.webp,1,0,"area_ratio,skel_mass_px,pca_eig_ratio,symmetry...",0.85,3
2,real,real\images\12_4_7_10.webp,1,0,"area_ratio,skel_mass_px,pca_eig_ratio,symmetry...",0.85,3
3,real,real\images\Belly_breast.webp,7,9,"area_ratio,skel_mass_px,pca_eig_ratio,symmetry...",0.85,3
4,real,real\images\Belly_breast_face_brain_placenta.webp,1,0,"area_ratio,skel_mass_px,pca_eig_ratio,symmetry...",0.85,3


### Orientation & frames of reference

In [7]:
import os
import numpy as np
import pandas as pd

OUT_FORM1_DIR = r"outputs\form\form1_orientation"
os.makedirs(OUT_FORM1_DIR, exist_ok=True)

### Helpers

In [8]:
def _as_float_series(s):
    """For pandas Series / array-like columns."""
    return pd.to_numeric(s, errors="coerce").astype(float)

def _as_float_scalar(x, default=np.nan):
    """For single values."""
    try:
        v = pd.to_numeric(x, errors="coerce")
        if v is None or (isinstance(v, float) and np.isnan(v)):
            return default
        return float(v)
    except Exception:
        return default

def _strength_score(row):
    """
    Axis strength proxy in [0,1].
    row is a pandas Series from iterrows()
    """
    eig = _as_float_scalar(row.get("pca_eig_ratio", np.nan))
    v = np.log1p(max(eig, 0.0))
    return float(np.clip(v / 4.0, 0.0, 1.0))

def _wrap180(angle_deg):
    """Map degrees to [0, 180)."""
    a = np.asarray(angle_deg, dtype=float)
    return np.mod(a, 180.0)

def _axis_angle_from_uv(dx, dy):
    """
    Convert axis unit vector (dx,dy) to directionless axis angle in degrees [0,180).
    dx,dy in image x,y coords.
    """
    ang = np.degrees(np.arctan2(dy, dx))
    return _wrap180(ang)

def _dist_to_axis(angle_deg, axis_deg):
    """
    Smallest absolute angular distance between two directionless axes (0..90).
    Both in [0,180).
    """
    a = np.asarray(angle_deg, dtype=float)
    b = np.asarray(axis_deg, dtype=float)
    d = np.abs(a - b)
    d = np.minimum(d, 180.0 - d)
    # directionless axis => treat 0 and 180 same already; this yields 0..90
    return d

def _min_dist_to_hv(angle_deg):
    """Distance to nearest horizontal(0) or vertical(90) in degrees (0..45)."""
    a = np.asarray(angle_deg, dtype=float)
    d0 = _dist_to_axis(a, 0.0)
    d90 = _dist_to_axis(a, 90.0)
    return np.minimum(d0, d90)

def _global_align_score(angle_deg):
    """
    1.0 = perfectly horizontal/vertical
    0.0 = maximally diagonal (45 deg away)
    """
    d = _min_dist_to_hv(angle_deg)  # 0..45
    return 1.0 - (d / 45.0)

def _pick_parent(df_img):
    """
    Choose a 'parent' / dominant frame region.
    Preference: is_whole==1, else highest saliency, else largest area_ratio.
    Returns region_index or None.
    """
    if "is_whole" in df_img.columns and (df_img["is_whole"].astype(int) == 1).any():
        return int(df_img.loc[df_img["is_whole"].astype(int) == 1, "region_index"].iloc[0])

    if "saliency" in df_img.columns and np.isfinite(_as_float_series(df_img["saliency"])).any():
        j = int(_as_float_series(df_img["saliency"]).fillna(-1).idxmax())
        return int(df_img.loc[j, "region_index"])

    if "area_ratio" in df_img.columns and np.isfinite(_as_float_series(df_img["area_ratio"])).any():
        j = int(_as_float_series(df_img["area_ratio"]).fillna(-1).idxmax())
        return int(df_img.loc[j, "region_index"])

    return None

In [9]:
if "pca_angle_deg" not in df_regions.columns:
    raise KeyError(
        "df_regions is missing 'pca_angle_deg'. "
        "Add it during Shape (Step4 PCA) export, then rerun Form-1."
    )

# Ensure numeric
df_regions = df_regions.copy()
df_regions["pca_angle_deg"] = _as_float_series(df_regions["pca_angle_deg"])
if "pca_eig_ratio" in df_regions.columns:
    df_regions["pca_eig_ratio"] = _as_float_series(df_regions["pca_eig_ratio"])
if "saliency" in df_regions.columns: df_regions["saliency"] = _as_float_series(df_regions["saliency"])
if "area_ratio" in df_regions.columns: df_regions["area_ratio"] = _as_float_series(df_regions["area_ratio"])
if "pca_eig_ratio" in df_regions.columns: df_regions["pca_eig_ratio"] = _as_float_series(df_regions["pca_eig_ratio"])

### Compute Per Image

In [10]:
rows_region = []
rows_image  = []

for (split, image_id), g in df_regions.groupby(["split", "image_id"], sort=False):
    df_img = g.copy().reset_index(drop=True)

    # (1) Global frame alignment per region
    ang = df_img["pca_angle_deg"].to_numpy(dtype=float)
    global_align = _global_align_score(ang)  # [0,1]
    dist_hv_deg  = _min_dist_to_hv(ang)      # [0,45]

    # choose parent frame
    parent_rid = _pick_parent(df_img)
    if parent_rid is not None and (df_img["region_index"].astype(int) == parent_rid).any():
        parent_angle = float(df_img.loc[df_img["region_index"].astype(int) == parent_rid, "pca_angle_deg"].iloc[0])
    else:
        parent_angle = np.nan

    # (2) Local frame influence: child alignment to parent vs global H/V
    # child-parent distance (0..90), child-global distance (0..45)
    child_parent_dist = _dist_to_axis(ang, parent_angle) if np.isfinite(parent_angle) else np.full(len(df_img), np.nan)
    child_global_dist = dist_hv_deg

    # influence score:
    # positive if child is closer to parent frame than to global H/V,
    # normalized to [-1, +1]
    #   +1: much closer to parent than to global
    #    0: equal
    #   -1: closer to global than parent
    denom = np.maximum(child_parent_dist + child_global_dist, 1e-9)
    local_influence = (child_global_dist - child_parent_dist) / denom
    local_influence = np.clip(local_influence, -1.0, 1.0)

    # (3) Orientation conflict index ("crossfire"):
    # count strong axes that disagree with the dominant frame and with each other
    # We'll define strong by axis strength score >= 0.35 (tunable)
    strength = np.array([_strength_score(r) for _, r in df_img.iterrows()], dtype=float)
    strong_mask = strength >= 0.35

    strong_angles = ang[strong_mask]
    # pairwise disagreement: count pairs with angular distance >= 25 deg
    conflict_pairs = 0
    total_pairs = 0
    if len(strong_angles) >= 2:
        for i in range(len(strong_angles)):
            for j in range(i+1, len(strong_angles)):
                d = _dist_to_axis(strong_angles[i], strong_angles[j])  # 0..90
                total_pairs += 1
                if d >= 25.0:
                    conflict_pairs += 1
    conflict_index = (conflict_pairs / total_pairs) if total_pairs > 0 else 0.0

    # also "dominant frame disagreement": fraction of strong regions far from parent (>25deg)
    if np.isfinite(parent_angle) and len(strong_angles) > 0:
        d_parent = _dist_to_axis(strong_angles, parent_angle)
        frac_disagree_parent = float(np.mean(d_parent >= 25.0))
    else:
        frac_disagree_parent = np.nan

    # ---- save per-region
    for i, r in df_img.iterrows():
        rid = int(r["region_index"])
        rows_region.append({
            "split": split,
            "image_id": image_id,
            "region_index": rid,

            "pca_angle_deg": float(r["pca_angle_deg"]),
            "dist_to_hv_deg": float(dist_hv_deg[i]),
            "global_frame_align_01": float(global_align[i]),

            "parent_region_index": parent_rid,
            "parent_angle_deg": (float(parent_angle) if np.isfinite(parent_angle) else np.nan),
            "child_parent_dist_deg": (float(child_parent_dist[i]) if np.isfinite(child_parent_dist[i]) else np.nan),
            "child_global_dist_deg": float(child_global_dist[i]),
            "local_frame_influence_m11": (float(local_influence[i]) if np.isfinite(local_influence[i]) else np.nan),

            "axis_strength_01": float(strength[i]),
        })

    # ---- per-image summary (weighted by saliency if available, else area_ratio)
    if "saliency" in df_img.columns and np.isfinite(df_img["saliency"].to_numpy(dtype=float)).any():
        w = df_img["saliency"].to_numpy(dtype=float)
    else:
        w = df_img["area_ratio"].to_numpy(dtype=float) if "area_ratio" in df_img.columns else np.ones(len(df_img), dtype=float)
    w = np.where(np.isfinite(w) & (w > 0), w, 0.0)
    wsum = float(w.sum() + 1e-12)

    # weighted global alignment
    img_global_align = float(np.nansum(global_align * w) / wsum)

    # weighted magnitude of local influence (how strong local frame effects are)
    if np.isfinite(local_influence).any():
        img_local_influence_abs = float(np.nansum(np.abs(local_influence) * w) / wsum)
        img_local_influence_signed = float(np.nansum(local_influence * w) / wsum)
    else:
        img_local_influence_abs = np.nan
        img_local_influence_signed = np.nan

    rows_image.append({
        "split": split,
        "image_id": image_id,
        "parent_region_index": parent_rid,
        "parent_angle_deg": (float(parent_angle) if np.isfinite(parent_angle) else np.nan),

        "form1_global_frame_alignment_01": img_global_align,
        "form1_local_frame_influence_abs_01": img_local_influence_abs,     # 0..1
        "form1_local_frame_influence_signed": img_local_influence_signed,  # -1..+1

        "form1_orientation_conflict_index_01": float(conflict_index),      # 0..1
        "form1_frac_strong_disagree_parent_01": frac_disagree_parent,

        "n_regions": int(len(df_img)),
        "n_strong_axes": int(np.sum(strong_mask)),
        "conflict_pairs": int(conflict_pairs),
        "total_pairs": int(total_pairs),
    })

In [11]:
df_form1_regions = pd.DataFrame(rows_region)
df_form1_images  = pd.DataFrame(rows_image)

df_form1_regions.to_csv(os.path.join(OUT_FORM1_DIR, "form1_region_orientation.csv"), index=False)
df_form1_images.to_csv(os.path.join(OUT_FORM1_DIR, "form1_image_orientation.csv"), index=False)

print("Saved:")
print(" -", os.path.join(OUT_FORM1_DIR, "form1_region_orientation.csv"), df_form1_regions.shape)
print(" -", os.path.join(OUT_FORM1_DIR, "form1_image_orientation.csv"),  df_form1_images.shape)

display(df_form1_images.head())

Saved:
 - outputs\form\form1_orientation\form1_region_orientation.csv (366, 12)
 - outputs\form\form1_orientation\form1_image_orientation.csv (128, 13)


Unnamed: 0,split,image_id,parent_region_index,parent_angle_deg,form1_global_frame_alignment_01,form1_local_frame_influence_abs_01,form1_local_frame_influence_signed,form1_orientation_conflict_index_01,form1_frac_strong_disagree_parent_01,n_regions,n_strong_axes,conflict_pairs,total_pairs
0,real,real\images\10pm_feeding_around_the_clock.webp,18,57.090095,0.771307,0.76257,-0.616509,0.043956,1.0,19,14,4,91
1,real,real\images\11pm.webp,0,99.685522,0.784766,1.0,1.0,0.0,,1,0,0,0
2,real,real\images\12_4_7_10.webp,0,85.450598,0.898902,1.0,1.0,0.0,,1,0,0,0
3,real,real\images\Belly_breast.webp,6,96.212515,0.66209,0.439827,0.009007,0.6,0.333333,7,6,9,15
4,real,real\images\Belly_breast_face_brain_placenta.webp,0,0.372193,0.991729,1.0,1.0,0.0,,1,0,0,0


### Projection / foreshortening / “deviation from the simple”

In [12]:
def _as_float_series(s):
    return pd.to_numeric(s, errors="coerce").astype(float)

def _wrap180(a):
    a = np.asarray(a, dtype=float)
    return np.mod(a, 180.0)

def _dist_to_axis(a_deg, b_deg):
    a = np.asarray(a_deg, dtype=float)
    b = np.asarray(b_deg, dtype=float)
    d = np.abs(a - b)
    d = np.minimum(d, 180.0 - d)
    return d  # 0..90

def _min_dist_to_hv(a_deg):
    a = np.asarray(a_deg, dtype=float)
    d0  = _dist_to_axis(a, 0.0)
    d90 = _dist_to_axis(a, 90.0)
    return np.minimum(d0, d90)  # 0..45

def _strength_01_from_eigratio(eig_ratio):
    # log1p saturation to [0,1]
    eig = np.asarray(eig_ratio, dtype=float)
    v = np.log1p(np.maximum(eig, 0.0))
    return np.clip(v / 4.0, 0.0, 1.0)

def _weighted_mean(x, w):
    x = np.asarray(x, dtype=float)
    w = np.asarray(w, dtype=float)
    m = np.isfinite(x) & np.isfinite(w) & (w > 0)
    if m.sum() == 0:
        return np.nan
    return float(np.sum(x[m] * w[m]) / (np.sum(w[m]) + 1e-12))

def _norm01_minmax(x):
    x = np.asarray(x, dtype=float)
    m = np.isfinite(x)
    if m.sum() == 0:
        return x * np.nan
    mn, mx = np.nanmin(x[m]), np.nanmax(x[m])
    if not np.isfinite(mn) or not np.isfinite(mx) or (mx - mn) < 1e-12:
        return np.zeros_like(x, dtype=float)
    return (x - mn) / (mx - mn)

### Affine Distortion Score

In [13]:
def form2_affine_distortion_region(df_img):
    eig = _as_float_series(df_img.get("pca_eig_ratio", np.nan))
    sym = _as_float_series(df_img.get("symmetry_pca", np.nan))

    # anisotropy: 0 if eig_ratio==1, grows as eig_ratio increases
    # log compress then map to [0,1] with soft saturation
    aniso = np.log1p(np.maximum(eig, 1e-9))     # >=0
    aniso01 = np.clip(aniso / 4.0, 0.0, 1.0)

    # symmetry loss: 0 if sym==1, 1 if sym==0
    sym_loss = np.clip(1.0 - sym, 0.0, 1.0)

    # combine (tunable weights)
    # higher = more "projection-like distortion"
    distortion = 0.6 * aniso01 + 0.4 * sym_loss
    return distortion.to_numpy(dtype=float)

#### Symmetry-axis Degradation under tilt / weak-stimulus sweep (per-region)

In [14]:
def form2_symmetry_degradation_region(df_img,
                                      sym_strong_thr=0.65,
                                      use_tilt=True,
                                      use_weak_stim=True):
    sym = _as_float_series(df_img.get("symmetry_pca", np.nan)).to_numpy(dtype=float)

    # Only measure degradation if symmetry is strong enough; else set NaN (not applicable)
    applicable = np.isfinite(sym) & (sym >= float(sym_strong_thr))

    deg_terms = []

    if use_tilt and ("pca_angle_deg" in df_img.columns):
        ang = _as_float_series(df_img["pca_angle_deg"]).to_numpy(dtype=float)
        tilt01 = np.clip(_min_dist_to_hv(ang) / 45.0, 0.0, 1.0)  # 0..1
        deg_terms.append(tilt01)

    if use_weak_stim and ("stability_mask_range" in df_img.columns):
        rngv = _as_float_series(df_img["stability_mask_range"]).to_numpy(dtype=float)
        # normalize per-image to keep scale stable
        rng01 = _norm01_minmax(rngv)
        deg_terms.append(np.clip(rng01, 0.0, 1.0))

    if use_weak_stim and ("stability_mask_mean" in df_img.columns):
        meanv = _as_float_series(df_img["stability_mask_mean"]).to_numpy(dtype=float)
        instab01 = np.clip(1.0 - meanv, 0.0, 1.0)  # lower stability => higher degradation
        deg_terms.append(instab01)

    if len(deg_terms) == 0:
        out = np.full(len(df_img), np.nan, dtype=float)
        return out

    # average the available degradation terms
    deg = np.nanmean(np.stack(deg_terms, axis=0), axis=0)

    # gate by applicability: non-strong symmetry => NaN
    out = np.full(len(df_img), np.nan, dtype=float)
    out[applicable] = np.clip(deg[applicable], 0.0, 1.0)
    return out

### Viewpoint ambiguity proxy (per-image)

In [15]:
def form2_viewpoint_ambiguity_image(df_img,
                                   strong_thr=0.35,
                                   use_angles=True):
    eig = _as_float_series(df_img.get("pca_eig_ratio", np.nan)).to_numpy(dtype=float)
    strength = _strength_01_from_eigratio(eig)  # 0..1
    strong = np.isfinite(strength) & (strength >= float(strong_thr))

    s = strength[strong]
    if len(s) < 2:
        return 0.0, int(np.sum(strong)), np.nan  # ambiguity, n_strong, angle_spread

    # similar-strength competition: if top1 ~= top2 => high ambiguity
    s_sorted = np.sort(s)[::-1]
    top1, top2 = float(s_sorted[0]), float(s_sorted[1])
    similar01 = np.clip(1.0 - ((top1 - top2) / (top1 + 1e-9)), 0.0, 1.0)

    angle_spread01 = np.nan
    if use_angles and ("pca_angle_deg" in df_img.columns):
        ang = _as_float_series(df_img["pca_angle_deg"]).to_numpy(dtype=float)[strong]
        # mean pairwise distance normalized by 90
        if len(ang) >= 2:
            dsum = 0.0
            cnt = 0
            for i in range(len(ang)):
                for j in range(i+1, len(ang)):
                    dsum += float(_dist_to_axis(ang[i], ang[j]))
                    cnt += 1
            mean_d = (dsum / cnt) if cnt > 0 else 0.0
            angle_spread01 = np.clip(mean_d / 90.0, 0.0, 1.0)

    # ambiguity combines "similar strengths" + (optional) angular diversity
    if np.isfinite(angle_spread01):
        amb = float(0.6 * similar01 + 0.4 * angle_spread01)
    else:
        amb = float(similar01)

    return amb, int(np.sum(strong)), angle_spread01

### Run

In [16]:
rows_region = []
rows_image  = []

for (split, image_id), g in df_regions.groupby(["split", "image_id"], sort=False):
    df_img = g.copy().reset_index(drop=True)

    # weights for aggregations
    if "saliency" in df_img.columns and np.isfinite(_as_float_series(df_img["saliency"]).to_numpy()).any():
        w = _as_float_series(df_img["saliency"]).to_numpy(dtype=float)
    elif "area_ratio" in df_img.columns and np.isfinite(_as_float_series(df_img["area_ratio"]).to_numpy()).any():
        w = _as_float_series(df_img["area_ratio"]).to_numpy(dtype=float)
    else:
        w = np.ones(len(df_img), dtype=float)
    w = np.where(np.isfinite(w) & (w > 0), w, 0.0)

    # (1) Affine distortion (per-region + per-image index)
    aff_dist = form2_affine_distortion_region(df_img)              # 0..1
    img_aff_dist = _weighted_mean(aff_dist, w)                     # 0..1

    # (2) Symmetry degradation (per-region + per-image index)
    sym_deg = form2_symmetry_degradation_region(df_img)            # 0..1 for applicable regions, else NaN
    # aggregate: weighted mean over applicable
    img_sym_deg = _weighted_mean(sym_deg, w)                       # 0..1 or NaN if none applicable

    # (3) Viewpoint ambiguity (per-image index)
    img_amb, n_strong, angle_spread01 = form2_viewpoint_ambiguity_image(df_img)

    # save per-region rows
    for i, r in df_img.iterrows():
        rows_region.append({
            "split": split,
            "image_id": image_id,
            "region_index": int(r["region_index"]),

            # helpful carry-through
            "pca_eig_ratio": float(pd.to_numeric(r.get("pca_eig_ratio", np.nan), errors="coerce")),
            "symmetry_pca": float(pd.to_numeric(r.get("symmetry_pca", np.nan), errors="coerce")),

            # FORM-2 region measures
            "form2_affine_distortion_01": float(aff_dist[i]) if np.isfinite(aff_dist[i]) else np.nan,
            "form2_symmetry_degradation_01": float(sym_deg[i]) if np.isfinite(sym_deg[i]) else np.nan,
        })

    # save per-image indices
    rows_image.append({
        "split": split,
        "image_id": image_id,

        # FORM-2 indices (report these)
        "form2_affine_distortion_index_01": float(img_aff_dist) if np.isfinite(img_aff_dist) else np.nan,
        "form2_symmetry_degradation_index_01": float(img_sym_deg) if np.isfinite(img_sym_deg) else np.nan,
        "form2_viewpoint_ambiguity_index_01": float(img_amb),

        # diagnostics
        "n_regions": int(len(df_img)),
        "n_strong_axes": int(n_strong),
        "form2_angle_spread_01": float(angle_spread01) if np.isfinite(angle_spread01) else np.nan,
    })

df_form2_region = pd.DataFrame(rows_region)
df_form2_image  = pd.DataFrame(rows_image)

# Example save paths (edit as needed)
df_form2_region.to_csv(r"outputs/form2_region.csv", index=False)
df_form2_image.to_csv(r"outputs/form2_image.csv", index=False)

print("Form-2 region table:", df_form2_region.shape)
print("Form-2 image table :", df_form2_image.shape)
df_form2_image.head(10)

Form-2 region table: (366, 7)
Form-2 image table : (128, 8)


Unnamed: 0,split,image_id,form2_affine_distortion_index_01,form2_symmetry_degradation_index_01,form2_viewpoint_ambiguity_index_01,n_regions,n_strong_axes,form2_angle_spread_01
0,real,real\images\10pm_feeding_around_the_clock.webp,0.529536,0.177887,0.601091,19,14,0.119052
1,real,real\images\11pm.webp,0.458337,,0.0,1,0,
2,real,real\images\12_4_7_10.webp,0.132365,0.035642,0.0,1,0,
3,real,real\images\Belly_breast.webp,0.664815,,0.70792,7,6,0.422198
4,real,real\images\Belly_breast_face_brain_placenta.webp,0.120302,0.008271,0.0,1,0,
5,real,real\images\Bellyscape_and_plumb_bob.webp,0.149785,0.004707,0.0,1,0,
6,real,real\images\Birth_perspective_from_above_and_b...,0.166507,0.258381,0.0,1,0,
7,real,real\images\Blue_0_with_red_halo.webp,0.153609,0.002657,0.0,1,0,
8,real,real\images\Blue_Nip.webp,0.578118,,0.0,2,1,
9,real,real\images\Blue_eye.webp,0.445482,,0.0,1,0,


### Overlapping & depth-order cues

In [17]:
REGION_CSV = r"F:\New Dissertation - Image Generation\POC\outputs\shape_region_features.csv"
MASK_ROOT  = Path(r"F:\New Dissertation - Image Generation\POC\outputs\form3_masks")  # must match export path
OUT_DIR    = Path(r"outputs\form\form3")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# tuning
MIN_OVERLAP_IOU = 0.002       # very small overlap is enough to flag
DILATE_OVERLAP  = 1           # pixels: allow near-overlap
TJ_WIN          = 7           # local window for polarity
TJ_MIN_SUPPORT  = 3           # min pixels supporting bar direction
MAX_TJ_PER_PAIR = 200         # cap to avoid blowups on noisy images

### Helpers

In [18]:
import cv2


def sanitize_path(s: str) -> str:
    s = s.replace("\\", "/")
    return s.replace(":", "").replace("/", "__")

def load_masks(split, image_id):
    p = MASK_ROOT / split / sanitize_path(image_id) / "regions.npz"
    if not p.exists():
        return None
    z = np.load(str(p), allow_pickle=True)
    masks = z["masks"].astype(np.uint8)  # (n,H,W)
    return masks

def boundary_from_mask(m01):
    m = (m01 > 0).astype(np.uint8) * 255
    k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3))
    er = cv2.erode(m, k, iterations=1)
    b = ((m > 0) & (er == 0)).astype(np.uint8)
    return b

def edge_junctions_from_edges(Eb01):
    """
    Approximate edge-junction pixels from binary edge map using 8-neighbor degree.
    T-junction-like points tend to have degree>=3.
    """
    e = (Eb01 > 0).astype(np.uint8)
    # count neighbors via convolution
    kernel = np.array([[1,1,1],[1,0,1],[1,1,1]], dtype=np.uint8)
    deg = cv2.filter2D(e, -1, kernel, borderType=cv2.BORDER_CONSTANT)
    # junction candidates: edge pixel with >=3 neighbors
    j = ((e > 0) & (deg >= 3)).astype(np.uint8)
    return j

def overlap_iou(a, b):
    a = (a > 0); b = (b > 0)
    inter = np.logical_and(a,b).sum()
    union = np.logical_or(a,b).sum()
    return float(inter) / float(union + 1e-12)

def dilate01(m01, k=1):
    if k <= 0:
        return (m01 > 0).astype(np.uint8)
    ker = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2*k+1, 2*k+1))
    return (cv2.dilate((m01>0).astype(np.uint8), ker, iterations=1) > 0).astype(np.uint8)

In [19]:
def local_polarity_vote(j_pt_rc, bA, bB, Eb, win=7):
    """
    Decide occluder at a junction point using a simple polarity heuristic:
    - Look in a window around the junction
    - Count edge continuity on each region boundary
    - The region whose boundary is more continuous through the junction is treated as 'occluder' (bar),
      the one that terminates is treated as 'occluded' (stem).
    Returns:
      occluder (0/1) where 0->A occludes B, 1->B occludes A, or None if ambiguous.
    """
    r, c = j_pt_rc
    H, W = Eb.shape
    half = win//2
    r0 = max(0, r-half); r1 = min(H, r+half+1)
    c0 = max(0, c-half); c1 = min(W, c+half+1)

    # local edge support on boundaries
    Eb_loc = Eb[r0:r1, c0:c1] > 0
    a_loc  = (bA[r0:r1, c0:c1] > 0) & Eb_loc
    b_loc  = (bB[r0:r1, c0:c1] > 0) & Eb_loc

    sa = int(a_loc.sum())
    sb = int(b_loc.sum())

    # if both tiny -> can't decide
    if sa < TJ_MIN_SUPPORT and sb < TJ_MIN_SUPPORT:
        return None

    # if one much larger -> that one likely continues (bar)
    if sa >= sb + 2:
        return 0  # A occludes B
    if sb >= sa + 2:
        return 1  # B occludes A

    return None

In [20]:
def build_depth_graph_for_image(split, image_id, gdf, masks):
    """
    Produces:
      per_region rows (dominance stats)
      per_pair edges (occlusion edges + overlap edges)
      per_image summary row
    """
    n = masks.shape[0]
    H, W = masks.shape[1], masks.shape[2]

    # optional: if you still have Eb saved somewhere, great.
    # If not, we approximate an edge map from region boundaries (works OK for occlusion evidence).
    boundaries = [boundary_from_mask(masks[i]) for i in range(n)]
    Eb = np.zeros((H,W), np.uint8)
    for b in boundaries:
        Eb = np.maximum(Eb, b)
    junctions = edge_junctions_from_edges(Eb)

    # overlap candidates
    rows_pair = []
    occl_counts = np.zeros(n, dtype=int)
    occd_counts = np.zeros(n, dtype=int)

    # region strength weights (for dominance weighting if needed)
    if "saliency" in gdf.columns and np.isfinite(pd.to_numeric(gdf["saliency"], errors="coerce")).any():
        w = pd.to_numeric(gdf["saliency"], errors="coerce").to_numpy(dtype=float)
    else:
        w = pd.to_numeric(gdf.get("area_ratio", pd.Series(np.ones(n))), errors="coerce").to_numpy(dtype=float)
    w = np.where(np.isfinite(w) & (w>0), w, 0.0)

    # Pairwise overlap + occlusion
    for i in range(n):
        Ai = masks[i]
        bA = boundaries[i]
        for j in range(i+1, n):
            Bj = masks[j]
            bB = boundaries[j]

            # ---- (1) overlap detection ----
            Ai2 = dilate01(Ai, DILATE_OVERLAP)
            Bj2 = dilate01(Bj, DILATE_OVERLAP)
            iou = overlap_iou(Ai2, Bj2)

            is_overlap = (iou >= MIN_OVERLAP_IOU)

            # ---- (2) occlusion evidence: T-junctions near both boundaries ----
            # candidate junction pixels close to both boundaries
            near = (junctions > 0) & ((cv2.dilate(bA, np.ones((3,3),np.uint8))>0) & (cv2.dilate(bB, np.ones((3,3),np.uint8))>0))
            pts = np.argwhere(near)

            # polarity voting
            votes_A_over_B = 0
            votes_B_over_A = 0
            used = 0

            if len(pts) > 0:
                # cap for safety
                if len(pts) > MAX_TJ_PER_PAIR:
                    pts = pts[np.random.choice(len(pts), MAX_TJ_PER_PAIR, replace=False)]
                for (r,c) in pts:
                    v = local_polarity_vote((int(r),int(c)), bA, bB, Eb, win=TJ_WIN)
                    if v == 0: votes_A_over_B += 1
                    elif v == 1: votes_B_over_A += 1
                    used += 1

            # infer direction if enough imbalance
            occl_dir = None
            if (votes_A_over_B + votes_B_over_A) >= 3:
                if votes_A_over_B >= votes_B_over_A + 2:
                    occl_dir = (i, j)  # i occludes j
                elif votes_B_over_A >= votes_A_over_B + 2:
                    occl_dir = (j, i)  # j occludes i

            if occl_dir is not None:
                occl_counts[occl_dir[0]] += 1
                occd_counts[occl_dir[1]] += 1

            rows_pair.append({
                "split": split,
                "image_id": image_id,
                "i": i, "j": j,
                "overlap_iou": float(iou),
                "is_overlap": int(is_overlap),

                "tj_points": int(len(pts)),
                "votes_i_over_j": int(votes_A_over_B),
                "votes_j_over_i": int(votes_B_over_A),

                "occluder": int(occl_dir[0]) if occl_dir else np.nan,
                "occluded": int(occl_dir[1]) if occl_dir else np.nan,
                "has_occlusion": int(occl_dir is not None),
            })

    # ---- (3) depth-order graph ----
    G = nx.DiGraph()
    for rid in range(n):
        G.add_node(rid, weight=float(w[rid]) if rid < len(w) else 1.0)

    for row in rows_pair:
        if row["has_occlusion"] == 1:
            a = int(row["occluder"]); b = int(row["occluded"])
            if G.has_edge(a,b):
                G[a][b]["support"] += 1
            else:
                G.add_edge(a,b, support=1)

    # ---- (4) dominance / subservience per region ----
    rows_region = []
    for rid in range(n):
        oc = int(occl_counts[rid])
        od = int(occd_counts[rid])
        denom = oc + od
        dom = float(oc / denom) if denom > 0 else np.nan  # undefined if no evidence
        rows_region.append({
            "split": split,
            "image_id": image_id,
            "region_index": int(rid),
            "occludes_count": oc,
            "occluded_count": od,
            "dominance_index_01": dom,  # 0..1
        })

    # image summary
    total_occl = int(np.sum(occl_counts))
    denom_all = int(np.sum(occl_counts + occd_counts))
    img_dom = float(np.sum(occl_counts) / denom_all) if denom_all > 0 else np.nan

    rows_image = [{
        "split": split,
        "image_id": image_id,
        "n_regions": int(n),
        "overlap_pairs": int(sum(1 for r in rows_pair if r["is_overlap"]==1)),
        "occlusion_edges": int(G.number_of_edges()),
        "total_occlusion_votes": int(total_occl),
        "form3_dominance_subservience_index_01": img_dom,
    }]

    return rows_region, rows_pair, rows_image, G

# ----------------------------
# RUN ALL IMAGES
# ----------------------------
df_regions = pd.read_csv(REGION_CSV)

all_region = []
all_pair   = []
all_image  = []

graphs = {}  # optional: keep in memory

for (split, image_id), gdf in df_regions.groupby(["split","image_id"], sort=False):
    masks = load_masks(split, image_id)
    if masks is None:
        print("Missing masks for:", split, image_id)
        continue

    r_rows, p_rows, i_rows, G = build_depth_graph_for_image(split, image_id, gdf, masks)

    all_region.extend(r_rows)
    all_pair.extend(p_rows)
    all_image.extend(i_rows)
    graphs[(split, image_id)] = G

In [21]:
df_form3_region = pd.DataFrame(all_region)
df_form3_pair   = pd.DataFrame(all_pair)
df_form3_image  = pd.DataFrame(all_image)

df_form3_region.to_csv(OUT_DIR / "form3_region.csv", index=False)
df_form3_pair.to_csv(OUT_DIR / "form3_pairs.csv", index=False)
df_form3_image.to_csv(OUT_DIR / "form3_image.csv", index=False)

print("Saved:")
print(" ", OUT_DIR / "form3_region.csv", df_form3_region.shape)
print(" ", OUT_DIR / "form3_pairs.csv", df_form3_pair.shape)
print(" ", OUT_DIR / "form3_image.csv", df_form3_image.shape)

Saved:
  outputs\form\form3\form3_region.csv (366, 6)
  outputs\form\form3\form3_pairs.csv (769, 12)
  outputs\form\form3\form3_image.csv (128, 7)


### Completion & Continuity under Disruption

In [22]:
DATA_ROOT = Path("data")  # expects data/real and data/generated
OUT_DIR = Path("outputs/form4")
OUT_DIR.mkdir(parents=True, exist_ok=True)

SIGMAS = (1, 2, 4)
EDGE_PERCENTILE = 85.0
MIN_AREA_RATIO = 0.00005
CLOSE_K_RATIO = 0.004
CLOSE_ITERS = 1

# Linking hyperparams (tunable)
MAX_LINK_DIST_FRAC = 0.08   # max endpoint distance as fraction of image diagonal
NBR_WIN = 9                 # local neighborhood for tangent estimate
COLLIN_W = 0.55
DIST_W   = 0.25
EDGE_W   = 0.20

# For plausibility
TOP_LINKS_PER_IMAGE = 12    # evaluate strongest K candidate links
PLAUS_ENDPT_GAIN_W  = 0.7
PLAUS_TURN_W        = 0.3

# For amputation risk
CURV_WIN = 11               # curvature window on contour
CURV_THRESH = 0.35          # higher => more “corner/concavity”
STUMP_CLOSE_EDGE_FRAC = 0.03

### Basic Image + Edge Pipe

In [23]:
def read_image_bgr(path: str) -> np.ndarray:
    img = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if img is None:
        raise FileNotFoundError(path)
    if img.ndim == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if img.shape[2] == 4:
        bgr = img[:, :, :3].astype(np.float32)
        alpha = img[:, :, 3:4].astype(np.float32) / 255.0
        white = np.full_like(bgr, 255.0, dtype=np.float32)
        comp = bgr * alpha + white * (1.0 - alpha)
        return np.clip(comp, 0, 255).astype(np.uint8)
    return img[:, :, :3]

def preprocess_gray01(path: str) -> np.ndarray:
    bgr = read_image_bgr(path)
    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
    # mild denoise (same spirit as your pipeline)
    gray = cv2.GaussianBlur(gray, (0,0), 1.2)
    return gray

def gaussian_blur(gray_f: np.ndarray, sigma: float) -> np.ndarray:
    return cv2.GaussianBlur(gray_f, (0, 0), sigmaX=sigma, sigmaY=sigma)

def sobel_magnitude(gray_f: np.ndarray) -> np.ndarray:
    gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
    return cv2.magnitude(gx, gy)

def normalize_01(x: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    x = x.astype(np.float32)
    mn, mx = float(np.min(x)), float(np.max(x))
    return (x - mn) / (mx - mn + eps)

def multiscale_edge_strength(gray_f: np.ndarray, sigmas=(1,2,4)):
    mags = []
    for s in sigmas:
        sm = gaussian_blur(gray_f, float(s))
        mag = sobel_magnitude(sm)
        mags.append(normalize_01(mag))
    E = np.max(np.stack(mags, axis=0), axis=0)
    return E

def remove_small_components(binary01: np.ndarray, min_area_px: int) -> np.ndarray:
    num, labels, stats, _ = cv2.connectedComponentsWithStats(binary01.astype(np.uint8), connectivity=8)
    out = np.zeros_like(binary01, dtype=np.uint8)
    for i in range(1, num):
        if stats[i, cv2.CC_STAT_AREA] >= min_area_px:
            out[labels == i] = 1
    return out

def morph_close(binary01: np.ndarray, k: int, iters: int) -> np.ndarray:
    if k % 2 == 0: k += 1
    k = max(3, int(k))
    ker = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k,k))
    closed = cv2.morphologyEx((binary01*255).astype(np.uint8), cv2.MORPH_CLOSE, ker, iterations=int(iters))
    return (closed > 0).astype(np.uint8)

def make_edge_maps(gray01: np.ndarray):
    H, W = gray01.shape
    E = multiscale_edge_strength(gray01, sigmas=SIGMAS)
    thr = np.percentile(E, EDGE_PERCENTILE)
    Eb = (E >= thr).astype(np.uint8)

    min_area_px = int(max(10, round(MIN_AREA_RATIO * H * W)))
    Eb = remove_small_components(Eb, min_area_px=min_area_px)

    k = int(max(3, round(CLOSE_K_RATIO * min(H, W))))
    Eb = morph_close(Eb, k=k, iters=CLOSE_ITERS)

    return E, Eb

### Edge Skeleton + Endpoints

In [24]:
def zhang_suen_thinning(bin01: np.ndarray) -> np.ndarray:
    img = bin01.copy().astype(np.uint8)
    img[img > 0] = 1
    H, W = img.shape

    def neighbors(x, y):
        return [img[x-1,y], img[x-1,y+1], img[x,y+1], img[x+1,y+1],
                img[x+1,y], img[x+1,y-1], img[x,y-1], img[x-1,y-1]]

    def transitions(nb):
        n = nb + [nb[0]]
        return sum((n[i] == 0 and n[i+1] == 1) for i in range(8))

    changed = True
    while changed:
        changed = False
        to_del = []
        for x in range(1, H-1):
            for y in range(1, W-1):
                if img[x,y] != 1:
                    continue
                nb = neighbors(x,y)
                B = sum(nb)
                A = transitions(nb)
                if 2 <= B <= 6 and A == 1 and (nb[0]*nb[2]*nb[4] == 0) and (nb[2]*nb[4]*nb[6] == 0):
                    to_del.append((x,y))
        if to_del:
            for x,y in to_del: img[x,y] = 0
            changed = True

        to_del = []
        for x in range(1, H-1):
            for y in range(1, W-1):
                if img[x,y] != 1:
                    continue
                nb = neighbors(x,y)
                B = sum(nb)
                A = transitions(nb)
                if 2 <= B <= 6 and A == 1 and (nb[0]*nb[2]*nb[6] == 0) and (nb[0]*nb[4]*nb[6] == 0):
                    to_del.append((x,y))
        if to_del:
            for x,y in to_del: img[x,y] = 0
            changed = True
    return img

def skeleton_degrees(skel01: np.ndarray) -> np.ndarray:
    sk = (skel01 > 0).astype(np.uint8)
    H, W = sk.shape
    deg = np.zeros((H,W), np.uint8)
    pts = np.argwhere(sk > 0)
    for r,c in pts:
        n = 0
        for dr in (-1,0,1):
            for dc in (-1,0,1):
                if dr==0 and dc==0:
                    continue
                rr, cc = r+dr, c+dc
                if 0 <= rr < H and 0 <= cc < W and sk[rr,cc] > 0:
                    n += 1
        deg[r,c] = n
    return deg

def edge_endpoints(Eb01: np.ndarray):
    sk = zhang_suen_thinning(Eb01.astype(np.uint8))
    deg = skeleton_degrees(sk)
    eps = np.argwhere((sk > 0) & (deg == 1))  # (r,c)
    return sk, eps

### Good Continuation Link Score

In [25]:
def _unit(v, eps=1e-9):
    v = np.asarray(v, dtype=float)
    n = np.linalg.norm(v)
    return v / (n + eps)

def estimate_tangent(skel01: np.ndarray, pt_rc, win=NBR_WIN):
    """Local PCA on skeleton pixels in a window → tangent direction (dx,dy) in x,y coords."""
    r,c = int(pt_rc[0]), int(pt_rc[1])
    H,W = skel01.shape
    h = win//2
    r0,r1 = max(0,r-h), min(H,r+h+1)
    c0,c1 = max(0,c-h), min(W,c+h+1)
    ys,xs = np.where(skel01[r0:r1, c0:c1] > 0)
    if len(xs) < 5:
        return np.array([1.0, 0.0], dtype=float)
    xs = xs + c0
    ys = ys + r0
    pts = np.stack([xs.astype(float), ys.astype(float)], axis=1) # (x,y)
    ctr = pts.mean(axis=0)
    X = pts - ctr
    C = (X.T @ X) / max(1, len(X)-1)
    eigvals, eigvecs = np.linalg.eigh(C)
    v = eigvecs[:, np.argmax(eigvals)]  # principal axis
    return _unit(v)

def sample_edge_strength_along_segment(E01: np.ndarray, a_rc, b_rc, n=30):
    """Mean E along straight segment between endpoints."""
    ar,ac = float(a_rc[0]), float(a_rc[1])
    br,bc = float(b_rc[0]), float(b_rc[1])
    H,W = E01.shape
    rs = np.linspace(ar, br, n)
    cs = np.linspace(ac, bc, n)
    vals = []
    for r,c in zip(rs,cs):
        rr = int(np.clip(round(r), 0, H-1))
        cc = int(np.clip(round(c), 0, W-1))
        vals.append(float(E01[rr,cc]))
    return float(np.mean(vals)) if vals else 0.0

def link_score(E01, skel01, a_rc, b_rc, diag):
    """
    Good-continuation score in [0,1]:
    - collinearity: tangent at endpoints aligns with connecting vector
    - distance penalty: closer is better
    - edge support: if segment crosses high E, more plausible
    """
    a = np.array([float(a_rc[1]), float(a_rc[0])])  # (x,y)
    b = np.array([float(b_rc[1]), float(b_rc[0])])
    v = b - a
    d = float(np.linalg.norm(v))
    if d < 1e-6:
        return 0.0, d, 0.0, 0.0

    vhat = _unit(v)

    ta = estimate_tangent(skel01, a_rc)
    tb = estimate_tangent(skel01, b_rc)

    # endpoint tangents should point *toward* the connection (directionless)
    col_a = abs(float(np.dot(ta, vhat)))
    col_b = abs(float(np.dot(tb, -vhat)))
    col = 0.5*(col_a + col_b)  # 0..1

    # distance term (0..1), soft decay
    d01 = np.clip(d / (MAX_LINK_DIST_FRAC*diag + 1e-9), 0.0, 1.0)
    dist_term = 1.0 - d01

    # edge support (0..1)
    edge_term = sample_edge_strength_along_segment(E01, a_rc, b_rc, n=30)

    score = COLLIN_W*col + DIST_W*dist_term + EDGE_W*edge_term
    return float(np.clip(score, 0.0, 1.0)), d, col, edge_term

def build_links_for_image(E01, Eb01):
    H,W = Eb01.shape
    diag = float(np.hypot(H,W))
    skel01, eps = edge_endpoints(Eb01)

    # candidate pairs within max distance
    max_d = MAX_LINK_DIST_FRAC * diag
    links = []
    for i in range(len(eps)):
        for j in range(i+1, len(eps)):
            d = np.linalg.norm(eps[j].astype(float) - eps[i].astype(float))
            if d <= max_d:
                s, dist_px, col, edge_term = link_score(E01, skel01, eps[i], eps[j], diag)
                links.append((i, j, s, dist_px, col, edge_term))

    # sort by score desc
    links.sort(key=lambda x: x[2], reverse=True)
    return skel01, eps, links

### Completion Plausibility

In [26]:
def mean_turning_at_endpoint(skel01, pt_rc, win=11):
    """
    Quick local “simplicity” proxy:
    - take skeleton pixels in window, compute PCA eccentricity
    - higher eccentricity => more line-like continuation => simpler
    """
    r,c = int(pt_rc[0]), int(pt_rc[1])
    H,W = skel01.shape
    h = win//2
    r0,r1 = max(0,r-h), min(H,r+h+1)
    c0,c1 = max(0,c-h), min(W,c+h+1)
    ys,xs = np.where(skel01[r0:r1, c0:c1] > 0)
    if len(xs) < 10:
        return 0.0
    xs = xs + c0
    ys = ys + r0
    pts = np.stack([xs.astype(float), ys.astype(float)], axis=1)
    ctr = pts.mean(axis=0)
    X = pts - ctr
    C = (X.T @ X) / max(1, len(X)-1)
    eigvals, _ = np.linalg.eigh(C)
    eigvals = np.sort(eigvals)[::-1]
    e1, e2 = float(eigvals[0]), float(eigvals[1] + 1e-9)
    ecc = e1 / e2
    # map to 0..1 (soft saturate)
    return float(np.clip(np.log1p(max(ecc,0.0))/4.0, 0.0, 1.0))

def plausibility_from_top_links(skel01, eps, links):
    """
    If we connect good pairs, endpoints should reduce and local continuation should become more line-like.
    We simulate by “virtually” reducing endpoints count based on accepted links, and compute average tangent simplicity.
    """
    if len(eps) == 0:
        return np.nan

    used = set()
    accepted = []
    for (i,j,s,dist_px,col,edge_term) in links[:TOP_LINKS_PER_IMAGE]:
        if i in used or j in used:
            continue
        if s < 0.55:  # accept only strong continuations
            continue
        used.add(i); used.add(j)
        accepted.append((i,j,s))

    # endpoints reduction proxy: each accepted link “resolves” 2 endpoints
    resolved = 2 * len(accepted)
    gain = resolved / max(1, len(eps))  # 0..1

    # local simplicity near endpoints involved
    sims = []
    for (i,j,s) in accepted:
        sims.append(mean_turning_at_endpoint(skel01, eps[i]))
        sims.append(mean_turning_at_endpoint(skel01, eps[j]))
    sim = float(np.mean(sims)) if sims else 0.0

    plaus = PLAUS_ENDPT_GAIN_W*gain + PLAUS_TURN_W*sim
    return float(np.clip(plaus, 0.0, 1.0)), len(accepted), gain, sim

### Amputation Risk

In [27]:
def contour_curvature_points(Eb01, win=CURV_WIN):
    """
    Extract contour points from Eb, estimate curvature magnitude at each point.
    Returns list of (r,c,curv01).
    """
    e = (Eb01*255).astype(np.uint8)
    contours, _ = cv2.findContours(e, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
    out = []
    if not contours:
        return out

    w = max(5, int(win) | 1)
    h = w//2

    for cnt in contours:
        pts = cnt[:,0,:]  # (x,y)
        if len(pts) < w + 2:
            continue
        # make cyclic to avoid edges
        P = np.vstack([pts[-h:], pts, pts[:h]])
        for i in range(h, len(P)-h):
            p0 = P[i-h].astype(float)
            p1 = P[i].astype(float)
            p2 = P[i+h].astype(float)
            v1 = p0 - p1
            v2 = p2 - p1
            n1 = np.linalg.norm(v1) + 1e-9
            n2 = np.linalg.norm(v2) + 1e-9
            c = abs(float(np.dot(v1, v2) / (n1*n2)))   # collinearity
            curv = 1.0 - c                             # curvature-ish
            x,y = int(p1[0]), int(p1[1])
            out.append((y, x, float(np.clip(curv, 0.0, 1.0))))  # store as (r,c)
    return out

def amputation_risk(Eb01, skel01, eps):
    """
    “High risk stump” proxy:
    - endpoint sits near a high-curvature contour zone (disruption near concavity/joint)
    - and the endpoint neighborhood is locally edge-dense (looks complete)
    """
    H,W = Eb01.shape
    diag = float(np.hypot(H,W))
    close_r = STUMP_CLOSE_EDGE_FRAC * diag

    curv_pts = contour_curvature_points(Eb01, win=CURV_WIN)
    if len(curv_pts) == 0 or len(eps) == 0:
        return np.nan, 0, 0

    # build quick list for nearest-curvature search (brutal but OK for your sizes)
    curv_rc = np.array([(p[0], p[1]) for p in curv_pts], dtype=float)
    curv_v  = np.array([p[2] for p in curv_pts], dtype=float)

    risky = 0
    checked = 0

    for ep in eps:
        checked += 1
        d = np.linalg.norm(curv_rc - ep.astype(float), axis=1)
        j = int(np.argmin(d))
        if float(d[j]) > close_r:
            continue

        curv = float(curv_v[j])
        if curv < CURV_THRESH:
            continue

        # local edge density around endpoint (stump “looks complete” if dense)
        r,c = int(ep[0]), int(ep[1])
        win = 11
        h = win//2
        r0,r1 = max(0,r-h), min(H,r+h+1)
        c0,c1 = max(0,c-h), min(W,c+h+1)
        dens = float(np.mean(Eb01[r0:r1, c0:c1]))

        if dens >= 0.15:
            risky += 1

    risk01 = risky / max(1, checked)
    return float(np.clip(risk01, 0.0, 1.0)), risky, checked

### Run Batch for All Images

In [28]:
# ---- Safety knobs (useful if it hangs)
MAX_IMAGES = None          # e.g. 50 for testing; None = all
MAX_ENDPOINTS = 250        # cap endpoints to avoid O(N^2) blowups
DENSITY_THINNING_SKIP = 0.18  # if Eb is too dense, skip thinning/linking

def iter_images(data_root: Path):
    exts = {".png", ".jpg", ".jpeg", ".webp", ".tif", ".tiff", ".bmp"}
    for split in ["real", "generated"]:
        base = data_root / split
        if not base.exists():
            continue
        for p in base.rglob("*"):
            if p.suffix.lower() in exts:
                rel = str(p.relative_to(data_root))
                yield split, rel, str(p)

def edge_endpoints_safe(Eb01: np.ndarray):
    """Endpoints with density guard (prevents thinning hangs on very dense edge maps)."""
    dens = float(Eb01.mean())
    if dens > DENSITY_THINNING_SKIP:
        # Too dense => thinning expensive + endpoints not meaningful anyway
        sk = np.zeros_like(Eb01, dtype=np.uint8)
        eps = np.zeros((0, 2), dtype=int)
        return sk, eps, dens, True
    sk = zhang_suen_thinning(Eb01.astype(np.uint8))
    deg = skeleton_degrees(sk)
    eps = np.argwhere((sk > 0) & (deg == 1))
    return sk, eps, dens, False

def build_links_for_image_safe(E01, Eb01):
    H, W = Eb01.shape
    diag = float(np.hypot(H, W))

    skel01, eps, dens, skipped = edge_endpoints_safe(Eb01)

    # cap endpoints to avoid O(N^2)
    if len(eps) > MAX_ENDPOINTS:
        eps = eps[:MAX_ENDPOINTS]

    max_d = MAX_LINK_DIST_FRAC * diag
    links = []

    if len(eps) >= 2 and not skipped:
        for i in range(len(eps)):
            ei = eps[i].astype(float)
            for j in range(i + 1, len(eps)):
                d = float(np.linalg.norm(eps[j].astype(float) - ei))
                if d <= max_d:
                    s, dist_px, col, edge_term = link_score(E01, skel01, eps[i], eps[j], diag)
                    links.append((i, j, s, dist_px, col, edge_term))

        links.sort(key=lambda x: x[2], reverse=True)

    return skel01, eps, links, dens, skipped

In [29]:
import time

rows_links = []
rows_img = []

t0 = time.time()
ok = 0
fail = 0
total = 0

for k, (split, image_id, path) in enumerate(iter_images(DATA_ROOT), start=1):
    if MAX_IMAGES is not None and k > MAX_IMAGES:
        break

    total += 1
    t_img0 = time.time()

    try:
        gray01 = preprocess_gray01(path)
        E01, Eb01 = make_edge_maps(gray01)

        H, W = Eb01.shape
        skel01, eps, links, dens, skipped = build_links_for_image_safe(E01, Eb01)

        # (1) Good-continuation index (top K mean score)
        top = links[:TOP_LINKS_PER_IMAGE]
        gc01 = float(np.mean([x[2] for x in top])) if len(top) else 0.0

        for (i, j, s, dist_px, col, edge_term) in top:
            a = eps[i]; b = eps[j]
            rows_links.append({
                "split": split,
                "image_id": image_id,
                "ep_i": int(i),
                "ep_j": int(j),
                "a_r": int(a[0]), "a_c": int(a[1]),
                "b_r": int(b[0]), "b_c": int(b[1]),
                "link_score_01": float(s),
                "dist_px": float(dist_px),
                "collinearity_01": float(col),
                "edge_support_01": float(edge_term),
            })

        # (2) Completion plausibility
        plaus = plausibility_from_top_links(skel01, eps, links)
        if isinstance(plaus, tuple):
            plaus01, n_acc, gain01, simp01 = plaus
        else:
            plaus01, n_acc, gain01, simp01 = np.nan, 0, np.nan, np.nan

        # (3) Amputation risk
        risk01, risky_n, checked_n = amputation_risk(Eb01, skel01, eps)

        rows_img.append({
            "split": split,
            "image_id": image_id,

            "form4_good_continuation_01": float(gc01),
            "form4_completion_plausibility_01": (float(plaus01) if np.isfinite(plaus01) else np.nan),
            "form4_amputation_risk_01": (float(risk01) if np.isfinite(risk01) else np.nan),

            "n_endpoints": int(len(eps)),
            "n_candidate_links": int(len(links)),
            "n_accepted_links": int(n_acc),

            "endpoint_gain_01": (float(gain01) if np.isfinite(gain01) else np.nan),
            "local_simplicity_01": (float(simp01) if np.isfinite(simp01) else np.nan),

            "risky_endpoints": int(risky_n),
            "checked_endpoints": int(checked_n),

            "edge_density_01": float(dens),
            "thinning_skipped": int(bool(skipped)),

            "H": int(H),
            "W": int(W),
        })

        ok += 1
        dt = time.time() - t_img0
        print(f"[{k}] OK  {split} | {image_id} | dens={dens:.3f} skip={int(skipped)} "
              f"eps={len(eps)} links={len(links)} topK={len(top)} t={dt:.2f}s")

    except Exception as e:
        fail += 1
        dt = time.time() - t_img0
        print(f"[{k}] FAIL {split} | {image_id} | {type(e).__name__}: {e} (t={dt:.2f}s)")

[1] OK  real | real\images\10pm_feeding_around_the_clock.webp | dens=0.150 skip=0 eps=88 links=197 topK=12 t=9.53s
[2] OK  real | real\images\11pm.webp | dens=0.150 skip=0 eps=13 links=43 topK=12 t=6.99s
[3] OK  real | real\images\12_4_7_10.webp | dens=0.151 skip=0 eps=173 links=1556 topK=12 t=10.65s
[4] OK  real | real\images\around_the_clock.webp | dens=0.150 skip=0 eps=43 links=81 topK=12 t=10.81s
[5] OK  real | real\images\around_the_clock_alizarin.webp | dens=0.151 skip=0 eps=8 links=9 topK=9 t=20.84s
[6] OK  real | real\images\Bellyscape_and_plumb_bob.webp | dens=0.152 skip=0 eps=21 links=24 topK=12 t=15.58s
[7] OK  real | real\images\Belly_breast.webp | dens=0.147 skip=0 eps=250 links=3123 topK=12 t=14.49s
[8] OK  real | real\images\Belly_breast_face_brain_placenta.webp | dens=0.148 skip=0 eps=190 links=1228 topK=12 t=7.89s
[9] OK  real | real\images\Birth_perspective_from_above_and_below.webp | dens=0.149 skip=0 eps=157 links=977 topK=12 t=14.35s
[10] OK  real | real\images\Blu

In [30]:
df_links = pd.DataFrame(rows_links)
df_img = pd.DataFrame(rows_img)

df_links.to_csv(OUT_DIR / "form4_links.csv", index=False)
df_img.to_csv(OUT_DIR / "form4_image_indices.csv", index=False)

print("\nSaved:", OUT_DIR / "form4_links.csv", "rows=", len(df_links))
print("Saved:", OUT_DIR / "form4_image_indices.csv", "rows=", len(df_img))
print(f"Done. ok={ok} fail={fail} total={total} elapsed={time.time()-t0:.1f}s")

df_img.head(10)


Saved: outputs\form4\form4_links.csv rows= 1342
Saved: outputs\form4\form4_image_indices.csv rows= 130
Done. ok=130 fail=0 total=130 elapsed=2692.6s


Unnamed: 0,split,image_id,form4_good_continuation_01,form4_completion_plausibility_01,form4_amputation_risk_01,n_endpoints,n_candidate_links,n_accepted_links,endpoint_gain_01,local_simplicity_01,risky_endpoints,checked_endpoints,edge_density_01,thinning_skipped,H,W
0,real,real\images\10pm_feeding_around_the_clock.webp,0.771539,0.202122,0.352273,88,197,10,0.227273,0.143435,31,88,0.150277,0,749,750
1,real,real\images\11pm.webp,0.621331,0.430769,0.538462,13,43,4,0.615385,0.0,7,13,0.150075,0,642,500
2,real,real\images\12_4_7_10.webp,0.815214,0.09711,0.416185,173,1556,12,0.138728,0.0,72,173,0.151121,0,749,750
3,real,real\images\around_the_clock.webp,0.663651,0.335998,0.372093,43,81,10,0.465116,0.034722,16,43,0.150061,0,799,750
4,real,real\images\around_the_clock_alizarin.webp,0.497145,0.525,0.25,8,9,3,0.75,0.0,2,8,0.150786,0,704,1000
5,real,real\images\Bellyscape_and_plumb_bob.webp,0.746824,0.333333,0.428571,21,24,5,0.47619,0.0,9,21,0.15176,0,595,750
6,real,real\images\Belly_breast.webp,0.838962,0.0672,0.548,250,3123,12,0.096,0.0,137,250,0.14707,0,667,1000
7,real,real\images\Belly_breast_face_brain_placenta.webp,0.822282,0.094294,0.352632,190,1228,11,0.115789,0.044139,67,190,0.148355,0,680,750
8,real,real\images\Birth_perspective_from_above_and_b...,0.820921,0.139384,0.426752,157,977,12,0.152866,0.107926,67,157,0.148619,0,571,750
9,real,real\images\Blue_0_with_red_halo.webp,0.0,,,0,0,0,,,0,0,0.152359,0,640,500


### Symmetry, repetition, ornamentality

#### Helpers

In [31]:
def _as_float_series(s):
    return pd.to_numeric(s, errors="coerce").astype(float)

def _as_float_scalar(x, default=np.nan):
    try:
        v = pd.to_numeric(x, errors="coerce")
        if v is None:
            return default
        if isinstance(v, (np.ndarray, list, tuple)):
            v = np.array(v).ravel()[0] if len(v) else default
        v = float(v)
        return v if np.isfinite(v) else default
    except Exception:
        return default

def _wrap180(a):
    a = np.asarray(a, dtype=float)
    return np.mod(a, 180.0)

def _dist_to_axis(a_deg, b_deg):
    a = np.asarray(a_deg, dtype=float)
    b = np.asarray(b_deg, dtype=float)
    d = np.abs(a - b)
    d = np.minimum(d, 180.0 - d)  # 0..90
    return d

def _min_dist_to_hv(a_deg):
    a = np.asarray(a_deg, dtype=float)
    d0  = _dist_to_axis(a, 0.0)
    d90 = _dist_to_axis(a, 90.0)
    return np.minimum(d0, d90)     # 0..45

def _safe_cols(df, cols):
    return [c for c in cols if c in df.columns]

def _pick_centroid_cols(df_img):
    # common names we might have used earlier
    candidates = [
        ("cx","cy"),
        ("centroid_x","centroid_y"),
        ("x_centroid","y_centroid"),
        ("center_x","center_y"),
        ("cX","cY"),
    ]
    for xcol, ycol in candidates:
        if xcol in df_img.columns and ycol in df_img.columns:
            return xcol, ycol
    return None, None

def _weights(df_img):
    # prefer saliency, else area_ratio, else uniform
    if "saliency" in df_img.columns and np.isfinite(_as_float_series(df_img["saliency"])).any():
        w = _as_float_series(df_img["saliency"]).to_numpy()
    elif "area_ratio" in df_img.columns and np.isfinite(_as_float_series(df_img["area_ratio"])).any():
        w = _as_float_series(df_img["area_ratio"]).to_numpy()
    else:
        w = np.ones(len(df_img), dtype=float)
    w = np.where(np.isfinite(w) & (w > 0), w, 0.0)
    return w

def _normalize01(x, lo=None, hi=None):
    x = np.asarray(x, dtype=float)
    if lo is None: lo = np.nanmin(x)
    if hi is None: hi = np.nanmax(x)
    return np.clip((x - lo) / (hi - lo + 1e-9), 0.0, 1.0)

In [32]:
def reflection_symmetry_region(row):
    """
    CSV-only proxy:
    - if symmetry_pca exists (0..1-ish), use it
    - else fallback from pca_eig_ratio (more elongated + stable axis -> easier reflection)
    """
    if "symmetry_pca" in row.index:
        v = _as_float_scalar(row.get("symmetry_pca", np.nan), default=np.nan)
        if np.isfinite(v):
            return float(np.clip(v, 0.0, 1.0))

    eig = _as_float_scalar(row.get("pca_eig_ratio", np.nan), default=np.nan)
    if np.isfinite(eig):
        # log-saturate -> 0..1
        return float(np.clip(np.log1p(max(eig, 0.0)) / 4.0, 0.0, 1.0))

    return np.nan

def reflection_symmetry_whole(df_img):
    v = np.array([reflection_symmetry_region(r) for _, r in df_img.iterrows()], dtype=float)
    w = _weights(df_img)
    wsum = float(np.sum(w) + 1e-12)
    return float(np.nansum(v * w) / wsum) if np.isfinite(v).any() else np.nan

### Rotational symmetry score (structural proxy)

In [33]:
def rotational_symmetry_proxy(df_img):
    """
    CSV-only proxy:
    - need centroids + angles
    - estimate center (weighted centroid)
    - take polar angles of regions around center
    - measure how well angles align to a K-fold pattern (K=2..6), choose best
    Output: 0..1
    """
    xcol, ycol = _pick_centroid_cols(df_img)
    if xcol is None:
        return np.nan

    if "pca_angle_deg" not in df_img.columns:
        return np.nan

    x = _as_float_series(df_img[xcol]).to_numpy()
    y = _as_float_series(df_img[ycol]).to_numpy()
    ang = _wrap180(_as_float_series(df_img["pca_angle_deg"]).to_numpy())

    ok = np.isfinite(x) & np.isfinite(y) & np.isfinite(ang)
    if np.sum(ok) < 4:
        return np.nan

    w = _weights(df_img)[ok]
    wsum = float(np.sum(w) + 1e-12)

    cx = float(np.sum(x[ok] * w) / wsum)
    cy = float(np.sum(y[ok] * w) / wsum)

    # polar angles around center (0..2pi)
    dx = x[ok] - cx
    dy = y[ok] - cy
    phi = np.arctan2(dy, dx)  # -pi..pi

    # test K-fold regularity: we want phis to fall near multiples of 2pi/K
    best = 0.0
    for K in range(2, 7):
        # map each phi to nearest lattice point
        step = 2*np.pi / K
        # distance to nearest multiple, normalized
        d = np.abs(((phi + np.pi) % step) - step/2)  # 0..step/2-ish (cheap)
        # turn into score: smaller d => higher score
        d01 = np.clip(d / (step/2 + 1e-9), 0.0, 1.0)
        score = float(1.0 - (np.sum(d01 * w) / wsum))
        best = max(best, score)

    return float(np.clip(best, 0.0, 1.0))

### Repetition / frieze-like regularity

In [34]:
def _descriptor_matrix(df_img):
    # choose a stable descriptor set (only existing columns)
    base = [
        "area_ratio", "pca_eig_ratio", "symmetry_pca",
        "smoothness", "centeredness", "saliency"
    ]
    cols = _safe_cols(df_img, base)

    if not cols:
        return None, []

    X = df_img[cols].copy()
    for c in cols:
        X[c] = _as_float_series(X[c])
    X = X.to_numpy(dtype=float)

    # z-score (per image) to compare patterns inside same image
    mu = np.nanmean(X, axis=0)
    sd = np.nanstd(X, axis=0) + 1e-9
    Xz = (X - mu) / sd
    Xz = np.where(np.isfinite(Xz), Xz, 0.0)
    return Xz, cols

def _cos_sim(X):
    norms = np.linalg.norm(X, axis=1, keepdims=True) + 1e-9
    U = X / norms
    return U @ U.T

def repetition_index(df_img, sim_threshold=0.88):
    """
    Repetition density:
    - build descriptor similarity matrix
    - count how many near-duplicates (above threshold)
    - normalize by N*(N-1)
    Output: 0..1
    """
    Xz, cols = _descriptor_matrix(df_img)
    if Xz is None or len(df_img) < 2:
        return np.nan, np.nan, cols

    S = _cos_sim(Xz)
    N = S.shape[0]
    # ignore diagonal
    mask = np.ones_like(S, dtype=bool)
    np.fill_diagonal(mask, False)
    sims = S[mask]

    rep = float(np.mean(sims >= sim_threshold))  # 0..1

    # spacing regularity (frieze-like): along dominant global axis if centroids exist
    xcol, ycol = _pick_centroid_cols(df_img)
    spacing01 = np.nan
    if xcol is not None:
        x = _as_float_series(df_img[xcol]).to_numpy()
        y = _as_float_series(df_img[ycol]).to_numpy()
        ok = np.isfinite(x) & np.isfinite(y)
        if np.sum(ok) >= 4:
            # dominant axis via PCA on centroids
            pts = np.stack([x[ok], y[ok]], axis=1)
            ctr = pts.mean(axis=0)
            X = pts - ctr
            C = (X.T @ X) / max(1, len(X)-1)
            eigvals, eigvecs = np.linalg.eigh(C)
            v = eigvecs[:, np.argmax(eigvals)]  # dominant direction
            proj = X @ v                         # 1D projection
            proj = np.sort(proj)
            gaps = np.diff(proj)
            gaps = gaps[gaps > 1e-9]
            if len(gaps) >= 3:
                cv = float(np.std(gaps) / (np.mean(gaps) + 1e-9))  # lower is more regular
                spacing01 = float(np.clip(1.0 - cv, 0.0, 1.0))

    return rep, spacing01, cols

### Ornamentality index

In [35]:
def structural_surprise(df_img, Xz=None):
    """
    Low “structural surprise” = low diversity of descriptors.
    Proxy: average distance of each region descriptor to the mean (normalized).
    Output: 0..1 (higher = more surprise/variety)
    """
    if Xz is None:
        Xz, _ = _descriptor_matrix(df_img)
    if Xz is None or len(df_img) < 2:
        return np.nan

    mu = np.mean(Xz, axis=0, keepdims=True)
    d = np.linalg.norm(Xz - mu, axis=1)
    # normalize by robust scale
    d01 = float(np.clip(np.mean(d) / (np.percentile(d, 90) + 1e-9), 0.0, 1.0))
    return d01

def ornamentality_index(sym_ref_whole, sym_rot, rep, spacing01, surprise01):
    """
    Ornamentality = high symmetry + high repetition + regular spacing + low surprise.
    All inputs assumed 0..1.
    Output: 0..1
    """
    parts = [
        sym_ref_whole,
        (sym_rot if np.isfinite(sym_rot) else 0.0),
        (rep if np.isfinite(rep) else 0.0),
        (spacing01 if np.isfinite(spacing01) else 0.0),
        (1.0 - surprise01 if np.isfinite(surprise01) else 0.0),
    ]
    # weights: symmetry+repetition dominate
    w = np.array([0.28, 0.12, 0.28, 0.12, 0.20], dtype=float)
    v = np.array([p if np.isfinite(p) else 0.0 for p in parts], dtype=float)
    return float(np.clip(np.sum(w*v), 0.0, 1.0))

### Batch Computing

In [36]:
rows_region = []
rows_image  = []

# tuning knobs
REP_SIM_THRESHOLD = 0.88

# progress without extra deps
groups = list(df_regions.groupby(["split","image_id"], sort=False))
total = len(groups)

for gi, ((split, image_id), g) in enumerate(groups, start=1):
    df_img = g.copy().reset_index(drop=True)

    # Per-region reflection symmetry
    sym_ref_r = np.array([reflection_symmetry_region(r) for _, r in df_img.iterrows()], dtype=float)

    # Whole reflection symmetry (weighted)
    sym_ref_wh = reflection_symmetry_whole(df_img)

    # Rotational symmetry proxy
    sym_rot = rotational_symmetry_proxy(df_img)

    # Repetition + spacing regularity
    rep01, spacing01, desc_cols = repetition_index(df_img, sim_threshold=REP_SIM_THRESHOLD)

    # Surprise
    Xz, _ = _descriptor_matrix(df_img)
    surprise01 = structural_surprise(df_img, Xz=Xz)

    # Ornamentality
    orn01 = ornamentality_index(sym_ref_wh, sym_rot, rep01, spacing01, surprise01)

    # Per-region rows
    for i, r in df_img.iterrows():
        rows_region.append({
            "split": split,
            "image_id": image_id,
            "region_index": int(r["region_index"]) if "region_index" in df_img.columns else int(i),

            "form5_reflection_symmetry_region_01": float(sym_ref_r[i]) if np.isfinite(sym_ref_r[i]) else np.nan,
            "form5_reflection_symmetry_whole_01": float(sym_ref_wh) if np.isfinite(sym_ref_wh) else np.nan,

            # optional: keep useful context
            "pca_angle_deg": float(_as_float_scalar(r.get("pca_angle_deg", np.nan))),
        })

    # Per-image summary row
    rows_image.append({
        "split": split,
        "image_id": image_id,

        "form5_reflection_symmetry_whole_01": float(sym_ref_wh) if np.isfinite(sym_ref_wh) else np.nan,
        "form5_rotational_symmetry_proxy_01": float(sym_rot) if np.isfinite(sym_rot) else np.nan,

        "form5_repetition_density_01": float(rep01) if np.isfinite(rep01) else np.nan,
        "form5_spacing_regularity_01": float(spacing01) if np.isfinite(spacing01) else np.nan,

        "form5_structural_surprise_01": float(surprise01) if np.isfinite(surprise01) else np.nan,
        "form5_ornamentality_index_01": float(orn01) if np.isfinite(orn01) else np.nan,

        "n_regions": int(len(df_img)),
        "descriptor_cols_used": ",".join(desc_cols),
        "rep_sim_threshold": float(REP_SIM_THRESHOLD),
    })

    if gi % 25 == 0 or gi == total:
        print(f"[Form5] {gi}/{total} done | split={split} | image_id={image_id}")

df_form5_region = pd.DataFrame(rows_region)
df_form5_image  = pd.DataFrame(rows_image)

# Save
OUT_FORM5_DIR = r"outputs\form5"
import os
os.makedirs(OUT_FORM5_DIR, exist_ok=True)

df_form5_region.to_csv(os.path.join(OUT_FORM5_DIR, "form5_region_indices.csv"), index=False)
df_form5_image.to_csv(os.path.join(OUT_FORM5_DIR, "form5_image_indices.csv"), index=False)

print("Saved:",
      os.path.join(OUT_FORM5_DIR, "form5_region_indices.csv"),
      os.path.join(OUT_FORM5_DIR, "form5_image_indices.csv"))
df_form5_image.head(10)

  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


[Form5] 25/128 done | split=real | image_id=real\images\Four_breasts_pink_purple_red_yellow.webp
[Form5] 50/128 done | split=real | image_id=real\images\Overview_Effect_blue_red_mandala.webp
[Form5] 75/128 done | split=generated | image_id=generated\ComfyUI_00091_.png
[Form5] 100/128 done | split=generated | image_id=generated\ComfyUI_00126_.png
[Form5] 125/128 done | split=generated | image_id=generated\ComfyUI_00159_.png
[Form5] 128/128 done | split=generated | image_id=generated\ComfyUI_00163_.png
Saved: outputs\form5\form5_region_indices.csv outputs\form5\form5_image_indices.csv


  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  mu = np.nanmean(X, axis=0)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


Unnamed: 0,split,image_id,form5_reflection_symmetry_whole_01,form5_rotational_symmetry_proxy_01,form5_repetition_density_01,form5_spacing_regularity_01,form5_structural_surprise_01,form5_ornamentality_index_01,n_regions,descriptor_cols_used,rep_sim_threshold
0,real,real\images\10pm_feeding_around_the_clock.webp,0.391989,,0.070175,,0.704366,0.188533,19,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
1,real,real\images\11pm.webp,0.182162,,,,,0.051005,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
2,real,real\images\12_4_7_10.webp,0.932644,,,,,0.26114,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
3,real,real\images\Belly_breast.webp,0.267258,,0.0,,0.810257,0.112781,7,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
4,real,real\images\Belly_breast_face_brain_placenta.webp,0.988779,,,,,0.276858,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
5,real,real\images\Bellyscape_and_plumb_bob.webp,0.988288,,,,,0.276721,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
6,real,real\images\Birth_perspective_from_above_and_b...,0.88845,,,,,0.248766,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
7,real,real\images\Blue_0_with_red_halo.webp,0.997315,,,,,0.279248,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
8,real,real\images\Blue_Nip.webp,0.28447,,0.0,,1.0,0.079652,2,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88
9,real,real\images\Blue_eye.webp,0.378793,,,,,0.106062,1,"area_ratio,pca_eig_ratio,symmetry_pca,smoothne...",0.88


### Abstraction level & “form as invention” proxies

In [37]:
DATA_ROOT = Path("data")  # expects data/real and data/generated
OUT_DIR = Path("outputs/form6")
OUT_DIR.mkdir(parents=True, exist_ok=True)

### Edge Pipe (Repeated as Before)

In [38]:
SIGMAS = (1, 2, 4)
EDGE_PERCENTILE = 85.0
MIN_AREA_RATIO = 0.00005
CLOSE_K_RATIO = 0.004
CLOSE_ITERS = 1

def read_image_bgr(path: str) -> np.ndarray:
    img = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if img is None:
        raise FileNotFoundError(path)
    if img.ndim == 2:
        return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    if img.shape[2] == 4:
        bgr = img[:, :, :3].astype(np.float32)
        alpha = img[:, :, 3:4].astype(np.float32) / 255.0
        white = np.full_like(bgr, 255.0, dtype=np.float32)
        comp = bgr * alpha + white * (1.0 - alpha)
        return np.clip(comp, 0, 255).astype(np.uint8)
    return img[:, :, :3]

def preprocess_gray01(path: str) -> np.ndarray:
    bgr = read_image_bgr(path)
    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0
    gray = cv2.GaussianBlur(gray, (0,0), 1.2)
    return gray

def gaussian_blur(gray_f: np.ndarray, sigma: float) -> np.ndarray:
    return cv2.GaussianBlur(gray_f, (0, 0), sigmaX=sigma, sigmaY=sigma)

def sobel_magnitude(gray_f: np.ndarray) -> np.ndarray:
    gx = cv2.Sobel(gray_f, cv2.CV_32F, 1, 0, ksize=3)
    gy = cv2.Sobel(gray_f, cv2.CV_32F, 0, 1, ksize=3)
    return cv2.magnitude(gx, gy)

def normalize_01(x: np.ndarray, eps: float = 1e-6) -> np.ndarray:
    x = x.astype(np.float32)
    mn, mx = float(np.min(x)), float(np.max(x))
    return (x - mn) / (mx - mn + eps)

def multiscale_edge_strength(gray_f: np.ndarray, sigmas=(1,2,4)):
    mags = []
    for s in sigmas:
        sm = gaussian_blur(gray_f, float(s))
        mag = sobel_magnitude(sm)
        mags.append(normalize_01(mag))
    return np.max(np.stack(mags, axis=0), axis=0)

def remove_small_components(binary01: np.ndarray, min_area_px: int) -> np.ndarray:
    num, labels, stats, _ = cv2.connectedComponentsWithStats(binary01.astype(np.uint8), connectivity=8)
    out = np.zeros_like(binary01, dtype=np.uint8)
    for i in range(1, num):
        if stats[i, cv2.CC_STAT_AREA] >= min_area_px:
            out[labels == i] = 1
    return out

def morph_close(binary01: np.ndarray, k: int, iters: int) -> np.ndarray:
    if k % 2 == 0: k += 1
    k = max(3, int(k))
    ker = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k,k))
    closed = cv2.morphologyEx((binary01*255).astype(np.uint8), cv2.MORPH_CLOSE, ker, iterations=int(iters))
    return (closed > 0).astype(np.uint8)

def make_edge_maps(gray01: np.ndarray):
    H, W = gray01.shape
    E = multiscale_edge_strength(gray01, sigmas=SIGMAS)
    thr = np.percentile(E, EDGE_PERCENTILE)
    Eb = (E >= thr).astype(np.uint8)

    min_area_px = int(max(10, round(MIN_AREA_RATIO * H * W)))
    Eb = remove_small_components(Eb, min_area_px=min_area_px)

    k = int(max(3, round(CLOSE_K_RATIO * min(H, W))))
    Eb = morph_close(Eb, k=k, iters=CLOSE_ITERS)
    return E, Eb

### Iter Images (same as before)

In [39]:
def iter_images(data_root: Path):
    for split in ["real", "generated"]:
        base = data_root / split
        if not base.exists():
            continue
        for p in base.rglob("*"):
            if p.suffix.lower() in [".png",".jpg",".jpeg",".webp",".tif",".tiff",".bmp"]:
                rel = str(p.relative_to(data_root)).replace("\\", "/")
                yield split, rel, str(p)

### Geometricization

#### edge pixels explained by straight lines + circles (approx)

In [40]:
def geometricization_score(Eb01: np.ndarray):
    H, W = Eb01.shape
    edge_px = int(Eb01.sum())
    if edge_px <= 0:
        return 0.0, 0.0, 0.0

    # Use Canny on Eb to get proper 0/255 edges for Hough
    edges255 = (Eb01 * 255).astype(np.uint8)

    # (a) Lines via Probabilistic Hough
    min_line_len = max(20, int(0.03 * min(H, W)))
    max_gap = max(3, int(0.01 * min(H, W)))
    lines = cv2.HoughLinesP(
        edges255, rho=1, theta=np.pi/180,
        threshold=80, minLineLength=min_line_len, maxLineGap=max_gap
    )

    line_cover = np.zeros((H, W), np.uint8)
    if lines is not None:
        for (x1,y1,x2,y2) in lines[:,0,:]:
            cv2.line(line_cover, (x1,y1), (x2,y2), 1, thickness=1)
    line_hits = int((line_cover & Eb01).sum())

    # (b) Circles via HoughCircles (often returns none; that's ok)
    # We run on a blurred grayscale derived from edges to stabilize.
    blur = cv2.GaussianBlur(edges255, (0,0), 1.2)
    circles = cv2.HoughCircles(
        blur, cv2.HOUGH_GRADIENT,
        dp=1.2, minDist=max(15, int(0.06 * min(H,W))),
        param1=120, param2=18,
        minRadius=max(8, int(0.015 * min(H,W))),
        maxRadius=int(0.45 * min(H,W))
    )

    circ_cover = np.zeros((H, W), np.uint8)
    if circles is not None:
        circles = np.round(circles[0]).astype(int)
        # Draw only a thin perimeter (arc proxy)
        for (cx, cy, r) in circles[:12]:  # cap to avoid crazy overdraw
            if r <= 0:
                continue
            cv2.circle(circ_cover, (cx,cy), r, 1, thickness=1)
    circ_hits = int((circ_cover & Eb01).sum())

    # Combined (avoid double count by OR)
    geom_cover = ((line_cover | circ_cover) & Eb01).sum()
    geom01 = float(geom_cover / (edge_px + 1e-9))
    line01 = float(line_hits / (edge_px + 1e-9))
    circ01 = float(circ_hits / (edge_px + 1e-9))

    return float(np.clip(geom01, 0.0, 1.0)), float(np.clip(line01, 0.0, 1.0)), float(np.clip(circ01, 0.0, 1.0))

### Structural Economy

"How many salient parts explain most of the mass?"
Uses df_regions if present; otherwise fallback to edge components count.

In [41]:
def economy_from_regions(df_img_regions: pd.DataFrame, target_mass=0.80):
    if df_img_regions is None or len(df_img_regions) == 0:
        return np.nan, np.nan, 0

    # choose weights: saliency if available, else area_ratio, else area_px
    if "saliency" in df_img_regions.columns and np.isfinite(df_img_regions["saliency"].to_numpy(dtype=float)).any():
        w = pd.to_numeric(df_img_regions["saliency"], errors="coerce").astype(float).fillna(0.0).to_numpy()
    elif "area_ratio" in df_img_regions.columns and np.isfinite(df_img_regions["area_ratio"].to_numpy(dtype=float)).any():
        w = pd.to_numeric(df_img_regions["area_ratio"], errors="coerce").astype(float).fillna(0.0).to_numpy()
    elif "area_px" in df_img_regions.columns:
        w = pd.to_numeric(df_img_regions["area_px"], errors="coerce").astype(float).fillna(0.0).to_numpy()
    else:
        return np.nan, np.nan, int(len(df_img_regions))

    w = np.maximum(w, 0.0)
    tot = float(w.sum())
    if tot <= 1e-12:
        return np.nan, np.nan, int(len(df_img_regions))

    idx = np.argsort(-w)
    cum = np.cumsum(w[idx]) / (tot + 1e-12)
    k = int(np.searchsorted(cum, target_mass) + 1)  # parts needed
    k = max(1, min(k, len(w)))

    # map to 0..1 where higher=more economical
    # economy01 = 1 - (k-1)/(n-1)
    n = int(len(w))
    economy01 = 1.0 if n <= 1 else 1.0 - ((k - 1) / (n - 1))
    return float(np.clip(economy01, 0.0, 1.0)), float(k), n

### Clutter proxies + compression-like proxy

In [42]:
import zlib


def compression_proxy(Eb01: np.ndarray):
    # A simple, stable proxy:
    # compress raw bits with zlib and normalize by pixels
    raw = Eb01.astype(np.uint8).tobytes()
    comp = zlib.compress(raw, level=6)
    return float(len(comp) / (Eb01.size + 1e-9))

def connected_edge_components(Eb01: np.ndarray):
    num, labels, stats, _ = cv2.connectedComponentsWithStats(Eb01.astype(np.uint8), connectivity=8)
    # components excluding background
    sizes = stats[1:, cv2.CC_STAT_AREA] if num > 1 else np.array([], dtype=int)
    return int(max(0, num-1)), sizes

### Skeleton complexity from df_regions (you already have skeleton graph features)

In [43]:
def skeleton_complexity_from_regions(df_img_regions: pd.DataFrame):
    if df_img_regions is None or len(df_img_regions) == 0:
        return np.nan, np.nan, np.nan

    def colsum(name):
        if name not in df_img_regions.columns:
            return np.nan
        v = pd.to_numeric(df_img_regions[name], errors="coerce").astype(float).to_numpy()
        v = v[np.isfinite(v)]
        return float(v.sum()) if len(v) else np.nan

    # total branch/junction/edge mass proxies
    n_junc = colsum("n_junctions")
    n_endp = colsum("n_endpoints")
    n_edges = colsum("n_edges")
    skel_mass = colsum("skel_mass_px")

    # normalized complexity index (soft saturation)
    # you can tweak weights anytime; this is stable and monotonic
    parts = []
    for v, w in [(n_junc, 1.0), (n_edges, 0.6), (n_endp, 0.5), (skel_mass, 0.001)]:
        if np.isfinite(v):
            parts.append(w * np.log1p(max(v, 0.0)))
    if not parts:
        return np.nan, np.nan, np.nan

    raw = float(np.sum(parts))
    # map raw -> 0..1 with a soft cap
    comp01 = float(np.clip(raw / 10.0, 0.0, 1.0))
    return comp01, n_junc, n_edges

### Batch Run

In [44]:
from tqdm import tqdm

rows_img = []
rows_reg_econ = []

# Build a quick lookup from df_regions (assumes you already loaded it)
# df_regions columns include: split, image_id, region_index, area_ratio, saliency, ...
df_regions = df_regions.copy()
df_regions["split"] = df_regions["split"].astype(str)
df_regions["image_id"] = (
    df_regions["image_id"]
        .astype(str)
        .str.replace("\\", "/", regex=False)   # fix backslashes
)

regions_lookup = {(s, iid): g.copy()
                  for (s, iid), g in df_regions.groupby(["split","image_id"], sort=False)}

img_list = list(iter_images(DATA_ROOT))
print("Found images:", len(img_list))

for split, image_id, path in tqdm(img_list, desc="Form-6 batch", unit="img"):
    gray01 = preprocess_gray01(path)
    E01, Eb01 = make_edge_maps(gray01)

    H, W = Eb01.shape
    diag = float(np.hypot(H, W))

    # (1) Geometricization
    geom01, line01, circ01 = geometricization_score(Eb01)

    # (2) Economy (region-based)
    df_img_regions = regions_lookup.get((split, image_id), None)
    econ01, econ_k, n_regions = economy_from_regions(df_img_regions, target_mass=0.80)

    # Store region economy detail (optional but useful)
    if df_img_regions is not None and len(df_img_regions) > 0:
        # ranked contribution curve
        if "saliency" in df_img_regions.columns and np.isfinite(pd.to_numeric(df_img_regions["saliency"], errors="coerce")).any():
            w = pd.to_numeric(df_img_regions["saliency"], errors="coerce").astype(float).fillna(0.0).to_numpy()
            w_name = "saliency"
        elif "area_ratio" in df_img_regions.columns:
            w = pd.to_numeric(df_img_regions["area_ratio"], errors="coerce").astype(float).fillna(0.0).to_numpy()
            w_name = "area_ratio"
        else:
            w = pd.to_numeric(df_img_regions.get("area_px", 0.0), errors="coerce").astype(float).fillna(0.0).to_numpy()
            w_name = "area_px"
        w = np.maximum(w, 0.0)
        tot = float(w.sum() + 1e-12)
        order = np.argsort(-w)
        cum = np.cumsum(w[order]) / tot
        for rank, idx in enumerate(order[: min(len(order), 25)], start=1):  # cap
            rows_reg_econ.append({
                "split": split,
                "image_id": image_id,
                "region_index": int(df_img_regions.iloc[idx]["region_index"]) if "region_index" in df_img_regions.columns else int(idx),
                "rank": int(rank),
                "weight_col": w_name,
                "weight": float(w[idx]),
                "cum_mass_01": float(cum[rank-1]),
            })

    # (3) Clutter proxies
    edge_density_01 = float(Eb01.mean())
    n_cc, cc_sizes = connected_edge_components(Eb01)
    cc_entropy = np.nan
    if len(cc_sizes) >= 2:
        p = cc_sizes.astype(float) / (cc_sizes.sum() + 1e-12)
        cc_entropy = float(-(p * np.log(p + 1e-12)).sum() / np.log(len(p) + 1e-12))  # normalized 0..1

    comp_ratio = compression_proxy(Eb01)

    # (4) Skeleton complexity (from df_regions)
    sk_comp01, sk_junc_sum, sk_edges_sum = skeleton_complexity_from_regions(df_img_regions)

    # A combined “clutter/complexity” index (0..1) — optional but nice for thesis
    # (more edges + more components + higher entropy + higher skeleton complexity)
    cc01 = float(np.clip(
        0.45*edge_density_01 +
        0.20*np.tanh(n_cc / 80.0) +
        0.15*(cc_entropy if np.isfinite(cc_entropy) else 0.0) +
        0.20*(sk_comp01 if np.isfinite(sk_comp01) else 0.0),
        0.0, 1.0
    ))

    rows_img.append({
        "split": split,
        "image_id": image_id,
        "H": int(H), "W": int(W),
        "edge_density_01": edge_density_01,
        "edge_cc_count": int(n_cc),
        "edge_cc_entropy_01": (cc_entropy if np.isfinite(cc_entropy) else np.nan),
        "compression_proxy": float(comp_ratio),

        # Form-6 outputs
        "form6_geometricization_01": float(geom01),
        "form6_line_explained_01": float(line01),
        "form6_circle_explained_01": float(circ01),

        "form6_structural_economy_01": (float(econ01) if np.isfinite(econ01) else np.nan),
        "form6_parts_to_explain_80pct": (float(econ_k) if np.isfinite(econ_k) else np.nan),
        "n_regions": int(n_regions) if np.isfinite(n_regions) else (int(len(df_img_regions)) if df_img_regions is not None else 0),

        "form6_skeletal_complexity_01": (float(sk_comp01) if np.isfinite(sk_comp01) else np.nan),
        "skel_junctions_sum": (float(sk_junc_sum) if np.isfinite(sk_junc_sum) else np.nan),
        "skel_edges_sum": (float(sk_edges_sum) if np.isfinite(sk_edges_sum) else np.nan),

        "form6_clutter_complexity_01": float(cc01),
    })

df_img = pd.DataFrame(rows_img)
df_reg = pd.DataFrame(rows_reg_econ)

df_img.to_csv(OUT_DIR / "form6_image_indices.csv", index=False)
df_reg.to_csv(OUT_DIR / "form6_region_economy.csv", index=False)

print("Saved:", OUT_DIR / "form6_image_indices.csv", "rows=", len(df_img))
print("Saved:", OUT_DIR / "form6_region_economy.csv", "rows=", len(df_reg))
df_img.head(10)

Found images: 130


Form-6 batch: 100%|██████████| 130/130 [04:57<00:00,  2.29s/img] 

Saved: outputs\form6\form6_image_indices.csv rows= 130
Saved: outputs\form6\form6_region_economy.csv rows= 366





Unnamed: 0,split,image_id,H,W,edge_density_01,edge_cc_count,edge_cc_entropy_01,compression_proxy,form6_geometricization_01,form6_line_explained_01,form6_circle_explained_01,form6_structural_economy_01,form6_parts_to_explain_80pct,n_regions,form6_skeletal_complexity_01,skel_junctions_sum,skel_edges_sum,form6_clutter_complexity_01
0,real,real/images/10pm_feeding_around_the_clock.webp,749,750,0.150277,37,0.637317,0.01749,0.604516,0.582163,0.054976,0.222222,15.0,19,1.0,1359.0,2207.0,0.449646
1,real,real/images/11pm.webp,642,500,0.150075,2,0.01321,0.009798,0.792564,0.779964,0.046104,1.0,1.0,1,0.770513,80.0,137.0,0.228617
2,real,real/images/12_4_7_10.webp,749,750,0.151121,78,0.392085,0.018555,0.635266,0.621872,0.039662,1.0,1.0,1,0.0,0.0,0.0,0.276996
3,real,real/images/around_the_clock.webp,799,750,0.150061,20,0.550538,0.014752,0.617221,0.593746,0.059339,1.0,1.0,1,0.096888,0.0,1.0,0.21847
4,real,real/images/around_the_clock_alizarin.webp,704,1000,0.150786,1,,0.008784,0.79214,0.783605,0.040074,1.0,1.0,1,0.097085,0.0,1.0,0.08977
5,real,real/images/Bellyscape_and_plumb_bob.webp,595,750,0.15176,5,0.044018,0.011787,0.702361,0.685026,0.058075,1.0,1.0,1,0.097018,0.0,1.0,0.106782
6,real,real/images/Belly_breast.webp,667,1000,0.14707,157,0.618057,0.02688,0.549706,0.527463,0.041847,0.166667,6.0,7,1.0,2276.0,3763.0,0.551146
7,real,real/images/Belly_breast_face_brain_placenta.webp,680,750,0.148355,71,0.394676,0.023055,0.62665,0.601049,0.052722,1.0,1.0,1,1.0,695.0,1381.0,0.467993
8,real,real/images/Birth_perspective_from_above_and_b...,571,750,0.148619,48,0.376538,0.022291,0.579047,0.548817,0.066257,1.0,1.0,1,1.0,815.0,1553.0,0.430769
9,real,real/images/Blue_0_with_red_halo.webp,640,500,0.152359,1,,0.009978,0.726346,0.700564,0.064465,1.0,1.0,1,0.097011,0.0,1.0,0.090464


### Computational Indexing

In [45]:
import os
import numpy as np
import pandas as pd

# =========================
# Output
# =========================
OUT_DIR = r"outputs\form"
os.makedirs(OUT_DIR, exist_ok=True)

# =========================
# Correct CSVs (from YOUR code)
# =========================
FILES = {
    "f1": r"outputs\form\form1_orientation\form1_image_orientation.csv",
    "f2": r"outputs\form2_image.csv",
    "f3": r"outputs\form\form3\form3_image.csv",
    "f4": r"outputs\form4\form4_image_indices.csv",
    "f5": r"outputs\form5\form5_image_indices.csv",
    "f6": r"outputs\form6\form6_image_indices.csv",
}


# =========================
# Helpers
# =========================
def _safe_read_csv(path):
    if path and os.path.exists(path):
        return pd.read_csv(path)
    print("Missing:", path)
    return pd.DataFrame()


def _as_float(s):
    return pd.to_numeric(s, errors="coerce").astype(float)


def safe_cols(df, cols):
    return [c for c in cols if c in df.columns]


def zscore_series(x, eps=1e-9):
    x = _as_float(x)
    mu = np.nanmean(x)
    sd = np.nanstd(x)
    return (x - mu) / (sd + eps)


def clip01(x):
    x = _as_float(x)
    return np.clip(x, 0.0, 1.0)


def inv01(x):
    return 1.0 - clip01(x)


def nanmean_row(df, cols):
    cols = safe_cols(df, cols)
    if not cols:
        return pd.Series(np.nan, index=df.index)
    X = df[cols].apply(_as_float)
    return X.mean(axis=1, skipna=True)


def weighted_nanmean_row(df, cols, wts):
    cols = [c for c in cols if c in df.columns]
    if not cols:
        return pd.Series(np.nan, index=df.index)
    X = df[cols].apply(_as_float)
    w = np.array([wts.get(c, 1.0) for c in cols], dtype=float)

    num = np.zeros(len(df), dtype=float)
    den = np.zeros(len(df), dtype=float)

    for j, c in enumerate(cols):
        v = X[c].to_numpy(dtype=float)
        m = np.isfinite(v)
        num[m] += w[j] * v[m]
        den[m] += w[j]

    out = np.full(len(df), np.nan, dtype=float)
    m2 = den > 0
    out[m2] = num[m2] / den[m2]
    return pd.Series(out, index=df.index)


# =========================
# Load + merge (outer)
# =========================
df1 = _safe_read_csv(FILES["f1"])
df2 = _safe_read_csv(FILES["f2"])
df3 = _safe_read_csv(FILES["f3"])
df4 = _safe_read_csv(FILES["f4"])
df5 = _safe_read_csv(FILES["f5"])
df6 = _safe_read_csv(FILES["f6"])

dfs = [df for df in [df1, df2, df3, df4, df5, df6] if not df.empty]
if not dfs:
    raise RuntimeError("No Form CSVs found. Check FILES paths.")

# validate keys
for d in dfs:
    if "split" not in d.columns or "image_id" not in d.columns:
        raise ValueError("Each CSV must include: split, image_id")

df = dfs[0].copy()
for nxt in dfs[1:]:
    df = df.merge(nxt, on=["split", "image_id"], how="outer", suffixes=("", "_dup"))

dup_cols = [c for c in df.columns if c.endswith("_dup")]
if dup_cols:
    df = df.drop(columns=dup_cols)

print("Merged form table:", df.shape)
df.head(3)

# =========================
# Subscales (0..1)
# =========================

# ---- Form-1 (correct columns)
df["form1_orientation_stability_01"] = weighted_nanmean_row(
    df,
    cols=[
        "form1_global_frame_alignment_01",
        "form1_local_frame_influence_abs_01",  # "competing frame" -> invert
        "form1_orientation_conflict_index_01",  # conflict -> invert
    ],
    wts={
        "form1_global_frame_alignment_01": 0.45,
        "form1_local_frame_influence_abs_01": 0.25,
        "form1_orientation_conflict_index_01": 0.30,
    }
)
df["_tmp_local_inv"] = inv01(df.get("form1_local_frame_influence_abs_01", np.nan))
df["_tmp_conf_inv"] = inv01(df.get("form1_orientation_conflict_index_01", np.nan))
df["form1_orientation_stability_01"] = weighted_nanmean_row(
    df,
    cols=["form1_global_frame_alignment_01", "_tmp_local_inv", "_tmp_conf_inv"],
    wts={"form1_global_frame_alignment_01": 0.45, "_tmp_local_inv": 0.25, "_tmp_conf_inv": 0.30}
)
df["form1_orientation_stability_01"] = clip01(df["form1_orientation_stability_01"])

# ---- Form-2 (your exact Form-2 outputs)
df["form2_projection_pressure_01"] = weighted_nanmean_row(
    df,
    cols=[
        "form2_affine_distortion_index_01",
        "form2_symmetry_degradation_index_01",
        "form2_viewpoint_ambiguity_index_01",
    ],
    wts={
        "form2_affine_distortion_index_01": 0.40,
        "form2_symmetry_degradation_index_01": 0.30,
        "form2_viewpoint_ambiguity_index_01": 0.30,
    }
)
df["form2_projection_pressure_01"] = clip01(df["form2_projection_pressure_01"])

# ---- Form-3 (your exact Form-3 image column)
# only one key index exists in your code:
df["form3_depth_hierarchy_01"] = clip01(df.get("form3_dominance_subservience_index_01", np.nan))

# ---- Form-4 (your exact Form-4 outputs)
df["_tmp_amp_inv"] = inv01(df.get("form4_amputation_risk_01", np.nan))
df["form4_completion_coherence_01"] = weighted_nanmean_row(
    df,
    cols=[
        "form4_good_continuation_01",
        "form4_completion_plausibility_01",
        "_tmp_amp_inv",
    ],
    wts={
        "form4_good_continuation_01": 0.45,
        "form4_completion_plausibility_01": 0.35,
        "_tmp_amp_inv": 0.20,
    }
)
df["form4_completion_coherence_01"] = clip01(df["form4_completion_coherence_01"])

# ---- Form-5 (your exact Form-5 outputs)
# Use ornamentality index directly if present
if "form5_ornamentality_index_01" in df.columns:
    df["form5_ornamentality_01"] = clip01(df["form5_ornamentality_index_01"])
else:
    # fallback build from its parts (still using your column names)
    sym = clip01(df.get("form5_reflection_symmetry_whole_01", np.nan))
    rep = clip01(df.get("form5_repetition_density_01", np.nan))
    spc = clip01(df.get("form5_spacing_regularity_01", np.nan))
    surpr = clip01(df.get("form5_structural_surprise_01", np.nan))
    df["form5_ornamentality_01"] = nanmean_row(
        pd.DataFrame({"sym": sym, "rep": rep, "spc": spc, "surpr_inv": inv01(surpr)}),
        ["sym", "rep", "spc", "surpr_inv"]
    )
df["form5_ornamentality_01"] = clip01(df["form5_ornamentality_01"])

# ---- Form-6 (your exact Form-6 outputs)
# Abstraction level = geometricization high + (1 - clutter) + economy (optional)
geo = clip01(df.get("form6_geometricization_01", np.nan))
clu = clip01(df.get("form6_clutter_complexity_01", np.nan))
eco = clip01(df.get("form6_structural_economy_01", np.nan))

df["form6_abstraction_level_01"] = weighted_nanmean_row(
    pd.DataFrame({"geo": geo, "clu_inv": inv01(clu), "eco": eco}),
    cols=["geo", "clu_inv", "eco"],
    wts={"geo": 0.45, "clu_inv": 0.35, "eco": 0.20}
)
df["form6_abstraction_level_01"] = clip01(df["form6_abstraction_level_01"])

# =========================
# Composite FORM index
# =========================
SUBSCALES = [
    "form1_orientation_stability_01",
    "form2_projection_pressure_01",
    "form3_depth_hierarchy_01",
    "form4_completion_coherence_01",
    "form5_ornamentality_01",
    "form6_abstraction_level_01",
]
df["form_composite_index_01"] = nanmean_row(df, SUBSCALES)
df["form_composite_index_01"] = clip01(df["form_composite_index_01"])

# =========================
# Z-scored versions
# =========================
for c in SUBSCALES + ["form_composite_index_01"]:
    df[c.replace("_01", "_z")] = zscore_series(df[c])

# =========================
# Save
# =========================
keep_cols = ["split", "image_id"] + SUBSCALES + ["form_composite_index_01"] + [c for c in df.columns if
                                                                               c.endswith("_z")]
out_path = os.path.join(OUT_DIR, "form_computational_indices.csv")
df[keep_cols].to_csv(out_path, index=False)

print("Saved:", out_path)
df[keep_cols].head(10)

Merged form table: (260, 55)
Saved: outputs\form\form_computational_indices.csv


  mu = np.nanmean(x)
  var = nanvar(a, axis=axis, dtype=dtype, out=out, ddof=ddof,


Unnamed: 0,split,image_id,form1_orientation_stability_01,form2_projection_pressure_01,form3_depth_hierarchy_01,form4_completion_coherence_01,form5_ornamentality_01,form6_abstraction_level_01,form_composite_index_01,form1_orientation_stability_z,form2_projection_pressure_z,form3_depth_hierarchy_z,form4_completion_coherence_z,form5_ornamentality_z,form6_abstraction_level_z,form_composite_index_z
0,generated,generated/ComfyUI_00082_.png,,,,,,0.541762,0.541762,,,,,,-0.695638,0.108628
1,generated,generated/ComfyUI_00083_.png,,,,,,0.542342,0.542342,,,,,,-0.691071,0.112691
2,generated,generated/ComfyUI_00084_.png,,,,,,0.542344,0.542344,,,,,,-0.691056,0.112704
3,generated,generated/ComfyUI_00085_.png,,,,,,0.518043,0.518043,,,,,,-0.882288,-0.057401
4,generated,generated/ComfyUI_00086_.png,,,,,,0.773744,0.773744,,,,,,1.129895,1.732486
5,generated,generated/ComfyUI_00087_.png,,,,,,0.791118,0.791118,,,,,,1.266616,1.854102
6,generated,generated/ComfyUI_00088_.png,,,,,,0.780689,0.780689,,,,,,1.184548,1.7811
7,generated,generated/ComfyUI_00089_.png,,,,,,0.529644,0.529644,,,,,,-0.790997,0.023804
8,generated,generated/ComfyUI_00090_.png,,,,,,0.538636,0.538636,,,,,,-0.720238,0.086746
9,generated,generated/ComfyUI_00091_.png,,,,,,0.690616,0.690616,,,,,,0.475736,1.150595
