# SQLite Graph Plugin

> Plugin implementation for Context Graph using SQLite

In [None]:
#| default_exp plugin

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import json
import logging
import os
import sqlite3
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union

from cjm_plugin_system.utils.validation import (
    dict_to_config, config_to_dict, dataclass_to_jsonschema,
    SCHEMA_TITLE, SCHEMA_DESC
)
from cjm_graph_plugin_system.plugin_interface import GraphPlugin
from cjm_graph_plugin_system.core import (
    GraphNode, GraphEdge, GraphContext, GraphQuery, SourceRef
)

from cjm_graph_plugin_sqlite.meta import get_plugin_metadata

## Configuration

In [None]:
#| export
@dataclass
class SQLiteGraphPluginConfig:
    """Configuration for SQLite Graph Plugin.

    Every field carries ``SCHEMA_TITLE`` / ``SCHEMA_DESC`` metadata so that a
    JSON Schema can be derived from this dataclass (the plugin does this via
    ``dataclass_to_jsonschema``).
    """
    # Location of the SQLite file. When left as None the plugin falls back to
    # the ``db_path`` entry of the plugin metadata.
    db_path: Optional[str] = field(
        default=None,
        metadata={
            SCHEMA_TITLE: "Database Path",
            SCHEMA_DESC: "Absolute path to SQLite DB. If None, uses default env path."
        }
    )
    # NOTE(review): nothing visible in this file reads `readonly`; confirm
    # whether read-only mode is enforced elsewhere before relying on it.
    readonly: bool = field(
        default=False,
        metadata={
            SCHEMA_TITLE: "Read Only",
            SCHEMA_DESC: "Open database in read-only mode."
        }
    )

## SQLiteGraphPlugin

Local, file-backed Context Graph implementation using SQLite. Stores nodes and edges in relational tables with JSON payloads for properties.

**Schema:**

```sql
-- Nodes table
CREATE TABLE nodes (
    id TEXT PRIMARY KEY,
    label TEXT NOT NULL,
    properties JSON,
    sources JSON,
    created_at REAL,
    updated_at REAL
);

-- Edges table (with foreign keys for cascade delete)
CREATE TABLE edges (
    id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    target_id TEXT NOT NULL,
    relation_type TEXT NOT NULL,
    properties JSON,
    created_at REAL,
    updated_at REAL,
    FOREIGN KEY(source_id) REFERENCES nodes(id) ON DELETE CASCADE,
    FOREIGN KEY(target_id) REFERENCES nodes(id) ON DELETE CASCADE
);
```

