In [1]:
"""
Assign row,col labels to yEd GraphML nodes based on their x/y positions.

Algorithm:
- Group nodes into columns using 1D greedy clustering on x (tolerates small jitters).
- For each column:
  - Determine the "last row" (bottom-most index) as:
      max(existing row labels in that column), else
      median of last-rows across other columns with labels, else
      number of nodes in the column.
  - Estimate the vertical step as the median gap between consecutive y's.
  - Assign rows from bottom→top using a monotone stepper:
      bottom node gets last_row;
      each node above: row = prev_row - max(1, round(dy / y_gap)).
    This handles missing rows (skips numbers) and fuzzy spacing robustly.
- Assign column numbers:
    If a column has consistent existing ",col" labels, keep that number.
    Otherwise assign by left→right order (1..K).
- Write labels back into the existing <y:NodeLabel> text as "row,col" (same field).

Outputs:
- Updated GraphML with new labels.
- Optional CSV report mapping old→new labels.

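Usage (script) -- NOTE: this cell imports argparse but defines no CLI entry point,
so a main() wrapper around relabel_graphml() is still needed for this to work:
    python assign_labels.py input.graphml -o output.graphml -r report.csv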

Usage (notebook):
    relabel_graphml("HIMCM_graph.graphml", "HIMCM_graph_assignedNode.graphml", "HIMCM_node_label_report.csv")
"""

import argparse
import math
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET

# --- Namespaces ---
# Prefix -> URI map passed to every find()/findall() call below:
#   "g" = core GraphML elements (graph, node, data)
#   "y" = yWorks/yEd extension elements (ShapeNode, Geometry, NodeLabel)
NS = {
    "g": "http://graphml.graphdrawing.org/xmlns",
    "y": "http://www.yworks.com/xml/graphml",
}
# Preserve namespace prefixes in the output
# (register_namespace is global to ElementTree: GraphML becomes the default
# namespace and yWorks elements are serialised with the "y:" prefix.)
ET.register_namespace("", NS["g"])
ET.register_namespace("y", NS["y"])

# Matches a whole label of the form "<row>,<col>": two unsigned integers
# separated by a comma, with optional whitespace around each part.
LABEL_RE = re.compile(r"^\s*(\d+)\s*,\s*(\d+)\s*$")


def _shape_node(node: ET.Element) -> Optional[ET.Element]:
    """Return the first ``y:ShapeNode`` found under the node's ``g:data`` children.

    The ``g:data`` children are scanned in document order; ``None`` is returned
    when none of them directly contains a ``y:ShapeNode``.
    """
    shape_candidates = (
        data_el.find("./y:ShapeNode", NS) for data_el in node.findall("./g:data", NS)
    )
    return next((shape for shape in shape_candidates if shape is not None), None)


def _geometry(sn: ET.Element) -> Optional[Tuple[float, float, float, float]]:
    """Read ``(x, y, width, height)`` from the shape's ``y:Geometry`` child.

    Returns ``None`` when there is no ``y:Geometry`` element. An attribute that
    is absent is read as 0.0.
    """
    geom = sn.find("./y:Geometry", NS)
    if geom is None:
        return None
    left, top, width, height = (
        float(geom.attrib.get(attr, "0")) for attr in ("x", "y", "width", "height")
    )
    return left, top, width, height


def _get_label(sn: ET.Element) -> Tuple[Optional[ET.Element], str, Optional[int], Optional[int]]:
    """Fetch the first ``y:NodeLabel`` and parse its text as ``row,col``.

    Returns ``(label_element, stripped_text, row, col)``. ``label_element`` is
    ``None`` when the shape has no label; ``row`` and ``col`` are ``None``
    unless the stripped text matches ``LABEL_RE``.
    """
    label_el = sn.find("./y:NodeLabel", NS)
    if label_el is None or not label_el.text:
        # No label element, or a label with no leading text.
        return label_el, "", None, None

    text = label_el.text.strip()
    parsed = LABEL_RE.match(text)
    if not parsed:
        return label_el, text, None, None
    return label_el, text, int(parsed.group(1)), int(parsed.group(2))


def _ensure_label(sn: ET.Element) -> ET.Element:
    """Return the shape's first ``y:NodeLabel``, creating one if it is missing.

    A newly created label is appended to ``sn`` with ``visible="true"``.
    """
    existing = sn.find("./y:NodeLabel", NS)
    if existing is not None:
        return existing
    created = ET.SubElement(sn, "{" + NS["y"] + "}NodeLabel")
    created.set("visible", "true")
    return created


def _median_vertical_gap(y_values: List[float]) -> float:
    if len(y_values) < 2:
        return float("nan")
    diffs = np.diff(sorted(y_values))
    diffs = np.abs(diffs)
    return float(np.median(diffs))


def _cluster_columns(df: pd.DataFrame,
                     width_factor: float = 1.5,
                     nn_percentile: float = 5.0,
                     min_eps: float = 8.0) -> Tuple[pd.DataFrame, Dict[int, float]]:
    """Greedy 1D clustering on x-center with adaptive epsilon."""
    sorted_x = np.sort(df["xc"].values)
    x_diffs = np.diff(sorted_x) if len(sorted_x) > 1 else np.array([0.0])
    median_w = float(np.median(df["w"])) if len(df) else 10.0
    eps_candidates = [
        median_w * width_factor,
        (np.percentile(x_diffs, nn_percentile) * 2.0) if len(x_diffs) else median_w * width_factor,
        min_eps,
    ]
    eps = float(max(eps_candidates))

    df_sorted = df.sort_values("xc").reset_index(drop=True)

    col_centers: List[float] = []   # running centers
    col_counts: List[int] = []
    col_ids: List[int] = []

    for _, row in df_sorted.iterrows():
        xcur = row["xc"]
        if col_centers:
            dists = [abs(xcur - c) for c in col_centers]
            j = int(np.argmin(dists))
            if dists[j] <= eps:
                # assign to existing cluster j
                new_center = (col_centers[j] * col_counts[j] + xcur) / (col_counts[j] + 1)
                col_centers[j] = new_center
                col_counts[j] += 1
                col_ids.append(j)
                continue
        # start a new cluster
        col_centers.append(xcur)
        col_counts.append(1)
        col_ids.append(len(col_centers) - 1)

    df_sorted["col_cluster"] = col_ids

    # Map clusters to visual order (left→right) as 1..K
    ordering = sorted([(c, i) for i, c in enumerate(col_centers)], key=lambda t: t[0])
    cluster_to_visual = {cluster_idx: (rank + 1) for rank, (_, cluster_idx) in enumerate(ordering)}
    df_sorted["col_visual"] = df_sorted["col_cluster"].map(cluster_to_visual)

    return df_sorted, {i: c for i, c in enumerate(col_centers)}


def _assign_columns(df: pd.DataFrame, consistency_ratio: float = 0.6) -> Tuple[pd.DataFrame, Dict[int, int]]:
    """Prefer existing ',col' labels if consistent; else use visual order."""
    cluster_final_col: Dict[int, int] = {}
    for cluster_idx, sub in df.groupby("col_cluster"):
        known_cols = [int(c) for c in sub["old_col"].dropna().tolist()]
        if known_cols:
            mode_col, cnt = Counter(known_cols).most_common(1)[0]
            if cnt >= max(1, int(consistency_ratio * len(known_cols))):
                cluster_final_col[cluster_idx] = mode_col
                continue
        cluster_final_col[cluster_idx] = int(sub["col_visual"].iloc[0])
    df["assigned_col"] = df["col_cluster"].map(cluster_final_col)
    return df, cluster_final_col


def _compute_last_rows_and_gaps(df: pd.DataFrame) -> Tuple[Dict[int, int], Dict[int, float], Dict[int, float]]:
    """Per x-cluster: bottom-most row number, bottom y, and vertical step.

    Returns ``(last_row, y_bottom, y_gap)``, each keyed by ``col_cluster``:

    * ``last_row``: the largest existing ``old_row`` in the cluster; else the
      median (truncated to int) of that value over clusters that have one;
      else the number of nodes in the cluster.
    * ``y_bottom``: the largest ``yc`` in the cluster.
    * ``y_gap``: the cluster's median gap between consecutive ``yc`` values;
      when that is NaN or below 1e-6, the median of the other clusters'
      defined gaps (20.0 if no cluster has one).
    """
    ys_by_cluster: Dict[int, List[float]] = {}
    labelled_last: Dict[int, int] = {}
    for cluster_idx, members in df.groupby("col_cluster"):
        ys_by_cluster[cluster_idx] = members["yc"].tolist()
        seen_rows = [int(r) for r in members["old_row"].dropna().tolist()]
        if seen_rows:
            labelled_last[cluster_idx] = max(seen_rows)

    # Shared fallback for clusters without any labelled row.
    if labelled_last:
        shared_last = int(np.median(list(labelled_last.values())))
    else:
        shared_last = None

    # Shared fallback for clusters whose own gap is undefined or ~0.
    raw_gaps = {cid: _median_vertical_gap(ys) for cid, ys in ys_by_cluster.items()}
    defined_gaps = [g for g in raw_gaps.values() if not math.isnan(g)]
    shared_gap = float(np.median(defined_gaps)) if defined_gaps else 20.0

    last_row: Dict[int, int] = {}
    y_bottom: Dict[int, float] = {}
    y_gap: Dict[int, float] = {}
    for cluster_idx, ys in ys_by_cluster.items():
        y_bottom[cluster_idx] = max(ys)

        own_gap = raw_gaps[cluster_idx]
        usable = (not math.isnan(own_gap)) and own_gap >= 1e-6
        y_gap[cluster_idx] = own_gap if usable else shared_gap

        if cluster_idx in labelled_last:
            last_row[cluster_idx] = int(labelled_last[cluster_idx])
        elif shared_last is not None:
            last_row[cluster_idx] = int(shared_last)
        else:
            last_row[cluster_idx] = int(len(ys))  # rough fallback

    return last_row, y_bottom, y_gap


def _assign_rows_monotone(df: pd.DataFrame,
                          last_row: Dict[int, int],
                          y_bottom: Dict[int, float],
                          y_gap: Dict[int, float]) -> pd.DataFrame:
    """Bottom→top monotone row assignment (handles missing rows)."""
    assigned_rows: List[int] = []

    for cluster_idx, sub in df.groupby("col_cluster", sort=False):
        sub_sorted = sub.sort_values("yc", ascending=False)  # bottom (max y) first
        rows_for_sub: Dict[int, int] = {}

        prev_y = None
        prev_row = None
        for idx, r in sub_sorted.iterrows():
            if prev_row is None:
                cur_row = int(last_row[cluster_idx])
            else:
                dy = prev_y - r["yc"]  # positive moving upward
                step = int(round(dy / y_gap[cluster_idx]))
                if step < 1:
                    step = 1
                cur_row = int(prev_row - step)
                if cur_row < 1:
                    cur_row = 1
            rows_for_sub[idx] = cur_row
            prev_y, prev_row = r["yc"], cur_row

        # Write back
        for idx in sub_sorted.index:
            assigned_rows.append((idx, rows_for_sub[idx]))

    # Merge to df in original df order (df currently is grouped copy; ensure position by index)
    row_map = dict(assigned_rows)
    df = df.copy()
    df["assigned_row"] = df.index.map(lambda i: row_map[i])
    return df

def relabel_graphml(
    input_path: str,
    output_path: str,
    report_csv: Optional[str] = None,
    width_factor: float = 2.0,
    nn_percentile: float = 5.0,
    min_eps: float = 10.0,
    consistency_ratio: float = 0.6,
    print_warnings: bool = True,
) -> None:
    """Rewrite every node label in a yEd GraphML file as ``row,col``.

    Nodes are clustered into columns by x-centre, each column is numbered, and
    rows are assigned bottom→top inside each column. The new text replaces the
    text of each node's first ``y:NodeLabel`` (one is created if missing).

    Args:
        input_path: GraphML file to read.
        output_path: where the relabelled GraphML is written; missing parent
            directories are created.
        report_csv: optional CSV path for an id / xc / yc / old_label /
            new_label report; missing parent directories are created.
        width_factor, nn_percentile, min_eps: forwarded to ``_cluster_columns``.
        consistency_ratio: forwarded to ``_assign_columns``.
        print_warnings: print up to 15 nodes whose existing row or column
            number differs from the assigned one.

    Raises:
        AssertionError: ``input_path`` does not exist.
        RuntimeError: no node with both ``y:ShapeNode`` and ``y:Geometry``.
    """
    in_path = Path(input_path)
    out_path = Path(output_path)
    # NOTE(review): assert is removed under `python -O`; ET.parse below would
    # then raise FileNotFoundError instead. Kept so the exception type callers
    # see today does not change.
    assert in_path.exists(), f"Input file not found: {in_path}"

    tree = ET.parse(in_path)
    root = tree.getroot()
    node_elems = root.findall(".//g:graph/g:node", NS)

    rows = []
    for node in node_elems:
        sn = _shape_node(node)
        if sn is None:
            continue
        geom = _geometry(sn)
        if geom is None:
            continue
        x, y, w, h = geom
        _, txt, old_row, old_col = _get_label(sn)
        rows.append(
            {"id": node.attrib.get("id"), "elem": node, "sn": sn,
             "x": x, "y": y, "w": w, "h": h,
             "xc": x + w / 2.0, "yc": y + h / 2.0,
             "old_label": txt, "old_row": old_row, "old_col": old_col}
        )

    if not rows:
        raise RuntimeError("No drawable y:ShapeNode nodes with geometry found.")

    df = pd.DataFrame(rows)

    df_sorted, _ = _cluster_columns(df, width_factor, nn_percentile, min_eps)
    df_sorted, _ = _assign_columns(df_sorted, consistency_ratio)
    last_row, y_bottom, y_gap = _compute_last_rows_and_gaps(df_sorted)
    df_sorted = _assign_rows_monotone(df_sorted, last_row, y_bottom, y_gap)

    # --- NaN-safe warning checks ---
    # old_row / old_col may be None or NaN (pandas stores missing ints as NaN).
    def _has_number(x):
        return x is not None and not (isinstance(x, float) and math.isnan(x))

    warnings = []
    for _, r in df_sorted.iterrows():
        if _has_number(r["old_row"]) and int(r["old_row"]) != int(r["assigned_row"]):
            warnings.append(
                f"Seed conflict at node {r['id']}: old_row={int(r['old_row'])} assigned={int(r['assigned_row'])}"
            )
        if _has_number(r["old_col"]) and int(r["old_col"]) != int(r["assigned_col"]):
            warnings.append(
                f"Seed conflict at node {r['id']}: old_col={int(r['old_col'])} assigned={int(r['assigned_col'])}"
            )

    # Write labels back into the parsed tree (the "sn" column holds live elements).
    for _, r in df_sorted.iterrows():
        lab = _ensure_label(r["sn"])
        lab.text = f"{int(r['assigned_row'])},{int(r['assigned_col'])}"

    out_path.parent.mkdir(parents=True, exist_ok=True)
    tree.write(out_path, encoding="utf-8", xml_declaration=True)

    if report_csv:
        rcsv = Path(report_csv)
        # Create the report's directory as well, so a report in a new folder
        # does not fail after the GraphML has already been written.
        rcsv.parent.mkdir(parents=True, exist_ok=True)
        report = df_sorted[["id", "xc", "yc", "old_label", "assigned_row", "assigned_col"]].copy()
        report["new_label"] = (
            report["assigned_row"].astype(int).astype(str) + "," + report["assigned_col"].astype(int).astype(str)
        )
        report = report.sort_values(["assigned_col", "assigned_row"])[
            ["id", "xc", "yc", "old_label", "new_label"]
        ]
        report.to_csv(rcsv, index=False)

    unique_cols = int(df_sorted["assigned_col"].nunique())
    # Median of the effective per-column gaps (columns that fell back to the
    # shared gap contribute that shared value).
    median_col_gap = float(np.median(list(y_gap.values()))) if y_gap else float("nan")
    print(f"Columns detected: {unique_cols}")
    print(f"Median vertical gap (per-col medians → global median): {median_col_gap:.3f}")
    print(f"GraphML saved to: {out_path}")
    if report_csv:
        print(f"CSV report saved to: {report_csv}")
    if print_warnings and warnings:
        print("Sample warnings (up to 15):")
        for w in warnings[:15]:
            print("  " + w)



In [2]:
# Relabel the project graph; the GraphML output and the CSV report are written
# next to the input file.
# NOTE(review): these are absolute, machine-specific paths, so this cell only
# runs unchanged on a machine with this directory layout.
relabel_graphml(
    input_path="/home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph.graphml",
    output_path="/home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.graphml",
    report_csv="/home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.csv"
)

Columns detected: 40
Median vertical gap (per-col medians → global median): 49.288
GraphML saved to: /home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.graphml
CSV report saved to: /home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.csv


# Visualize

In [3]:
"""
Scrollable yEd GraphML viewer (HTML+SVG)

- Positions match yEd: uses y:Geometry (x,y,width,height) directly.
- Y axis direction matches screens (downward), same as yEd.
- Nodes drawn as ellipses sized to width/height (any other yEd shape type falls back to a rounded rectangle); labels centered.
- Edges drawn as straight lines between node centers.
- Wraps the large SVG in a fixed-size, scrollable <div>.
- Optional scale factor to make everything bigger.

Usage (notebook or script):
    write_scrollable_svg_html(
        graphml_path="HIMCM_graph_assignedNode.graphml",
        out_html="HIMCM_graph_scroll.html",
        scale=1.25,            # make it bigger
        viewport_w=1400,       # scrollable viewport width (px)
        viewport_h=900,        # scrollable viewport height (px)
        show_labels=True,
        show_edges=True,
    )
"""

import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Tuple, List, Optional

# yEd / GraphML namespaces
# Prefix -> URI map for find()/findall(): "g" = core GraphML, "y" = yWorks/yEd.
# Rebinds the same mapping the relabelling cell defines, so this viewer cell
# can be run on its own.
NS = {
    "g": "http://graphml.graphdrawing.org/xmlns",
    "y": "http://www.yworks.com/xml/graphml",
}

def _parse_graphml_positions(graphml_path: str):
    """Read node geometry, styling and edges from a yEd GraphML file.

    Returns ``(nodes, edges, bbox)``:

    * ``nodes``: ``{node id: info}`` where ``info`` holds ``x, y, w, h``
      (top-left and size), ``xc, yc`` (centre), ``fill`` and ``border``
      colours (defaults ``#FFCC00`` / ``#000000``), the stripped ``label``
      text, and the ``shape`` type (default ``"ellipse"``).
    * ``edges``: ``(source id, target id)`` pairs whose endpoints are both
      in ``nodes``.
    * ``bbox``: ``(min_x, min_y, max_x, max_y)`` over all node rectangles.

    Nodes without a ``y:ShapeNode`` or ``y:Geometry`` are skipped.

    Raises:
        RuntimeError: no usable node was found.
    """
    root = ET.parse(graphml_path).getroot()

    def _color_of(shape_el, tag, default):
        # Use the element's non-empty "color" attribute, else the default.
        styled = shape_el.find(f"./y:{tag}", NS)
        if styled is not None and styled.attrib.get("color"):
            return styled.attrib["color"]
        return default

    nodes = {}
    lo_x = lo_y = float("inf")
    hi_x = hi_y = float("-inf")

    for node_el in root.findall(".//g:graph/g:node", NS):
        # First y:ShapeNode found under any of the node's g:data children.
        shape = next(
            (
                found
                for found in (d.find("./y:ShapeNode", NS) for d in node_el.findall("./g:data", NS))
                if found is not None
            ),
            None,
        )
        if shape is None:
            continue

        geom = shape.find("./y:Geometry", NS)
        if geom is None:
            continue

        x, y, w, h = (
            float(geom.attrib.get(attr, "0")) for attr in ("x", "y", "width", "height")
        )

        label_el = shape.find("./y:NodeLabel", NS)
        raw_text = label_el.text if label_el is not None else None
        label = raw_text.strip() if raw_text else ""

        # Shape type (default ellipse)
        shp_el = shape.find("./y:Shape", NS)
        shp_type = "ellipse" if shp_el is None else shp_el.attrib.get("type", "ellipse")

        nodes[node_el.attrib.get("id")] = {
            "x": x, "y": y, "w": w, "h": h,
            "xc": x + w / 2.0, "yc": y + h / 2.0,
            "fill": _color_of(shape, "Fill", "#FFCC00"),
            "border": _color_of(shape, "BorderStyle", "#000000"),
            "label": label,
            "shape": shp_type,
        }

        # Grow the bounding box to include this node's rectangle.
        lo_x, lo_y = min(lo_x, x), min(lo_y, y)
        hi_x, hi_y = max(hi_x, x + w), max(hi_y, y + h)

    if not nodes:
        raise RuntimeError("No y:ShapeNode/y:Geometry nodes found—are you sure this is yEd GraphML?")

    # Keep only edges whose two endpoints were both parsed above.
    edges = []
    for edge_el in root.findall(".//g:graph/g:edge", NS):
        src = edge_el.attrib.get("source")
        dst = edge_el.attrib.get("target")
        if src in nodes and dst in nodes:
            edges.append((src, dst))

    return nodes, edges, (lo_x, lo_y, hi_x, hi_y)

def write_scrollable_svg_html(
    graphml_path: str,
    out_html: str,
    scale: float = 1.25,
    viewport_w: int = 1400,
    viewport_h: int = 900,
    show_labels: bool = True,
    show_edges: bool = True,
    margin: int = 20,
) -> None:
    """Render a yEd GraphML file as an SVG inside a scrollable HTML page.

    Node positions come straight from ``y:Geometry`` (y grows downward, as in
    yEd), shifted so the drawing starts ``margin`` units from the top-left and
    then multiplied by ``scale``. Edges are straight lines between node
    centres, drawn beneath the nodes.

    Args:
        graphml_path: yEd GraphML file to read.
        out_html: HTML file to write; missing parent directories are created.
        scale: uniform zoom applied to positions, sizes, strokes and fonts.
        viewport_w, viewport_h: pixel size of the scrollable container.
        show_labels: draw each node's label text at its centre.
        show_edges: draw the edges.
        margin: padding (in graph units, before scaling) around the drawing.

    Raises:
        RuntimeError: propagated from ``_parse_graphml_positions`` when the
            file has no drawable nodes.
    """
    nodes, edges, (min_x, min_y, max_x, max_y) = _parse_graphml_positions(graphml_path)

    # Normalize origin to (margin, margin) so nothing hugs the top-left
    width = (max_x - min_x) + 2 * margin
    height = (max_y - min_y) + 2 * margin

    def sx(v):  # scale x
        return (v - min_x + margin) * scale
    def sy(v):  # scale y (screen coords: y goes downward like yEd; no inversion)
        return (v - min_y + margin) * scale

    svg_w = width * scale
    svg_h = height * scale

    # Same stroke width for edges and node outlines; never thinner than 1px.
    stroke_w = f"{max(1.0, 1.0*scale):.2f}"

    # Build SVG elements
    svg_parts = []
    svg_parts.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{svg_w:.2f}" height="{svg_h:.2f}" '
                     f'viewBox="0 0 {svg_w:.2f} {svg_h:.2f}">')

    # Optional background grid for orientation (very light)
    grid_step = max(50, int(100 * scale))
    svg_parts.append(f'<defs>'
                     f'  <pattern id="grid" width="{grid_step}" height="{grid_step}" patternUnits="userSpaceOnUse">'
                     f'    <path d="M {grid_step} 0 L 0 0 0 {grid_step}" fill="none" stroke="#f0f0f0" stroke-width="1"/>'
                     f'  </pattern>'
                     f'</defs>')
    svg_parts.append(f'<rect x="0" y="0" width="{svg_w:.2f}" height="{svg_h:.2f}" fill="url(#grid)"/>')

    # Edges (behind nodes)
    if show_edges and edges:
        for s, t in edges:
            xs, ys = sx(nodes[s]["xc"]), sy(nodes[s]["yc"])
            xt, yt = sx(nodes[t]["xc"]), sy(nodes[t]["yc"])
            svg_parts.append(f'<line x1="{xs:.2f}" y1="{ys:.2f}" x2="{xt:.2f}" y2="{yt:.2f}" '
                             f'stroke="#888" stroke-opacity="0.85" stroke-width="{stroke_w}"/>')

    # Nodes
    for info in nodes.values():
        cx, cy = sx(info["xc"]), sy(info["yc"])
        w, h = info["w"] * scale, info["h"] * scale
        rx, ry = w / 2.0, h / 2.0
        # Colours come verbatim from the GraphML file; escape them like the
        # labels so a stray quote or '<' cannot break out of the attribute.
        fill = _escape_html(info["fill"])
        stroke = _escape_html(info["border"])

        # Ellipse for yEd "ellipse" shapes; every other shape type is drawn
        # as a rounded rectangle.
        if info["shape"].lower() == "ellipse":
            svg_parts.append(
                f'<ellipse cx="{cx:.2f}" cy="{cy:.2f}" rx="{rx:.2f}" ry="{ry:.2f}" '
                f'fill="{fill}" stroke="{stroke}" stroke-width="{stroke_w}"/>'
            )
        else:
            x = sx(info["x"]); y = sy(info["y"])
            svg_parts.append(
                f'<rect x="{x:.2f}" y="{y:.2f}" width="{w:.2f}" height="{h:.2f}" '
                f'rx="{2*scale:.2f}" ry="{2*scale:.2f}" '
                f'fill="{fill}" stroke="{stroke}" stroke-width="{stroke_w}"/>'
            )

        # Label
        if show_labels and info["label"]:
            svg_parts.append(
                f'<text x="{cx:.2f}" y="{cy:.2f}" fill="#000" font-size="{max(8, int(10*scale))}" '
                f'text-anchor="middle" dominant-baseline="middle">{_escape_html(info["label"])}</text>'
            )

    svg_parts.append('</svg>')
    svg_markup = ''.join(svg_parts)

    # The file name is shown in <title> and the toolbar; escape it because a
    # name may legally contain '&' or '<'.
    page_title = _escape_html(Path(graphml_path).name)

    # Wrap in scrollable container
    html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<title>{page_title} — scrollable SVG</title>
<style>
  body {{
    margin: 0;
    font-family: system-ui, -apple-system, Segoe UI, Roboto, sans-serif;
  }}
  .toolbar {{
    padding: 8px 12px;
    background: #fafafa;
    border-bottom: 1px solid #ddd;
    position: sticky;
    top: 0;
    z-index: 2;
  }}
  .viewport {{
    width: {viewport_w}px;
    height: {viewport_h}px;
    overflow: auto;
    border: 1px solid #ccc;
    margin: 10px;
    box-shadow: 0 1px 3px rgba(0,0,0,0.06);
    background: #fff;
  }}
  .hint {{
    color: #666; font-size: 12px;
  }}
</style>
</head>
<body>
  <div class="toolbar">
    <strong>{page_title}</strong>
    <span class="hint"> | Scroll to pan. Scale={scale:.2f}, Canvas={int(svg_w)}×{int(svg_h)}px</span>
  </div>
  <div class="viewport">
    {svg_markup}
  </div>
</body>
</html>
"""

    out = Path(out_html)
    # Create the target folder if needed so writing into a new directory works.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(html, encoding="utf-8")
    print(f"Wrote scrollable viewer to: {out.resolve()}")

def _escape_html(s: str) -> str:
    return (
        s.replace("&", "&amp;")
         .replace("<", "&lt;")
         .replace(">", "&gt;")
         .replace('"', "&quot;")
    )

In [4]:
# Render the relabelled graph (the file written by relabel_graphml above) as a
# scrollable HTML page in the same folder.
# NOTE(review): these are absolute, machine-specific paths, so this cell only
# runs unchanged on a machine with this directory layout.
write_scrollable_svg_html(
    graphml_path="/home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.graphml",
    out_html="/home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.html",
    scale=1.3,
    viewport_w=1600,
    viewport_h=1000,
    show_labels=True,
    show_edges=True,
)

Wrote scrollable viewer to: /home/popsatorn/Desktop/HiMCM_kickoff_2026/code/HIMCM_graph_assignedNode.html
