# Graph Plugin Interface

> Domain-specific plugin interface for Context Graphs

In [None]:
#| default_exp plugin_interface

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Union

from cjm_plugin_system.core.interface import PluginInterface

from cjm_graph_plugin_system.core import (
    GraphNode,
    GraphEdge,
    GraphContext,
    GraphQuery,
    SourceRef
)

## GraphPlugin

Abstract base class for all Context Graph plugins. Provides a standardized interface for:

- **CRUD Operations**: Create, read, update, delete nodes and edges
- **Traversal**: Get neighborhood context around a node
- **Querying**: Execute raw queries or structured searches
- **Lifecycle**: Import/export graph data, introspect schema

In [None]:
#| export
class GraphPlugin(PluginInterface):
    """Abstract base class for all Context Graph plugins."""

    entry_point_group = "graph.plugins"  # Entry point for discovery

    # -------------------------------------------------------------------------
    # CREATE
    # -------------------------------------------------------------------------

    @abstractmethod
    def add_nodes(
        self,
        nodes: List[GraphNode]  # Nodes to create
    ) -> List[str]:  # Created node IDs
        """Bulk create nodes."""
        ...

    @abstractmethod
    def add_edges(
        self,
        edges: List[GraphEdge]  # Edges to create
    ) -> List[str]:  # Created edge IDs
        """Bulk create edges."""
        ...

    # -------------------------------------------------------------------------
    # READ
    # -------------------------------------------------------------------------

    @abstractmethod
    def get_node(
        self,
        node_id: str  # UUID of node to retrieve
    ) -> Optional[GraphNode]:  # Node or None if not found
        """Get a single node by ID."""
        ...

    @abstractmethod
    def get_edge(
        self,
        edge_id: str  # UUID of edge to retrieve
    ) -> Optional[GraphEdge]:  # Edge or None if not found
        """Get a single edge by ID."""
        ...

    @abstractmethod
    def get_context(
        self,
        node_id: str,  # Starting node UUID
        depth: int = 1,  # Traversal depth (1 = immediate neighbors)
        filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
    ) -> GraphContext:  # Subgraph containing node and its neighborhood
        """Get the neighborhood of a specific node."""
        ...

    @abstractmethod
    def find_nodes_by_source(
        self,
        source_ref: SourceRef  # External resource reference
    ) -> List[GraphNode]:  # Nodes attached to this source
        """Find all nodes linked to a specific external resource."""
        ...

    @abstractmethod
    def find_nodes_by_label(
        self,
        label: str,  # Node label to search for
        limit: int = 100  # Max results
    ) -> List[GraphNode]:  # Matching nodes
        """Find nodes by label."""
        ...

    @abstractmethod
    def execute(
        self,
        query: Union[GraphQuery, str],  # Query object or raw query string
        **kwargs
    ) -> GraphContext:  # Query results as a subgraph
        """Execute a generic query against the graph."""
        ...

    # -------------------------------------------------------------------------
    # UPDATE
    # -------------------------------------------------------------------------

    @abstractmethod
    def update_node(
        self,
        node_id: str,  # UUID of node to update
        properties: Dict[str, Any]  # Properties to merge/update
    ) -> bool:  # True if successful
        """Partial update of node properties."""
        ...

    @abstractmethod
    def update_edge(
        self,
        edge_id: str,  # UUID of edge to update
        properties: Dict[str, Any]  # Properties to merge/update
    ) -> bool:  # True if successful
        """Partial update of edge properties."""
        ...

    # -------------------------------------------------------------------------
    # DELETE
    # -------------------------------------------------------------------------

    @abstractmethod
    def delete_nodes(
        self,
        node_ids: List[str],  # UUIDs of nodes to delete
        cascade: bool = True  # Also delete connected edges
    ) -> int:  # Number of nodes deleted
        """Delete nodes (and optionally connected edges)."""
        ...

    @abstractmethod
    def delete_edges(
        self,
        edge_ids: List[str]  # UUIDs of edges to delete
    ) -> int:  # Number of edges deleted
        """Delete edges."""
        ...

    # -------------------------------------------------------------------------
    # LIFECYCLE & INTROSPECTION
    # -------------------------------------------------------------------------

    @abstractmethod
    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
        """Return the current ontology/schema of the graph."""
        ...

    @abstractmethod
    def import_graph(
        self,
        graph_data: GraphContext,  # Data to import
        merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
    ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        """Bulk import a GraphContext (e.g., from backup or another plugin)."""
        ...

    @abstractmethod
    def export_graph(
        self,
        filter_query: Optional[GraphQuery] = None  # Optional filter
    ) -> GraphContext:  # Exported subgraph or full graph
        """Export the entire graph or a filtered subset."""
        ...

## Interface Documentation

### CREATE Operations

In [None]:
show_doc(GraphPlugin.add_nodes)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L33){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.add_nodes

```python

def add_nodes(
    nodes:List, # Nodes to create
)->List: # Created node IDs


```

*Bulk create nodes.*

In [None]:
show_doc(GraphPlugin.add_edges)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L41){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.add_edges

```python

def add_edges(
    edges:List, # Edges to create
)->List: # Created edge IDs


```

*Bulk create edges.*

### READ Operations

In [None]:
show_doc(GraphPlugin.get_node)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L53){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.get_node

```python

def get_node(
    node_id:str, # UUID of node to retrieve
)->Optional: # Node or None if not found


```

*Get a single node by ID.*

In [None]:
show_doc(GraphPlugin.get_edge)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L61){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.get_edge

```python

def get_edge(
    edge_id:str, # UUID of edge to retrieve
)->Optional: # Edge or None if not found


```

*Get a single edge by ID.*

In [None]:
show_doc(GraphPlugin.get_context)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L69){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.get_context

```python

def get_context(
    node_id:str, # Starting node UUID
    depth:int=1, # Traversal depth (1 = immediate neighbors)
    filter_labels:Optional=None, # Only include nodes with these labels
)->GraphContext: # Subgraph containing node and its neighborhood


```

*Get the neighborhood of a specific node.*

In [None]:
show_doc(GraphPlugin.find_nodes_by_source)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L79){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.find_nodes_by_source

```python

def find_nodes_by_source(
    source_ref:SourceRef, # External resource reference
)->List: # Nodes attached to this source


```

*Find all nodes linked to a specific external resource.*

In [None]:
show_doc(GraphPlugin.find_nodes_by_label)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L87){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.find_nodes_by_label

```python

def find_nodes_by_label(
    label:str, # Node label to search for
    limit:int=100, # Max results
)->List: # Matching nodes


```

*Find nodes by label.*

In [None]:
show_doc(GraphPlugin.execute)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L96){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.execute

```python

def execute(
    query:Union, # Query object or raw query string
    kwargs:VAR_KEYWORD
)->GraphContext: # Query results as a subgraph


```

*Execute a generic query against the graph.*

### UPDATE Operations

In [None]:
show_doc(GraphPlugin.update_node)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L109){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.update_node

```python

def update_node(
    node_id:str, # UUID of node to update
    properties:Dict, # Properties to merge/update
)->bool: # True if successful


```

*Partial update of node properties.*

In [None]:
show_doc(GraphPlugin.update_edge)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L118){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.update_edge

```python

def update_edge(
    edge_id:str, # UUID of edge to update
    properties:Dict, # Properties to merge/update
)->bool: # True if successful


```

*Partial update of edge properties.*

### DELETE Operations

In [None]:
show_doc(GraphPlugin.delete_nodes)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L131){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.delete_nodes

```python

def delete_nodes(
    node_ids:List, # UUIDs of nodes to delete
    cascade:bool=True, # Also delete connected edges
)->int: # Number of nodes deleted


```

*Delete nodes (and optionally connected edges).*

In [None]:
show_doc(GraphPlugin.delete_edges)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L140){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.delete_edges

```python

def delete_edges(
    edge_ids:List, # UUIDs of edges to delete
)->int: # Number of edges deleted


```

*Delete edges.*

### LIFECYCLE Operations

In [None]:
show_doc(GraphPlugin.get_schema)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L152){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.get_schema

```python

def get_schema(
    
)->Dict: # Graph schema/ontology


```

*Return the current ontology/schema of the graph.*

In [None]:
show_doc(GraphPlugin.import_graph)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L157){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.import_graph

```python

def import_graph(
    graph_data:GraphContext, # Data to import
    merge_strategy:str='overwrite', # "overwrite", "skip", or "merge"
)->Dict: # Import statistics {nodes_created, edges_created, ...}


```

*Bulk import a GraphContext (e.g., from backup or another plugin).*

In [None]:
show_doc(GraphPlugin.export_graph)

---

[source](https://github.com/cj-mills/cjm-graph-plugin-system/blob/main/cjm_graph_plugin_system/plugin_interface.py#L166){target="_blank" style="float:right; font-size:smaller"}

### GraphPlugin.export_graph

```python

def export_graph(
    filter_query:Optional=None, # Optional filter
)->GraphContext: # Exported subgraph or full graph


```

*Export the entire graph or a filtered subset.*

## Example Schema Return Value

The `get_schema()` method returns information about the current graph ontology:

```python
{
    "node_labels": ["Person", "Concept", "Correction", "Event"],
    "edge_types": ["MENTIONS", "CORRECTS", "AUTHORED_BY", "RELATED_TO"],
    "counts": {
        "nodes": {"Person": 12, "Concept": 45, "Correction": 3},
        "edges": {"MENTIONS": 78, "CORRECTS": 5}
    },
    "properties": {
        "Person": ["name", "role", "confidence"],
        "Concept": ["name", "definition", "aliases"]
    }
}
```

This is useful for UIs that need to know available labels and types for filtering/querying.

## Example Implementation

A minimal in-memory implementation demonstrating the interface:

In [None]:
class InMemoryGraphPlugin(GraphPlugin):
    """Simple in-memory implementation for testing."""

    def __init__(self):
        self._nodes: Dict[str, GraphNode] = {}
        self._edges: Dict[str, GraphEdge] = {}
        self._config: Dict[str, Any] = {}

    @property
    def name(self) -> str:
        return "in-memory-graph"

    @property
    def version(self) -> str:
        return "1.0.0"

    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        self._config = config or {}

    def get_config_schema(self) -> Dict[str, Any]:
        return {"type": "object", "properties": {}}

    def get_current_config(self) -> Dict[str, Any]:
        return self._config

    def cleanup(self) -> None:
        self._nodes.clear()
        self._edges.clear()

    # CREATE
    def add_nodes(self, nodes: List[GraphNode]) -> List[str]:
        ids = []
        for node in nodes:
            self._nodes[node.id] = node
            ids.append(node.id)
        return ids

    def add_edges(self, edges: List[GraphEdge]) -> List[str]:
        ids = []
        for edge in edges:
            self._edges[edge.id] = edge
            ids.append(edge.id)
        return ids

    # READ
    def get_node(self, node_id: str) -> Optional[GraphNode]:
        return self._nodes.get(node_id)

    def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
        return self._edges.get(edge_id)

    def get_context(self, node_id: str, depth: int = 1, filter_labels: Optional[List[str]] = None) -> GraphContext:
        # Simple 1-hop implementation
        nodes = []
        edges = []
        
        if node_id in self._nodes:
            nodes.append(self._nodes[node_id])
            
        for edge in self._edges.values():
            if edge.source_id == node_id or edge.target_id == node_id:
                edges.append(edge)
                other_id = edge.target_id if edge.source_id == node_id else edge.source_id
                if other_id in self._nodes:
                    neighbor = self._nodes[other_id]
                    if filter_labels is None or neighbor.label in filter_labels:
                        if neighbor not in nodes:
                            nodes.append(neighbor)
                            
        return GraphContext(nodes=nodes, edges=edges, metadata={"depth": depth})

    def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]:
        results = []
        for node in self._nodes.values():
            for src in node.sources:
                if (src.plugin_name == source_ref.plugin_name and
                    src.table_name == source_ref.table_name and
                    src.row_id == source_ref.row_id):
                    results.append(node)
                    break
        return results

    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
        return [n for n in self._nodes.values() if n.label == label][:limit]

    def execute(self, query: Union[GraphQuery, str], **kwargs) -> GraphContext:
        # Simplified: just return all data
        return GraphContext(
            nodes=list(self._nodes.values()),
            edges=list(self._edges.values()),
            metadata={"query": str(query)}
        )

    # UPDATE
    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
        if node_id not in self._nodes:
            return False
        self._nodes[node_id].properties.update(properties)
        return True

    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
        if edge_id not in self._edges:
            return False
        self._edges[edge_id].properties.update(properties)
        return True

    # DELETE
    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        count = 0
        for nid in node_ids:
            if nid in self._nodes:
                del self._nodes[nid]
                count += 1
                if cascade:
                    # Remove connected edges
                    to_remove = [eid for eid, e in self._edges.items()
                                 if e.source_id == nid or e.target_id == nid]
                    for eid in to_remove:
                        del self._edges[eid]
        return count

    def delete_edges(self, edge_ids: List[str]) -> int:
        count = 0
        for eid in edge_ids:
            if eid in self._edges:
                del self._edges[eid]
                count += 1
        return count

    # LIFECYCLE
    def get_schema(self) -> Dict[str, Any]:
        labels = set(n.label for n in self._nodes.values())
        types = set(e.relation_type for e in self._edges.values())
        return {
            "node_labels": list(labels),
            "edge_types": list(types),
            "counts": {
                "nodes": len(self._nodes),
                "edges": len(self._edges)
            }
        }

    def import_graph(self, graph_data: GraphContext, merge_strategy: str = "overwrite") -> Dict[str, int]:
        nodes_created = 0
        edges_created = 0
        
        for node in graph_data.nodes:
            if merge_strategy == "skip" and node.id in self._nodes:
                continue
            self._nodes[node.id] = node
            nodes_created += 1
            
        for edge in graph_data.edges:
            if merge_strategy == "skip" and edge.id in self._edges:
                continue
            self._edges[edge.id] = edge
            edges_created += 1
            
        return {"nodes_created": nodes_created, "edges_created": edges_created}

    def export_graph(self, filter_query: Optional[GraphQuery] = None) -> GraphContext:
        return GraphContext(
            nodes=list(self._nodes.values()),
            edges=list(self._edges.values()),
            metadata={"exported_at": "now"}
        )

In [None]:
#| eval: false
import uuid

# Test the example plugin
plugin = InMemoryGraphPlugin()
plugin.initialize()

print(f"Plugin: {plugin.name} v{plugin.version}")

Plugin: in-memory-graph v1.0.0


In [None]:
#| eval: false
# Create some nodes
alice_id = str(uuid.uuid4())
bob_id = str(uuid.uuid4())
ml_id = str(uuid.uuid4())

nodes = [
    GraphNode(id=alice_id, label="Person", properties={"name": "Alice"}),
    GraphNode(id=bob_id, label="Person", properties={"name": "Bob"}),
    GraphNode(id=ml_id, label="Concept", properties={"name": "Machine Learning"})
]

created_ids = plugin.add_nodes(nodes)
print(f"Created {len(created_ids)} nodes")

Created 3 nodes


In [None]:
#| eval: false
# Create edges
edges = [
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=bob_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=bob_id, relation_type="KNOWS")
]