In [None]:
#| export
class SQLiteGraphPlugin(GraphPlugin):
    """Local, file-backed Context Graph implementation using SQLite."""

    config_class = SQLiteGraphPluginConfig

    def __init__(self) -> None:
        """Create the logger; configuration and DB path stay unset until `initialize` runs."""
        # Logger name combines the module name and the concrete class name.
        self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
        # Both attributes are populated by `initialize`.
        self.config: Optional[SQLiteGraphPluginConfig] = None
        self._db_path: Optional[str] = None

    @property
    def name(self) -> str:  # Plugin name identifier
        """Return the fixed identifier of this plugin (``"sqlite_graph"``)."""
        return "sqlite_graph"

    @property
    def version(self) -> str:  # Plugin version string
        """Return the hard-coded version string of this plugin."""
        return "0.1.0"

    def get_current_config(self) -> Dict[str, Any]:  # Current configuration as dictionary
        """Return current configuration state."""
        if not self.config:
            return {}
        return config_to_dict(self.config)

    def get_config_schema(self) -> Dict[str, Any]:  # JSON Schema for configuration
        """Return JSON Schema for UI generation.

        The schema is built by `dataclass_to_jsonschema` from the
        `SQLiteGraphPluginConfig` class itself, not from the current
        configuration instance.
        """
        return dataclass_to_jsonschema(SQLiteGraphPluginConfig)

    def initialize(
        self,
        config: Optional[Any] = None  # Configuration dataclass, dict, or None
    ) -> None:
        """Resolve the database location and make sure the schema exists.

        The configuration is normalized through `dict_to_config`; a missing
        configuration is treated as an empty dict. The database path comes
        from ``config.db_path`` when set, otherwise from the ``db_path`` entry
        of the plugin metadata. The parent directory of that path is created
        if needed, then tables and indices are created via `_init_db`.
        """
        self.config = dict_to_config(SQLiteGraphPluginConfig, config or {})

        # Determine DB path (Config override > Meta default)
        meta_path = get_plugin_metadata()["db_path"]
        self._db_path = self.config.db_path if self.config.db_path else meta_path

        # sqlite3 creates a missing database *file* but fails with
        # "unable to open database file" when the *directory* is missing.
        # A bare filename (or ":memory:") has no directory part and is skipped.
        parent_dir = os.path.dirname(self._db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.logger.info(f"Initializing SQLite Graph at: {self._db_path}")
        self._init_db()

    def _init_db(self) -> None:
        """Create tables and indices."""
        with sqlite3.connect(self._db_path) as con:
            # Enable Foreign Keys
            con.execute("PRAGMA foreign_keys = ON;")

            # Nodes Table
            con.execute("""
                CREATE TABLE IF NOT EXISTS nodes (
                    id TEXT PRIMARY KEY,
                    label TEXT NOT NULL,
                    properties JSON,
                    sources JSON,
                    created_at REAL,
                    updated_at REAL
                )
            """)
            con.execute("CREATE INDEX IF NOT EXISTS idx_nodes_label ON nodes(label);")

            # Edges Table
            con.execute("""
                CREATE TABLE IF NOT EXISTS edges (
                    id TEXT PRIMARY KEY,
                    source_id TEXT NOT NULL,
                    target_id TEXT NOT NULL,
                    relation_type TEXT NOT NULL,
                    properties JSON,
                    created_at REAL,
                    updated_at REAL,
                    FOREIGN KEY(source_id) REFERENCES nodes(id) ON DELETE CASCADE,
                    FOREIGN KEY(target_id) REFERENCES nodes(id) ON DELETE CASCADE
                )
            """)
            con.execute("CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id);")
            con.execute("CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id);")
            con.execute("CREATE INDEX IF NOT EXISTS idx_edges_type ON edges(relation_type);")

    # -------------------------------------------------------------------------
    # Helpers
    # -------------------------------------------------------------------------

    def _row_to_node(
        self,
        row: Tuple  # DB row: (id, label, properties_json, sources_json, created_at, updated_at)
    ) -> GraphNode:  # Reconstructed GraphNode
        """Build a GraphNode from a `nodes` table row.

        Empty/NULL JSON columns decode to an empty dict (properties) or an
        empty list (sources). Timestamps are optional: rows shorter than six
        columns yield ``None`` for the missing values.
        """
        properties = {} if not row[2] else json.loads(row[2])
        source_dicts = [] if not row[3] else json.loads(row[3])
        source_refs = [SourceRef(**entry) for entry in source_dicts]

        created = row[4] if len(row) > 4 else None
        updated = row[5] if len(row) > 5 else None

        return GraphNode(
            id=row[0],
            label=row[1],
            properties=properties,
            sources=source_refs,
            created_at=created,
            updated_at=updated,
        )

    def _row_to_edge(
        self,
        row: Tuple  # DB row: (id, source_id, target_id, relation_type, properties_json, created_at, updated_at)
    ) -> GraphEdge:  # Reconstructed GraphEdge
        """Build a GraphEdge from an `edges` table row.

        An empty/NULL properties column decodes to an empty dict. Timestamps
        are optional: rows shorter than seven columns yield ``None`` for the
        missing values.
        """
        properties = {} if not row[4] else json.loads(row[4])

        created = row[5] if len(row) > 5 else None
        updated = row[6] if len(row) > 6 else None

        return GraphEdge(
            id=row[0],
            source_id=row[1],
            target_id=row[2],
            relation_type=row[3],
            properties=properties,
            created_at=created,
            updated_at=updated,
        )

    def _dict_to_node(
        self,
        data: Dict[str, Any]  # Node data as dictionary
    ) -> GraphNode:  # Reconstructed GraphNode
        """Build a GraphNode from a plain dictionary.

        ``id`` and ``label`` are required keys. Entries of ``sources`` that
        are dicts are turned into SourceRef objects; anything else is passed
        through untouched.
        """
        source_refs = [
            SourceRef(**entry) if isinstance(entry, dict) else entry
            for entry in data.get("sources", [])
        ]
        node_id = data["id"]
        label = data["label"]
        return GraphNode(
            id=node_id,
            label=label,
            properties=data.get("properties", {}),
            sources=source_refs,
            created_at=data.get("created_at"),
            updated_at=data.get("updated_at"),
        )

    def _dict_to_edge(
        self,
        data: Dict[str, Any]  # Edge data as dictionary
    ) -> GraphEdge:  # Reconstructed GraphEdge
        """Build a GraphEdge from a plain dictionary.

        ``id``, ``source_id``, ``target_id`` and ``relation_type`` are
        required keys; ``properties`` defaults to an empty dict and the
        timestamps default to ``None``.
        """
        edge_id = data["id"]
        source_id = data["source_id"]
        target_id = data["target_id"]
        relation_type = data["relation_type"]
        return GraphEdge(
            id=edge_id,
            source_id=source_id,
            target_id=target_id,
            relation_type=relation_type,
            properties=data.get("properties", {}),
            created_at=data.get("created_at"),
            updated_at=data.get("updated_at"),
        )

    # -------------------------------------------------------------------------
    # EXECUTE - Main dispatcher for RemotePluginProxy
    # -------------------------------------------------------------------------

    def execute(
        self,
        action: str = "get_schema",  # Action to perform
        **kwargs
    ) -> Dict[str, Any]:  # JSON-serializable result
        """Dispatch to appropriate method based on action."""

        if action == "get_schema":
            return self.get_schema()

        elif action == "add_nodes":
            # Convert dicts to GraphNode objects
            nodes_data = kwargs.get("nodes", [])
            nodes = []
            for n in nodes_data:
                if isinstance(n, dict):
                    nodes.append(self._dict_to_node(n))
                else:
                    nodes.append(n)
            ids = self.add_nodes(nodes)
            return {"created_ids": ids, "count": len(ids)}

        elif action == "add_edges":
            edges_data = kwargs.get("edges", [])
            edges = []
            for e in edges_data:
                if isinstance(e, dict):
                    edges.append(self._dict_to_edge(e))
                else:
                    edges.append(e)
            ids = self.add_edges(edges)
            return {"created_ids": ids, "count": len(ids)}

        elif action == "get_node":
            node = self.get_node(kwargs["node_id"])
            return {"node": node.to_dict() if node else None}

        elif action == "get_edge":
            edge = self.get_edge(kwargs["edge_id"])
            return {"edge": edge.to_dict() if edge else None}

        elif action == "get_context":
            ctx = self.get_context(
                kwargs["node_id"],
                depth=kwargs.get("depth", 1),
                filter_labels=kwargs.get("filter_labels")
            )
            return ctx.to_dict()

        elif action == "find_nodes_by_source":
            ref_data = kwargs["source_ref"]
            if isinstance(ref_data, dict):
                ref = SourceRef(**ref_data)
            else:
                ref = ref_data
            nodes = self.find_nodes_by_source(ref)
            return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}

        elif action == "find_nodes_by_label":
            nodes = self.find_nodes_by_label(
                kwargs["label"],
                limit=kwargs.get("limit", 100)
            )
            return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}

        elif action == "update_node":
            success = self.update_node(kwargs["node_id"], kwargs["properties"])
            return {"success": success}

        elif action == "update_edge":
            success = self.update_edge(kwargs["edge_id"], kwargs["properties"])
            return {"success": success}

        elif action == "delete_nodes":
            count = self.delete_nodes(
                kwargs["node_ids"],
                cascade=kwargs.get("cascade", True)
            )
            return {"deleted_count": count}

        elif action == "delete_edges":
            count = self.delete_edges(kwargs["edge_ids"])
            return {"deleted_count": count}

        elif action == "import_graph":
            graph_data = kwargs["graph_data"]
            if isinstance(graph_data, dict):
                graph_data = GraphContext.from_dict(graph_data)
            stats = self.import_graph(
                graph_data,
                merge_strategy=kwargs.get("merge_strategy", "overwrite")
            )
            return stats

        elif action == "export_graph":
            ctx = self.export_graph(filter_query=kwargs.get("filter_query"))
            return ctx.to_dict()

        elif action == "query":
            # Raw query execution (future enhancement)
            query = kwargs.get("query", "")
            self.logger.warning(f"Raw query action not fully implemented: {query}")
            return {"status": "not_implemented", "query": str(query)}

        else:
            raise ValueError(f"Unknown action: {action}")

    # -------------------------------------------------------------------------
    # CREATE
    # -------------------------------------------------------------------------

    def add_nodes(
        self,
        nodes: List[GraphNode]  # Nodes to create
    ) -> List[str]:  # Created node IDs
        """Bulk create nodes."""
        ids = []
        now = time.time()
        with sqlite3.connect(self._db_path) as con:
            for n in nodes:
                sources_json = json.dumps([s.to_dict() for s in n.sources])
                props_json = json.dumps(n.properties)
                try:
                    con.execute(
                        "INSERT INTO nodes (id, label, properties, sources, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
                        (n.id, n.label, props_json, sources_json, now, now)
                    )
                    ids.append(n.id)
                except sqlite3.IntegrityError:
                    self.logger.warning(f"Node ID collision or error: {n.id}")
        return ids

    def add_edges(
        self,
        edges: List[GraphEdge]  # Edges to create
    ) -> List[str]:  # Created edge IDs
        """Bulk create edges."""
        ids = []
        now = time.time()
        with sqlite3.connect(self._db_path) as con:
            con.execute("PRAGMA foreign_keys = ON;")
            for e in edges:
                props_json = json.dumps(e.properties)
                try:
                    con.execute(
                        "INSERT INTO edges (id, source_id, target_id, relation_type, properties, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                        (e.id, e.source_id, e.target_id, e.relation_type, props_json, now, now)
                    )
                    ids.append(e.id)
                except sqlite3.IntegrityError as err:
                    self.logger.warning(f"Edge creation error (likely missing node): {err}")
        return ids

    # -------------------------------------------------------------------------
    # READ
    # -------------------------------------------------------------------------

    def get_node(
        self,
        node_id: str  # UUID of node to retrieve
    ) -> Optional[GraphNode]:  # Node or None if not found
        """Get a single node by ID."""
        with sqlite3.connect(self._db_path) as con:
            cur = con.execute(
                "SELECT id, label, properties, sources, created_at, updated_at FROM nodes WHERE id = ?",
                (node_id,)
            )
            row = cur.fetchone()
            return self._row_to_node(row) if row else None

    def get_edge(
        self,
        edge_id: str  # UUID of edge to retrieve
    ) -> Optional[GraphEdge]:  # Edge or None if not found
        """Get a single edge by ID."""
        with sqlite3.connect(self._db_path) as con:
            cur = con.execute(
                "SELECT id, source_id, target_id, relation_type, properties, created_at, updated_at FROM edges WHERE id = ?",
                (edge_id,)
            )
            row = cur.fetchone()
            return self._row_to_edge(row) if row else None

    def find_nodes_by_source(
        self,
        source_ref: SourceRef  # External resource reference
    ) -> List[GraphNode]:  # Nodes attached to this source
        """Find all nodes linked to a specific external resource."""
        # Use SQLite's json_each() to search within the sources JSON array
        query = """
            SELECT DISTINCT n.id, n.label, n.properties, n.sources, n.created_at, n.updated_at
            FROM nodes n, json_each(n.sources) as src
            WHERE json_extract(src.value, '$.plugin_name') = ?
              AND json_extract(src.value, '$.row_id') = ?
        """
        params = [source_ref.plugin_name, source_ref.row_id]

        # If segment slice is specified, add it to query
        if source_ref.segment_slice:
            query += " AND json_extract(src.value, '$.segment_slice') = ?"
            params.append(source_ref.segment_slice)

        results = []
        with sqlite3.connect(self._db_path) as con:
            cur = con.execute(query, tuple(params))
            for row in cur:
                results.append(self._row_to_node(row))
        return results

    def find_nodes_by_label(
        self,
        label: str,  # Node label to search for
        limit: int = 100  # Max results
    ) -> List[GraphNode]:  # Matching nodes
        """Find nodes by label."""
        results = []
        with sqlite3.connect(self._db_path) as con:
            cur = con.execute(
                "SELECT id, label, properties, sources, created_at, updated_at FROM nodes WHERE label = ? LIMIT ?",
                (label, limit)
            )
            for row in cur:
                results.append(self._row_to_node(row))
        return results

    def get_context(
        self,
        node_id: str,  # Starting node UUID
        depth: int = 1,  # Traversal depth (1 = immediate neighbors)
        filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
    ) -> GraphContext:  # Subgraph containing node and its neighborhood
        """Get the neighborhood of a specific node.

        Edges are followed in both directions. ``depth == 1`` uses a direct
        lookup; any other value runs a recursive CTE bounded by `depth`
        (its base case is unconditional, so values below 1 still yield the
        immediate neighborhood).

        `filter_labels` restricts the returned *nodes* only: edges are
        returned unfiltered and the center node itself is dropped when its
        label is not listed. The returned metadata records ``depth`` and
        ``center``.
        """
        edges = []
        node_ids_in_context = {node_id}  # Always include the center

        # One connection serves all three lookups and is closed explicitly;
        # the sqlite3 context manager alone would leave it open.
        con = sqlite3.connect(self._db_path)
        try:
            # Step 1: collect the IDs of all edges within reach
            if depth == 1:
                cur = con.execute(
                    "SELECT id FROM edges WHERE source_id = ? OR target_id = ?",
                    (node_id, node_id)
                )
            else:
                # Recursive CTE for multi-hop traversal
                query = """
                WITH RECURSIVE traversal(edge_id, node_id, depth) AS (
                    -- Base case: edges connected to the start node
                    SELECT id,
                           CASE WHEN source_id = ? THEN target_id ELSE source_id END,
                           1
                    FROM edges
                    WHERE source_id = ? OR target_id = ?

                    UNION

                    -- Recursive step: edges connected to discovered nodes
                    SELECT e.id,
                           CASE WHEN e.source_id = t.node_id THEN e.target_id ELSE e.source_id END,
                           t.depth + 1
                    FROM edges e
                    JOIN traversal t ON (e.source_id = t.node_id OR e.target_id = t.node_id)
                    WHERE t.depth < ?
                )
                SELECT DISTINCT edge_id FROM traversal;
                """
                cur = con.execute(query, (node_id, node_id, node_id, depth))

            edge_ids = [row[0] for row in cur.fetchall()]

            # Step 2: fetch full Edge objects and note their endpoints
            if edge_ids:
                placeholders = ','.join('?' for _ in edge_ids)
                cur = con.execute(
                    f"SELECT id, source_id, target_id, relation_type, properties, created_at, updated_at FROM edges WHERE id IN ({placeholders})",
                    tuple(edge_ids)
                )
                for row in cur:
                    edge = self._row_to_edge(row)
                    edges.append(edge)
                    node_ids_in_context.add(edge.source_id)
                    node_ids_in_context.add(edge.target_id)

            # Step 3: fetch full Node objects (the set always holds the center)
            placeholders = ','.join('?' for _ in node_ids_in_context)
            sql = f"SELECT id, label, properties, sources, created_at, updated_at FROM nodes WHERE id IN ({placeholders})"
            params = list(node_ids_in_context)

            # Apply optional label filtering
            if filter_labels:
                sql += f" AND label IN ({','.join('?' for _ in filter_labels)})"
                params.extend(filter_labels)

            nodes = [self._row_to_node(row) for row in con.execute(sql, tuple(params))]
        finally:
            con.close()

        return GraphContext(
            nodes=nodes,
            edges=edges,
            metadata={"depth": depth, "center": node_id}
        )

    # -------------------------------------------------------------------------
    # UPDATE
    # -------------------------------------------------------------------------

    def update_node(
        self,
        node_id: str,  # UUID of node to update
        properties: Dict[str, Any]  # Properties to merge/update
    ) -> bool:  # True if successful
        """Partial update of node properties."""
        with sqlite3.connect(self._db_path) as con:
            # Fetch existing to merge
            cur = con.execute("SELECT properties FROM nodes WHERE id = ?", (node_id,))
            row = cur.fetchone()
            if not row:
                return False

            existing = json.loads(row[0]) if row[0] else {}
            existing.update(properties)  # Merge

            con.execute(
                "UPDATE nodes SET properties = ?, updated_at = ? WHERE id = ?",
                (json.dumps(existing), time.time(), node_id)
            )
            return True

    def update_edge(
        self,
        edge_id: str,  # UUID of edge to update
        properties: Dict[str, Any]  # Properties to merge/update
    ) -> bool:  # True if successful
        """Partial update of edge properties."""
        with sqlite3.connect(self._db_path) as con:
            cur = con.execute("SELECT properties FROM edges WHERE id = ?", (edge_id,))
            row = cur.fetchone()
            if not row:
                return False

            existing = json.loads(row[0]) if row[0] else {}
            existing.update(properties)

            con.execute(
                "UPDATE edges SET properties = ?, updated_at = ? WHERE id = ?",
                (json.dumps(existing), time.time(), edge_id)
            )
            return True

    # -------------------------------------------------------------------------
    # DELETE
    # -------------------------------------------------------------------------

    def delete_nodes(
        self,
        node_ids: List[str],  # UUIDs of nodes to delete
        cascade: bool = True  # Also delete connected edges
    ) -> int:  # Number of nodes deleted
        """Delete nodes (and optionally connected edges)."""
        with sqlite3.connect(self._db_path) as con:
            if cascade:
                con.execute("PRAGMA foreign_keys = ON;")  # Ensures cascade works
            else:
                con.execute("PRAGMA foreign_keys = OFF;")

            placeholders = ','.join('?' for _ in node_ids)
            cur = con.execute(
                f"DELETE FROM nodes WHERE id IN ({placeholders})",
                tuple(node_ids)
            )
            return cur.rowcount

    def delete_edges(
        self,
        edge_ids: List[str]  # UUIDs of edges to delete
    ) -> int:  # Number of edges deleted
        """Delete edges."""
        with sqlite3.connect(self._db_path) as con:
            placeholders = ','.join('?' for _ in edge_ids)
            cur = con.execute(
                f"DELETE FROM edges WHERE id IN ({placeholders})",
                tuple(edge_ids)
            )
            return cur.rowcount

    # -------------------------------------------------------------------------
    # LIFECYCLE & INTROSPECTION
    # -------------------------------------------------------------------------

    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
        """Return the current ontology/schema of the graph."""
        schema = {"node_labels": [], "edge_types": [], "counts": {}}
        with sqlite3.connect(self._db_path) as con:
            # Labels
            cur = con.execute("SELECT DISTINCT label FROM nodes")
            schema["node_labels"] = [r[0] for r in cur.fetchall()]

            # Types
            cur = con.execute("SELECT DISTINCT relation_type FROM edges")
            schema["edge_types"] = [r[0] for r in cur.fetchall()]

            # Counts
            cur = con.execute("SELECT label, COUNT(*) FROM nodes GROUP BY label")
            for row in cur:
                schema["counts"][row[0]] = row[1]

        return schema

    def import_graph(
        self,
        graph_data: GraphContext,  # Data to import
        merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
    ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        """Bulk import a GraphContext (e.g., from backup or another plugin)."""
        # Reuse bulk add methods (currently uses overwrite via INSERT)
        n = self.add_nodes(graph_data.nodes)
        e = self.add_edges(graph_data.edges)
        return {"nodes_created": len(n), "edges_created": len(e)}

    def export_graph(
        self,
        filter_query: Optional[GraphQuery] = None  # Optional filter
    ) -> GraphContext:  # Exported subgraph or full graph
        """Export the graph as a GraphContext.

        NOTE: `filter_query` is accepted but not applied yet — every node and
        every edge in the database is returned.
        """
        con = sqlite3.connect(self._db_path)
        try:
            all_nodes = [
                self._row_to_node(row)
                for row in con.execute(
                    "SELECT id, label, properties, sources, created_at, updated_at FROM nodes"
                )
            ]
            all_edges = [
                self._row_to_edge(row)
                for row in con.execute(
                    "SELECT id, source_id, target_id, relation_type, properties, created_at, updated_at FROM edges"
                )
            ]
        finally:
            # The sqlite3 context manager would not close the connection.
            con.close()

        return GraphContext(nodes=all_nodes, edges=all_edges)

    def cleanup(self) -> None:
        """Clean up resources."""
        # SQLite connections are managed via context managers, nothing to do here
        pass

## Testing the Plugin

In [None]:
import tempfile
import uuid

# Create plugin with temp database
plugin = SQLiteGraphPlugin()

# Reserve a unique temp path for the database. The Python file handle is
# closed right away: only the path is needed, and an open handle would
# otherwise stay around for the rest of the session.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
plugin.initialize({"db_path": tmp_db.name})

print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"Database: {plugin._db_path}")

