In [5]:
import networkx as nx
import re

# Task 3
This notebook implements Task 3 of the ManualsGraph assignment: taking extracted constraints (assumed done in Task 2) and grounding them in a minimal graph schema that supports a neuro-symbolic verifier for proposed LLM actions.

The goal is not to build a full pipeline or production database. Instead, I’m demonstrating the smallest working loop that can serve as a guardrail:

Manual → Components → Constraints → Verify action → ALLOWED / BLOCKED / REWRITE

I use a robot arm datasheet example (Kawasaki KJ series) and focus on torque limits because they are numeric, safety-relevant, and easy to represent as inequalities.

* Minimal graph schema (semantic prior)
* Neuro-symbolic verification logic (guardrails)

## Graph outline:

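I represent manual knowledge with three core node types; the edges below additionally reference a `Machine` node and an optional `Sentence` (provenance) node.

### Node types: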

* `Manual`: the source document and scope

* `Component`: the machine part the constraint applies to (e.g., joint JT4)

* `Constraint`: a structured limit with a property, operator, value, unit, and provenance

### Edges encode the relationships:

`Manual -[DESCRIBES]-> Machine` (document governance / scope)

`Machine -[HAS_COMPONENT]-> Component`

`Constraint -[LIMITS_PARAMETER]-> Component`

`Constraint -[EVIDENCED_BY]-> Sentence` (optional provenance node)

In [11]:
# Build the knowledge graph from scratch so that re-running this cell is idempotent.
G = nx.DiGraph()


# 1. Manual & Machine (Document Governance Layer)
G.add_node("manual_robot_arm", label="Manual", title="Kawasaki Robot KJ Series Spec", version="3.0")
G.add_node("kj314j", label="Machine", name="KJ314J Robot Arm", model="KJ314J")
G.add_node("kj264j", label="Machine", name="KJ264J Robot Arm", model="KJ264J")

G.add_edge("manual_robot_arm", "kj314j", type="DESCRIBES")
G.add_edge("manual_robot_arm", "kj264j", type="DESCRIBES")

# 2. Extract Data Loop (Mapping extracted JSON to Schema)
# Each record: "<MODEL>_<JOINT>" entity, numeric torque limit, and the manual
# text the value was read from.
extracted_constraints = [
    {"entity": "KJ314J_JT4", "val": 56.2, "src": "JT4 56.2 N･m 2.19 kg･m2"},
    {"entity": "KJ314J_JT5", "val": 43.4, "src": "JT5 43.4 N･m 1.31 kg･m2"},
    {"entity": "KJ314J_JT6", "val": 22.0, "src": "JT6 22.0 N･m 0.33 kg･m2"},
    {"entity": "KJ264J_JT4", "val": 56.2, "src": "JT4 56.2 N･m 2.19 kg･m2"},
    {"entity": "KJ264J_JT5", "val": 43.4, "src": "JT5 43.4 N･m 1.31 kg･m2"},
    {"entity": "KJ264J_JT6", "val": 22.0, "src": "JT6 22.0 N･m 0.33 kg･m2"}
]

for item in extracted_constraints:
    # Split on the first underscore only, so a joint name that itself contains
    # an underscore does not break the two-way unpacking.
    model_id, jt_id = item["entity"].split("_", 1)
    machine_id = model_id.lower()
    node_id = item["entity"].lower()
    comp_id = f"comp_{node_id}"
    const_id = f"c_{node_id}_torque"
    sent_id = f"s_{node_id}_spec"

    # add_edge() would silently create a bare, unlabeled node for an unknown
    # machine; fail loudly instead so a typo in the extracted data is noticed.
    if machine_id not in G:
        raise ValueError(f"Constraint {item['entity']!r} refers to unknown machine {machine_id!r}")

    # Component Node (Axis)
    G.add_node(comp_id, label="Component", name=jt_id, component_type="axis")
    G.add_edge(machine_id, comp_id, type="HAS_COMPONENT")

    # Sentence Node (Provenance)
    G.add_node(sent_id, label="Sentence", text=item["src"])

    # Constraint Node (Parameter Limit)
    G.add_node(const_id, label="Constraint",
               property="max_torque", op="<=", value=item["val"], unit="Nm")

    # Evidence & Application Edges
    G.add_edge(const_id, comp_id, type="LIMITS_PARAMETER")
    G.add_edge(const_id, sent_id, type="EVIDENCED_BY")

In [7]:
# Grammar for the structured commands the verifier accepts:
#   set <entity> <speed|torque> to <number> <unit>
# The pattern is anchored at both ends and matched case-insensitively.
cmd_re = re.compile(
    # entity: letters, digits, underscores and whitespace
    r"^\s*set\s+(?P<entity>[a-zA-Z\d_\s]+)\s+"
    # prop: only these two property keywords are recognised
    r"(?P<prop>speed|torque)\s+to\s+"
    # val: unsigned integer or decimal (no sign, no exponent)
    r"(?P<val>\d+(?:\.\d+)?)\s*"
    # unit: letters plus the "°" and "/" characters
    r"(?P<unit>[a-zA-Z°/]+)\s*$",
    re.IGNORECASE
)

def normalize_entity(entity_str: str) -> str:
    """
    Map a raw entity name from a command onto a graph node ID.

    The name is lower-cased, surrounding whitespace is dropped, and every
    internal run of whitespace (spaces, tabs, ...) becomes a single
    underscore. The command grammar allows arbitrary whitespace inside the
    entity, so 'KJ314J  JT4' must resolve the same way as 'KJ314J_JT4'.

    Examples:
    'KJ314J_JT4'  -> 'comp_kj314j_jt4'  (when that component node exists)
    'kj314j  jt4' -> 'comp_kj314j_jt4'
    'axis_1'      -> 'axis_1'           (no matching component: returned as-is)

    Returns:
        'comp_<name>' if that node is present in the graph G, otherwise the
        cleaned name itself.
    """
    # str.split() with no argument splits on any whitespace run and ignores
    # leading/trailing whitespace, so this both strips and collapses.
    clean = "_".join(entity_str.lower().split())
    candidate = f"comp_{clean}"
    if candidate in G:
        return candidate
    return clean  # Fallback to the literal name (like 'axis_1')

def get_constraints_for(entity_node: str, prop: str):
    """
    Collect the constraint nodes in G that point at `entity_node` and concern `prop`.

    Two kinds of incoming edge are considered:
      * LIMITS_PARAMETER - the source node must be labelled "Constraint";
      * APPLIES_TO       - the source node is accepted whatever its label.
    In both cases `prop` only has to occur as a substring of the source
    node's "property" attribute (so 'torque' matches 'max_torque').

    Returns:
        A list of (node_id, node_attributes) pairs, LIMITS_PARAMETER matches
        first and APPLIES_TO matches after them. The list is empty when
        `entity_node` is not in the graph or nothing matches.
    """
    if entity_node not in G:
        return []

    incoming = list(G.in_edges(entity_node, data=True))

    def _sources(edge_type):
        # (source id, source attributes) for each incoming edge of this type
        return [
            (src, G.nodes[src])
            for src, _dst, meta in incoming
            if meta.get("type") == edge_type
        ]

    direct = [
        (cid, attrs)
        for cid, attrs in _sources("LIMITS_PARAMETER")
        if attrs.get("label") == "Constraint" and prop in attrs.get("property", "")
    ]
    generic = [
        (cid, attrs)
        for cid, attrs in _sources("APPLIES_TO")
        if prop in attrs.get("property", "")
    ]
    return direct + generic

def _evidence_for(constraint_id: str, constraint: dict):
    """Return the manual text backing a constraint, or None if the graph holds none."""
    # An inline attribute wins if a constraint node happens to carry one.
    inline = constraint.get("source_sentence")
    if inline is not None:
        return inline
    # Otherwise follow Constraint -[EVIDENCED_BY]-> Sentence and read its text.
    for _src, target, edata in G.out_edges(constraint_id, data=True):
        if edata.get("type") == "EVIDENCED_BY":
            text = G.nodes[target].get("text")
            if text is not None:
                return text
    return None


def verify(command: str):
    """
    Check a proposed command against the constraints stored in the graph G.

    Args:
        command: text of the form "Set <entity> <speed|torque> to <value> <unit>".

    Returns:
        A dict with at least "decision" and "reason":
          * REWRITE - the command could not be parsed, the entity is not in
            the graph, no constraint covers the property, or the unit differs
            from the one stored on the constraint;
          * BLOCKED - the value violates a "<=" constraint. The dict then also
            carries "source" (the manual text evidencing the limit, or None)
            and "rewrite" (the same command clamped to the limit);
          * ALLOWED - no retrieved constraint was violated.
    """
    m = cmd_re.match(command)
    if not m:
        return {"decision": "REWRITE", "reason": "Invalid syntax. Format: Set [entity] [prop] to [val] [unit]"}

    raw_entity = m.group("entity")
    entity_node = normalize_entity(raw_entity)
    prop = m.group("prop").lower()
    val = float(m.group("val"))
    unit = m.group("unit").strip()

    if entity_node not in G:
        return {"decision": "REWRITE", "reason": f"Unknown machine component: {raw_entity}"}

    constraints = get_constraints_for(entity_node, prop)
    if not constraints:
        return {"decision": "REWRITE", "reason": f"No safety limits found for {raw_entity} {prop}"}

    for c_id, c in constraints:
        # Units are compared textually (case-insensitive); no conversion is done.
        c_unit = c.get("unit", "Nm")
        if c_unit.lower() != unit.lower():
            return {"decision": "REWRITE", "reason": f"Unit mismatch: Manual specifies {c_unit}"}

        # Inequality check.
        # NOTE(review): only "<=" constraints are evaluated; a constraint with
        # any other operator is skipped and therefore can never block a command.
        limit = float(c["value"])
        if c.get("op") == "<=" and val > limit:
            return {
                "decision": "BLOCKED",
                "reason": f"Value {val} exceeds manual limit of {limit} {c_unit}.",
                # Provenance is stored on the linked Sentence node, not on the
                # constraint itself, so it has to be looked up through the graph.
                "source": _evidence_for(c_id, c),
                "rewrite": f"Set {raw_entity} {prop} to {limit} {c_unit}"
            }

    return {"decision": "ALLOWED", "reason": "Command within safe operating parameters."}



In [None]:

# Each case pairs a command with the decision the verifier is expected to
# return, so a regression fails the cell instead of hiding in a comment.
test_cases = [
    ("Set KJ314J_JT4 torque to 100 Nm", "BLOCKED"),  # above the 56.2 Nm limit
    ("Set KJ314J_JT4 torque to 8 Nm", "ALLOWED"),    # below the 56.2 Nm limit
    ("Set axis_1 torque to 40 Nm", "REWRITE"),       # axis_1 is not a node in the graph
]

# Kept as a plain list of command strings, as before.
tests = [cmd for cmd, _expected in test_cases]

for t, expected in test_cases:
    result = verify(t)
    print(f"CMD: {t}")
    print(f"OUT: {result['decision']} - {result['reason']}\n")
    assert result["decision"] == expected, (
        f"{t!r}: expected {expected}, got {result['decision']}"
    )

CMD: Set KJ314J_JT4 torque to 100 Nm
OUT: BLOCKED - Value 100.0 exceeds manual limit of 56.2 Nm.

CMD: Set KJ314J_JT4 torque to 8 Nm
OUT: ALLOWED - Command within safe operating parameters.

CMD: Set axis_1 torque to 40 Nm
OUT: REWRITE - Unknown machine component: axis_1



The code above is a small prototype that demonstrates the intended loop: parse a command, retrieve the linked constraint from the graph, and check it as an inequality to return ALLOWED, BLOCKED, or REWRITE. In a full system, the parsing and retrieval would be replaced by a DSPy GraphRAG pipeline with a self-validation loop, like [my GraphRAG implementation](https://github.com/Str3am786/Scalable_Sys_Project_2/blob/main/src/scalable_sys/rag/graph_rag.py). The model is then forced to produce a consistent structured command, and the system can verify that the entity exists, that the property matches the manual, and that the retrieved constraint actually applies before any action is taken. Before any comparison, the pipeline should standardize units and scale: it maps unit variants to a canonical form and converts values into a single reference unit per property using a small conversion dictionary, so the verifier always compares like with like. The symbolic checker remains deterministic and auditable because every decision is grounded in stored constraint nodes that keep provenance back to the source sentence or table row.