created_ids = plugin.add_edges(edges)
print(f"Created {len(created_ids)} edges")

Created 3 edges


In [None]:
#| eval: false
# Test get_context (neighborhood traversal)
context = plugin.get_context(alice_id, depth=1)
print(f"Alice's neighborhood: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Neighbors: {[n.properties.get('name', n.label) for n in context.nodes]}")

Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Alice', 'Machine Learning', 'Bob']


In [None]:
#| eval: false
# Test find_nodes_by_label
people = plugin.find_nodes_by_label("Person")
print(f"People: {[p.properties['name'] for p in people]}")

People: ['Alice', 'Bob']


In [None]:
#| eval: false
# Test get_schema
schema = plugin.get_schema()
print(f"Schema: {schema}")

Schema: {'node_labels': ['Person', 'Concept'], 'edge_types': ['MENTIONS', 'KNOWS'], 'counts': {'nodes': 3, 'edges': 3}}


In [None]:
#| eval: false
# Test update_node
plugin.update_node(alice_id, {"role": "speaker", "confidence": 0.95})
alice = plugin.get_node(alice_id)
print(f"Updated Alice: {alice.properties}")

Updated Alice: {'name': 'Alice', 'role': 'speaker', 'confidence': 0.95}


In [None]:
#| eval: false
# Test export/import
exported = plugin.export_graph()
print(f"Exported: {len(exported.nodes)} nodes, {len(exported.edges)} edges")

# Import into a new plugin instance
new_plugin = InMemoryGraphPlugin()
new_plugin.initialize()
stats = new_plugin.import_graph(exported)
print(f"Import stats: {stats}")

Exported: 3 nodes, 3 edges
Import stats: {'nodes_created': 3, 'edges_created': 3}


In [None]:
#| eval: false
# Test delete with cascade
deleted = plugin.delete_nodes([alice_id], cascade=True)
print(f"Deleted {deleted} node(s)")
print(f"Remaining: {plugin.get_schema()}")

Deleted 1 node(s)
Remaining: {'node_labels': ['Person', 'Concept'], 'edge_types': ['MENTIONS'], 'counts': {'nodes': 2, 'edges': 1}}


In [None]:
#| eval: false
# Cleanup
plugin.cleanup()
new_plugin.cleanup()

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()