Plugin: sqlite_graph v0.1.0
Database: /tmp/tmpzhwjkx15.db


In [None]:
# Test get_config_schema: the schema's "properties" keys should be the
# fields of SQLiteGraphPluginConfig.
schema = plugin.get_config_schema()
print(f"Config schema: {list(schema['properties'].keys())}")

Config schema: ['db_path', 'readonly']


In [None]:
# Create some nodes (IDs are generated up front so later cells can refer to them)
alice_id = str(uuid.uuid4())
bob_id = str(uuid.uuid4())
ml_id = str(uuid.uuid4())

# Simulate consumed content and compute hash
transcript_content = b"Alice discussed machine learning with Bob in the podcast."
content_hash = SourceRef.compute_hash(transcript_content)

# Create SourceRef to link to external data (now requires content_hash)
transcript_ref = SourceRef(
    plugin_name="cjm-transcription-plugin-whisper",
    table_name="transcriptions",
    row_id="job-abc123",
    content_hash=content_hash,
    segment_slice="full_text"
)

# Only Alice carries a source reference; Bob and the concept node have none.
nodes = [
    GraphNode(id=alice_id, label="Person", properties={"name": "Alice", "role": "speaker"}, sources=[transcript_ref]),
    GraphNode(id=bob_id, label="Person", properties={"name": "Bob"}),
    GraphNode(id=ml_id, label="Concept", properties={"name": "Machine Learning", "definition": "AI subfield"})
]

