# Process DAG Extraction and Graph Ingestion

This walkthrough shows how to extract a directed acyclic graph (DAG) representing a business process from raw text with `spindle.extractor.extract_process_graph`, and then persist it as triples inside the embedded Kùzu-backed `GraphStore`.



## Prerequisites

- Install dependencies (if you have not already):
  ```bash
  uv pip install kuzu pandas
  ```
- Make sure the Spindle package is installed in editable mode so notebook imports resolve correctly.



In [12]:
from __future__ import annotations

import json
from datetime import datetime

import pandas as pd

from spindle import extractor, triples_to_dict
from spindle.graph_store import GraphStore
from spindle.baml_client.types import (
    AttributeValue,
    CharacterSpan,
    Entity,
    ProcessExtractionResult,
    ProcessGraph,
    SourceMetadata,
    Triple,
)



## 1. Provide a process narrative

We will work with a short operations playbook describing how support tickets are triaged and resolved. The text intentionally contains parallel work and a feedback loop, so we can see how the extraction handles concurrent branches and a cycle, which a DAG cannot represent directly.



In [2]:
sample_text = """
When a new customer ticket arrives, the support lead logs it in the tracker and assigns an initial priority.
High-priority tickets require an on-call engineer to begin diagnosis immediately while the support lead notifies the customer that work has started.
For medium and low priority tickets, an agent drafts a response template and checks previous similar cases before the engineer is looped in.
After an engineer finishes investigating, they post a root-cause summary and proposed fix.
The support lead reviews the fix, confirms customer communication, and, if the customer reports success, closes the ticket.
If the fix fails, the lead reopens the ticket and the engineer repeats the diagnosis step.
"""

process_hint = "Customer support ticket resolution workflow"
print(sample_text)




When a new customer ticket arrives, the support lead logs it in the tracker and assigns an initial priority.
High-priority tickets require an on-call engineer to begin diagnosis immediately while the support lead notifies the customer that work has started.
For medium and low priority tickets, an agent drafts a response template and checks previous similar cases before the engineer is looped in.
After an engineer finishes investigating, they post a root-cause summary and proposed fix.
The support lead reviews the fix, confirms customer communication, and, if the customer reports success, closes the ticket.
If the fix fails, the lead reopens the ticket and the engineer repeats the diagnosis step.



## 2. Extract the process DAG

`extract_process_graph` returns a `ProcessExtractionResult` with a structured `ProcessGraph`. We pass the optional `process_hint` to provide extra context for the model.



In [3]:
result: ProcessExtractionResult = extractor.extract_process_graph(
    text=sample_text,
    process_hint=process_hint,
)

result.status, len(result.issues)



2025-11-07T16:12:01.859 [BAML INFO] Function ExtractProcessGraph:
    Client: CustomHaiku (claude-3-5-haiku-20241022) - 24045ms. StopReason: end_turn. Tokens(in/out): 1206/1735
    ---PROMPT---
    user: You are a process modelling specialist. Extract a directed acyclic graph (DAG) that captures the end-to-end process described in the provided text. Represent the process with well-defined steps and explicit dependencies.TEXT:
    
    When a new customer ticket arrives, the support lead logs it in the tracker and assigns an initial priority.
    High-priority tickets require an on-call engineer to begin diagnosis immediately while the support lead notifies the customer that work has started.
    For medium and low priority tickets, an agent drafts a response template and checks previous similar cases before the engineer is looped in.
    After an engineer finishes investigating, they post a root-cause summary and proposed fix.
    Th

('process_found', 1)

In [4]:
if result.issues:
    for issue in result.issues:
        print(f"Issue: {issue.code} -> {issue.message}")
else:
    print("No validation issues raised.")



Issue: cycle_detected -> Detected cycles within the process dependencies; review relations or step ordering.


### Inspect extracted steps

The `ProcessGraph` captures every step, its type, and supporting evidence. Let’s tabulate the steps for a quick glance.



In [5]:
process_graph: ProcessGraph = result.graph

steps_df = pd.DataFrame(
    [
        {
            "step_id": step.step_id,
            "title": step.title,
            "type": step.step_type.value if hasattr(step.step_type, "value") else str(step.step_type),
            "actors": ", ".join(step.actors),
            "inputs": ", ".join(step.inputs),
            "outputs": ", ".join(step.outputs),
            "prerequisites": ", ".join(step.prerequisites),
        }
        for step in process_graph.steps
    ]
)
steps_df



| | step_id | title | type | actors | inputs | outputs | prerequisites |
|---|---|---|---|---|---|---|---|
| 0 | ticket_arrival | Ticket Logging | EVENT | Support Lead | Customer Ticket | Logged Ticket, Initial Priority | |
| 1 | priority_handling | Priority-Based Routing | DECISION | Support Lead | Initial Priority | Routing Decision | ticket_arrival |
| 2 | high_priority_diagnosis | High Priority Diagnosis | ACTIVITY | On-Call Engineer | Ticket Details | Initial Diagnosis | priority_handling |
| 3 | medium_low_preparation | Ticket Preparation | ACTIVITY | Support Agent | Ticket Details | Response Template, Case History Review | priority_handling |
| 4 | technical_investigation | Technical Diagnosis | ACTIVITY | Engineer | Ticket Details, Case History | Root Cause Summary, Proposed Fix | high_priority_diagnosis, medium_low_preparation |
| 5 | fix_review | Solution Validation | ACTIVITY | Support Lead | Proposed Fix, Root Cause Summary | Verified Solution, Customer Communication | technical_investigation |
| 6 | ticket_closure | Ticket Resolution | DECISION | Support Lead | Customer Feedback | Closed Ticket, Reopened Ticket | fix_review |


### Dependency edges

Each dependency carries a relation label (the cell below falls back to `precedes` when none is set), an optional condition, and any supporting evidence spans.



In [6]:
pd.DataFrame(
    [
        {
            "from": dependency.from_step,
            "to": dependency.to_step,
            "relation": dependency.relation or "precedes",
            "condition": dependency.condition or "",
            "evidence": " | ".join(span.text for span in dependency.evidence),
        }
        for dependency in process_graph.dependencies
    ]
)



| | from | to | relation | condition | evidence |
|---|---|---|---|---|---|
| 0 | ticket_arrival | priority_handling | precedes | | |
| 1 | priority_handling | high_priority_diagnosis | enables | ticket is high priority | |
| 2 | priority_handling | medium_low_preparation | enables | ticket is medium or low priority | |
| 3 | high_priority_diagnosis | technical_investigation | precedes | | |
| 4 | medium_low_preparation | technical_investigation | precedes | | |
| 5 | technical_investigation | fix_review | precedes | | |
| 6 | fix_review | ticket_closure | precedes | | |
| 7 | ticket_closure | technical_investigation | loops_back | fix failed | |
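
The `cycle_detected` issue reported earlier comes from the `loops_back` edge, which points from `ticket_closure` back to `technical_investigation`. As a minimal sketch (not part of Spindle), the cell below runs Kahn's algorithm over an edge list copied by hand from this run's dependency table. The names `EDGES` and `topological_order` are hypothetical helpers introduced only for illustration.

```python
from collections import defaultdict, deque

# Edge list copied by hand from the dependency table: (from, to, relation).
EDGES = [
    ("ticket_arrival", "priority_handling", "precedes"),
    ("priority_handling", "high_priority_diagnosis", "enables"),
    ("priority_handling", "medium_low_preparation", "enables"),
    ("high_priority_diagnosis", "technical_investigation", "precedes"),
    ("medium_low_preparation", "technical_investigation", "precedes"),
    ("technical_investigation", "fix_review", "precedes"),
    ("fix_review", "ticket_closure", "precedes"),
    ("ticket_closure", "technical_investigation", "loops_back"),
]


def topological_order(edges):
    """Return the nodes in topological order, or None if the edges contain a cycle."""
    successors = defaultdict(list)
    in_degree = {}
    for src, dst, _relation in edges:
        in_degree.setdefault(src, 0)
        in_degree.setdefault(dst, 0)
        successors[src].append(dst)
        in_degree[dst] += 1

    # Start from nodes with no incoming edges and peel the graph layer by layer.
    ready = deque(node for node, degree in in_degree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Any node left unvisited sits on, or downstream of, a cycle.
    return order if len(order) == len(in_degree) else None


with_loop = topological_order(EDGES)
without_loop = topological_order([e for e in EDGES if e[2] != "loops_back"])
print(with_loop)
print(without_loop)
```

With the `loops_back` edge included, no valid ordering exists. Dropping it leaves an acyclic graph that runs from `ticket_arrival` to `ticket_closure`. Whether to drop, relabel, or keep such edges depends on how the downstream consumer treats rework loops.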


## 3. Convert the process DAG to triples

`GraphStore` works with knowledge-graph triples. We will flatten each process dependency into a `(step -> relation -> step)` triple and preserve rich step metadata inside `Entity.custom_atts`.



In [7]:
def process_graph_to_triples(
    graph: ProcessGraph,
    source_name: str,
    source_url: str | None = None,
) -> list[Triple]:
    """Flatten a process graph into Triple objects suitable for GraphStore."""

    if graph is None:
        return []

    source_metadata = SourceMetadata(source_name=source_name, source_url=source_url)
    extraction_time = datetime.utcnow().isoformat()

    # Build entity objects with reusable metadata
    step_entities: dict[str, Entity] = {}
    for step in graph.steps:
        attrs = {
            "step_type": AttributeValue(value=str(step.step_type), type="string"),
            "actors": AttributeValue(value=json.dumps(step.actors), type="json"),
            "inputs": AttributeValue(value=json.dumps(step.inputs), type="json"),
            "outputs": AttributeValue(value=json.dumps(step.outputs), type="json"),
            "duration": AttributeValue(value=step.duration or "", type="string"),
        }
        if graph.process_name:
            attrs["process_name"] = AttributeValue(value=graph.process_name, type="string")
        if graph.scope:
            attrs["scope"] = AttributeValue(value=graph.scope, type="string")

        node_label = f"{graph.process_name}::{step.step_id}" if graph.process_name else step.step_id
        step_entities[step.step_id] = Entity(
            name=node_label,
            type="ProcessStep",
            description=step.summary,
            custom_atts=attrs,
        )

    # Build triples using dependency edges
    triples: list[Triple] = []
    for dependency in graph.dependencies:
        subject = step_entities[dependency.from_step]
        obj = step_entities[dependency.to_step]

        spans = [
            CharacterSpan(text=span.text, start=span.start, end=span.end)
            for span in dependency.evidence
        ] or [CharacterSpan(text="Derived from process description", start=None, end=None)]

        predicate = dependency.relation or "precedes"
        if dependency.condition:
            predicate = f"{predicate} (condition: {dependency.condition})"

        triples.append(
            Triple(
                subject=subject,
                predicate=predicate,
                object=obj,
                source=source_metadata,
                supporting_spans=spans,
                extraction_datetime=extraction_time,
            )
        )

    return triples



In [14]:
process_triples = process_graph_to_triples(
    graph=process_graph,
    source_name="support_ticket_playbook.md",
)

process_triples[0]



Triple(subject=Entity(name='Customer Support Ticket Resolution::ticket_arrival', type='ProcessStep', description='New customer ticket enters the support system', custom_atts={'step_type': AttributeValue(value='ProcessStepType.EVENT', type='string'), 'actors': AttributeValue(value='["Support Lead"]', type='json'), 'inputs': AttributeValue(value='["Customer Ticket"]', type='json'), 'outputs': AttributeValue(value='["Logged Ticket", "Initial Priority"]', type='json'), 'duration': AttributeValue(value='', type='string'), 'process_name': AttributeValue(value='Customer Support Ticket Resolution', type='string'), 'scope': AttributeValue(value='End-to-end ticket handling workflow', type='string')}), predicate='precedes', object=Entity(name='Customer Support Ticket Resolution::priority_handling', type='ProcessStep', description='Determine ticket handling approach based on priority', custom_atts={'step_type': AttributeValue(value='ProcessStepType.DECISION', type='string'), 'actors': AttributeVal
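
Because `process_graph_to_triples` folds the condition into the predicate string, a conditional edge ends up with a predicate such as `enables (condition: ticket is high priority)` rather than plain `enables`. The following sketch shows one way to split such a string back into its relation and condition. `split_predicate` is a hypothetical helper, and it assumes the predicate still has the exact shape produced by the function above (the store may change casing, as the query output later suggests).

```python
from __future__ import annotations

import re

# Matches the "<relation> (condition: <text>)" shape built in process_graph_to_triples.
_PREDICATE_RE = re.compile(r"^(?P<relation>.+?) \(condition: (?P<condition>.*)\)$")


def split_predicate(predicate: str) -> tuple[str, str | None]:
    """Split a predicate into (relation, condition); condition is None when absent."""
    match = _PREDICATE_RE.match(predicate)
    if match is None:
        return predicate, None
    return match.group("relation"), match.group("condition")


for raw in ("precedes", "enables (condition: ticket is high priority)"):
    print(split_predicate(raw))
```

An alternative design would keep the predicate as the bare relation and store the condition elsewhere, which keeps exact-match predicate queries simple. The notebook's approach trades that for having everything visible in one string.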

## 4. Load the triples into `GraphStore`

Kùzu stores data on disk. We create a demo database named `process_demo` (located under `spindle/graphs/process_demo/graph.db`). The cell below deletes and recreates the graph on every run, so re-running the notebook does not accumulate duplicate triples.



In [15]:
graph_name = "process_demo"

store = GraphStore(graph_name)
store.delete_graph()
store.create_graph(graph_name)

inserted = store.add_triples(process_triples)
print(f"Inserted {inserted} process triples into '{graph_name}'.")



Inserted 8 process triples into 'process_demo'.


In [17]:
edges = store.query_by_pattern(predicate="precedes")

dependency_df = pd.DataFrame(
    [
        {
            "subject": edge["subject"],
            "predicate": edge["predicate"],
            "object": edge["object"],
        }
        for edge in edges
    ]
)

dependency_df



| | subject | predicate | object |
|---|---|---|---|
| 0 | CUSTOMER SUPPORT TICKET RESOLUTION::TICKET_ARR... | PRECEDES | CUSTOMER SUPPORT TICKET RESOLUTION::PRIORITY_H... |
| 1 | CUSTOMER SUPPORT TICKET RESOLUTION::HIGH_PRIOR... | PRECEDES | CUSTOMER SUPPORT TICKET RESOLUTION::TECHNICAL_... |
| 2 | CUSTOMER SUPPORT TICKET RESOLUTION::MEDIUM_LOW... | PRECEDES | CUSTOMER SUPPORT TICKET RESOLUTION::TECHNICAL_... |
| 3 | CUSTOMER SUPPORT TICKET RESOLUTION::TECHNICAL_... | PRECEDES | CUSTOMER SUPPORT TICKET RESOLUTION::FIX_REVIEW |
| 4 | CUSTOMER SUPPORT TICKET RESOLUTION::FIX_REVIEW | PRECEDES | CUSTOMER SUPPORT TICKET RESOLUTION::TICKET_CLO... |


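Note: as the output above shows, `GraphStore` returns entity names and predicates in uppercase. Only the five unconditional `precedes` edges match this query; the `enables` and `loops_back` edges carry different predicates. You can map the stored names back to human-friendly labels when presenting results.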
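
One way to recover the original labels is a lookup keyed by the uppercased name. This is a minimal sketch that assumes the store's normalization is a plain `str.upper()`, which the output above suggests but does not prove. `build_label_lookup` is a hypothetical helper, and the labels are copied from the triple printed earlier.

```python
def build_label_lookup(labels):
    """Map the uppercased form of each label back to its original spelling."""
    return {label.upper(): label for label in labels}


# Labels as they were built before insertion (copied from the first triple).
labels = [
    "Customer Support Ticket Resolution::ticket_arrival",
    "Customer Support Ticket Resolution::priority_handling",
]
lookup = build_label_lookup(labels)

stored_name = "CUSTOMER SUPPORT TICKET RESOLUTION::TICKET_ARRIVAL"
# Fall back to the stored name when no original label is known.
display_name = lookup.get(stored_name, stored_name)
print(display_name)
```

In the notebook, the labels could be collected from `process_triples` (for example from each triple's `subject.name` and `object.name`) instead of being typed by hand.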



## 5. Next steps

- Use `GraphStore.query_cypher` for more expressive queries or integrate the process DAG with other knowledge graphs imported from documents.
- Extend the conversion logic to store additional attributes (e.g., cycle warnings) or to sync process data with external workflow tools.
- When you are done exploring, call `store.close()` or use the context manager interface to release the database handle.



In [None]:
store.close()