# add_nodes returns the IDs that were actually inserted
created_ids = plugin.add_nodes(nodes)
print(f"Created {len(created_ids)} nodes")

Created 3 nodes


In [None]:
# Create edges between the nodes inserted above (both endpoints must already
# exist, because add_edges enables foreign-key enforcement)
edges = [
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=ml_id, relation_type="MENTIONS", properties={"confidence": 0.95}),
    GraphEdge(id=str(uuid.uuid4()), source_id=bob_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=bob_id, relation_type="KNOWS")
]

# add_edges returns the IDs that were actually inserted
created_ids = plugin.add_edges(edges)
print(f"Created {len(created_ids)} edges")

Created 3 edges


In [None]:
# Test get_node: properties and source references are decoded from their
# JSON columns back into Python objects
alice = plugin.get_node(alice_id)
print(f"Retrieved: {alice.label} - {alice.properties}")
print(f"Sources: {[s.to_dict() for s in alice.sources]}")

Retrieved: Person - {'name': 'Alice', 'role': 'speaker'}
Sources: [{'plugin_name': 'cjm-transcription-plugin-whisper', 'table_name': 'transcriptions', 'row_id': 'job-abc123', 'content_hash': 'sha256:f85b2165bd6e790af2cf6a2223c07f74cbf0f588434395be4607a479c7e592a3', 'segment_slice': 'full_text'}]


In [None]:
# Test get_context (neighborhood traversal)
# depth=1 returns the edges touching Alice plus every node at either end of
# those edges, so the Bob->ML edge is not part of the result.
context = plugin.get_context(alice_id, depth=1)
print(f"Alice's neighborhood: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Neighbors: {[n.properties.get('name', n.label) for n in context.nodes]}")

Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Machine Learning', 'Alice', 'Bob']


In [None]:
# Test find_nodes_by_source: matches on plugin_name + row_id (and on
# segment_slice, since transcript_ref sets one)
found = plugin.find_nodes_by_source(transcript_ref)
print(f"Nodes linked to transcript job-abc123: {[n.properties.get('name') for n in found]}")

Nodes linked to transcript job-abc123: ['Alice']


In [None]:
# Test content hash round-trip through SQLite
# (re-read the node so the SourceRef comes from the database, not from memory)
alice = plugin.get_node(alice_id)
loaded_ref = alice.sources[0]

# Hash survived storage
print(f"Stored hash:   {loaded_ref.content_hash[:40]}...")
print(f"Original hash: {content_hash[:40]}...")
assert loaded_ref.content_hash == content_hash

# verify() works after round-trip
assert loaded_ref.verify(transcript_content), "verify() should return True for original content"
assert not loaded_ref.verify(b"tampered"), "verify() should return False for tampered content"
print("Content hash round-trip: PASSED")

Stored hash:   sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Original hash: sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Content hash round-trip: PASSED


In [None]:
# Test find_nodes_by_label (exact label match, default limit of 100)
people = plugin.find_nodes_by_label("Person")
print(f"People: {[p.properties['name'] for p in people]}")

People: ['Alice', 'Bob']


In [None]:
# Test get_schema: distinct node labels, distinct edge types and
# per-label node counts
schema = plugin.get_schema()
print(f"Schema: {schema}")

Schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['KNOWS', 'MENTIONS'], 'counts': {'Concept': 1, 'Person': 2}}


In [None]:
# Test update_node: the given properties are merged into the stored ones
# ("role" is overwritten, "verified" is added, "name" is kept)
plugin.update_node(alice_id, {"role": "host", "verified": True})
alice = plugin.get_node(alice_id)
print(f"Updated Alice: {alice.properties}")

Updated Alice: {'name': 'Alice', 'role': 'host', 'verified': True}


In [None]:
# Test export/import
exported = plugin.export_graph()
print(f"Exported: {len(exported.nodes)} nodes, {len(exported.edges)} edges")

# Test FileBackedDTO (zero-copy transfer)
temp_path = exported.to_temp_file()
print(f"Saved to temp file: {temp_path}")

# Load into new plugin. Only the temp path is needed, so the Python file
# handle is closed immediately instead of being left open.
new_plugin = SQLiteGraphPlugin()
tmp_db2 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db2.close()
new_plugin.initialize({"db_path": tmp_db2.name})

import os

# Load from file and import; the exported JSON file is removed even if
# loading or importing fails.
try:
    loaded = GraphContext.from_file(temp_path)
    stats = new_plugin.import_graph(loaded)
    print(f"Import stats: {stats}")
finally:
    os.unlink(temp_path)

Exported: 3 nodes, 3 edges
Saved to temp file: /tmp/tmpoplcw3tf.json
Import stats: {'nodes_created': 3, 'edges_created': 3}


In [None]:
# Test delete with cascade: removing Alice also removes the edges that touch
# her, so only the Bob->ML "MENTIONS" edge type remains in the schema
deleted = plugin.delete_nodes([alice_id], cascade=True)
print(f"Deleted {deleted} node(s)")
print(f"Remaining schema: {plugin.get_schema()}")

Deleted 1 node(s)
Remaining schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['MENTIONS'], 'counts': {'Concept': 1, 'Person': 1}}


In [None]:
# Cleanup (cleanup() is currently a no-op on this plugin; called for completeness)
plugin.cleanup()
new_plugin.cleanup()

# Remove temp files (they were created with delete=False, so this is manual)
os.unlink(tmp_db.name)
os.unlink(tmp_db2.name)

print("Cleanup complete")

Cleanup complete


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()