# graph

> Graph service for committing documents and segments to the context graph

In [None]:
#| default_exp services.graph

In [None]:
#| export
from typing import List, Dict, Any, Optional
from uuid import uuid4
import asyncio

from cjm_plugin_system.core.manager import PluginManager
from cjm_graph_plugin_system.core import SourceRef
from cjm_graph_domains.domains.structure import Document, Segment
from cjm_graph_domains.domains.relations import StructureRelations

from cjm_transcript_segmentation.models import TextSegment
from cjm_transcript_vad_align.models import VADChunk

## GraphService

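This service wraps a graph plugin (the SQLite graph plugin by default) to store the decomposed document structure. It pairs each `TextSegment` with its corresponding `VADChunk` (1:1) for timing, converts them into `Document` and `Segment` graph nodes linked by `STARTS_WITH`, `PART_OF`, and `NEXT` edges, and commits them to the context graph.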

In [None]:
#| export
class GraphService:
    """Service for committing document structure to the context graph.

    Talks to a graph plugin (``cjm-graph-plugin-sqlite`` by default) through a
    ``PluginManager``. Text segments are paired 1:1 with VAD chunks to build one
    ``Document`` node, one ``Segment`` node per segment, and ``STARTS_WITH`` /
    ``PART_OF`` / ``NEXT`` edges, which are sent to the plugin's ``add_nodes``
    and ``add_edges`` actions.
    """
    
    def __init__(
        self,
        plugin_manager:PluginManager,  # Plugin manager for accessing graph plugin
        plugin_name:str="cjm-graph-plugin-sqlite",  # Name of the graph plugin
    ):
        """Initialize the graph service."""
        self._manager = plugin_manager
        self._plugin_name = plugin_name
    
    def is_available(self) -> bool:  # True if plugin is loaded and ready
        """Check whether the plugin manager currently has the graph plugin loaded."""
        return self._manager.get_plugin(self._plugin_name) is not None
    
    def ensure_loaded(
        self,
        config:Optional[Dict[str, Any]]=None,  # Optional plugin configuration
    ) -> bool:  # True if successfully loaded
        """Ensure the graph plugin is loaded.

        Returns True immediately if it is already loaded; otherwise loads it
        from its discovered manifest (``discover_manifests`` must have run) and
        returns the manager's ``load_plugin`` result. Returns False if the
        plugin was never discovered.
        """
        if self.is_available():
            return True
        
        # Try to find and load the plugin
        meta = self._manager.get_discovered_meta(self._plugin_name)
        if meta:
            return self._manager.load_plugin(meta, config)
        return False
    
    def _create_source_ref(
        self,
        segment:TextSegment,  # Text segment with source info
    ) -> Optional[SourceRef]:  # SourceRef or None if no source info
        """Create a SourceRef pointing back at the segment's source row.

        Returns None when the segment lacks ``source_id`` or
        ``source_provider_id``. The hash covers the segment's own text
        (UTF-8 encoded), and ``segment_slice`` is ``"char:<start>-<end>"`` when
        both character offsets are set.
        """
        if not segment.source_id or not segment.source_provider_id:
            return None
        
        # Build segment slice string
        slice_str = None
        if segment.start_char is not None and segment.end_char is not None:
            slice_str = f"char:{segment.start_char}-{segment.end_char}"
        
        return SourceRef(
            plugin_name=segment.source_provider_id,
            table_name="transcriptions",
            row_id=segment.source_id,
            content_hash=SourceRef.compute_hash(segment.text.encode()),
            segment_slice=slice_str
        )
    
    @staticmethod
    def _make_edge(
        source_id:str,  # ID of the edge's source node
        target_id:str,  # ID of the edge's target node
        relation_type:Any,  # StructureRelations member for the edge
    ) -> Dict[str, Any]:  # Edge payload for the plugin's add_edges action
        """Build one edge dict (fresh UUID, empty properties) for ``add_edges``."""
        return {
            'id': str(uuid4()),
            'source_id': source_id,
            'target_id': target_id,
            'relation_type': relation_type,
            'properties': {}
        }
    
    async def commit_document_async(
        self,
        title:str,  # Document title
        text_segments:List[TextSegment],  # Text segments from decomposition
        vad_chunks:List[VADChunk],  # VAD chunks for timing (1:1 with segments)
        media_type:str="audio",  # Source media type
    ) -> Dict[str, Any]:  # Result with document_id, segment_ids and edge_count
        """Commit a document to the context graph.
        
        Assembles text segments with VAD timing at commit time.
        Requires 1:1 alignment: len(text_segments) == len(vad_chunks).
        
        Edges created: Document -STARTS_WITH-> first Segment, every
        Segment -PART_OF-> Document, and Segment -NEXT-> following Segment.
        
        Raises:
            RuntimeError: if the graph plugin is not loaded.
            ValueError: if the segment and VAD chunk counts differ.
        """
        if not self.is_available():
            raise RuntimeError(f"Plugin {self._plugin_name} not loaded")
        
        if len(text_segments) != len(vad_chunks):
            raise ValueError(
                f"Segment and VAD chunk counts must match: "
                f"{len(text_segments)} segments vs {len(vad_chunks)} chunks"
            )
        
        # Create Document node
        doc = Document(
            id=str(uuid4()),
            title=title,
            media_type=media_type
        )
        doc_node = doc.to_graph_node(sources=[])
        
        # Create Segment nodes: text/char offsets from the segment, timing from the VAD chunk
        segment_nodes = []
        for text_seg, vad_chunk in zip(text_segments, vad_chunks):
            seg = Segment(
                id=str(uuid4()),
                text=text_seg.text,
                index=text_seg.index,
                start_time=vad_chunk.start_time,
                end_time=vad_chunk.end_time,
                start_char=text_seg.start_char,
                end_char=text_seg.end_char
            )
            source_ref = self._create_source_ref(text_seg)
            sources = [source_ref] if source_ref else []
            segment_nodes.append(seg.to_graph_node(sources=sources))
        
        # Create edges (order: STARTS_WITH, then PART_OF/NEXT per segment)
        edges = []
        if segment_nodes:
            edges.append(self._make_edge(
                doc_node.id, segment_nodes[0].id, StructureRelations.STARTS_WITH
            ))
        
        # Pair each segment with its successor; the last one has none
        successors = segment_nodes[1:] + [None]
        for seg_node, next_node in zip(segment_nodes, successors):
            edges.append(self._make_edge(
                seg_node.id, doc_node.id, StructureRelations.PART_OF
            ))
            if next_node is not None:
                edges.append(self._make_edge(
                    seg_node.id, next_node.id, StructureRelations.NEXT
                ))
        
        # Convert nodes to dicts for serialization
        all_nodes = [doc_node.to_dict()] + [n.to_dict() for n in segment_nodes]
        
        # Nodes go first so the edges' endpoints exist when edges are added.
        # NOTE(review): the plugin responses are discarded; if the plugin reports
        # failures in its response instead of raising, they go unnoticed here.
        # The two calls are also not atomic — confirm the plugin's error contract.
        await self._manager.execute_plugin_async(
            self._plugin_name,
            action="add_nodes",
            nodes=all_nodes
        )
        await self._manager.execute_plugin_async(
            self._plugin_name,
            action="add_edges",
            edges=edges
        )
        
        return {
            'document_id': doc_node.id,
            'segment_ids': [n.id for n in segment_nodes],
            'edge_count': len(edges)
        }
    
    def commit_document(
        self,
        title:str,  # Document title
        text_segments:List[TextSegment],  # Text segments from decomposition
        vad_chunks:List[VADChunk],  # VAD chunks for timing
        media_type:str="audio",  # Source media type
    ) -> Dict[str, Any]:  # Result with document_id, segment_ids and edge_count
        """Commit a document to the context graph synchronously.
        
        Runs ``commit_document_async`` to completion with ``asyncio.run``.
        Must not be called from a thread that already has a running event
        loop (e.g. a Jupyter cell or another coroutine); there, use
        ``await commit_document_async(...)`` instead.
        
        Raises:
            RuntimeError: if called while an event loop is running, or
                propagated from ``commit_document_async``.
            ValueError: propagated from ``commit_document_async``.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop in this thread: safe to start one
        else:
            # Check before creating the coroutine so none is left un-awaited.
            raise RuntimeError(
                "commit_document() cannot be called from a running event loop; "
                "use 'await commit_document_async(...)' instead"
            )
        return asyncio.run(
            self.commit_document_async(title, text_segments, vad_chunks, media_type)
        )

## Tests

The following cells demonstrate the GraphService with the SQLite Graph plugin.

In [None]:
#| eval: false
# Discover plugin manifests and look up the SQLite graph plugin
from pathlib import Path
from cjm_plugin_system.core.manager import PluginManager

# This notebook lives in nbs/services/, so the project root is two levels up
project_root = Path.cwd().parent.parent
manifests_dir = project_root / ".cjm" / "manifests"

# Point the plugin manager at the project's manifest directory and scan it
manager = PluginManager(search_paths=[manifests_dir])
manager.discover_manifests()
print(f"Discovered {len(manager.discovered)} plugins from {manifests_dir}")

# Report whether the graph plugin can be used by the cells below
graph_meta = manager.get_discovered_meta("cjm-graph-plugin-sqlite")
print(
    f"Found plugin: {graph_meta.name} v{graph_meta.version}"
    if graph_meta
    else "SQLite Graph plugin not found - install via plugins.yaml"
)

[PluginManager] Discovered manifest: cjm-graph-plugin-sqlite from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-transcript-review/.cjm/manifests/cjm-graph-plugin-sqlite.json


Discovered 1 plugins from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-transcript-review/.cjm/manifests
Found plugin: cjm-graph-plugin-sqlite v0.0.3


In [None]:
#| eval: false
# Initialize GraphService and load the plugin through the service itself
if graph_meta:
    graph_service = GraphService(manager)
    # Forward the manifest's db_path (None if the manifest does not define one)
    graph_service.ensure_loaded({
        "db_path": graph_meta.manifest.get("db_path")
    })
    print(f"Plugin available: {graph_service.is_available()}")

[PluginManager] Launching worker for cjm-graph-plugin-sqlite...


[cjm-graph-plugin-sqlite] Starting worker on port 57931...
[cjm-graph-plugin-sqlite] Logs: /home/innom-dt/.cjm/logs/cjm-graph-plugin-sqlite.log


[PluginManager] HTTP Request: GET http://127.0.0.1:57931/health "HTTP/1.1 200 OK"
[PluginManager] HTTP Request: POST http://127.0.0.1:57931/initialize "HTTP/1.1 200 OK"
[PluginManager] Loaded plugin: cjm-graph-plugin-sqlite


[cjm-graph-plugin-sqlite] Worker ready.
Plugin available: True


In [None]:
#| eval: false
# Test committing a document to the graph with separate text segments and VAD chunks
from cjm_transcript_segmentation.models import TextSegment
from cjm_transcript_vad_align.models import VADChunk

if graph_meta and graph_service.is_available():
    # Create text segments (no time fields)
    text_segments = [
        TextSegment(
            index=0,
            text="Laying Plans",
            source_id="job_123",
            source_provider_id="test-plugin",
            start_char=0,
            end_char=12,
        ),
        TextSegment(
            index=1,
            text="Sun Tzu said: The art of war is of vital importance to the state.",
            source_id="job_123",
            source_provider_id="test-plugin",
            start_char=13,
            end_char=79,
        ),
        TextSegment(
            index=2,
            text="It is a matter of life and death, a road either to safety or to ruin.",
            source_id="job_123",
            source_provider_id="test-plugin",
            start_char=80,
            end_char=150,
        )
    ]
    
    # Create VAD chunks with timing (1:1 with segments)
    vad_chunks = [
        VADChunk(index=0, start_time=0.0, end_time=1.5),
        VADChunk(index=1, start_time=1.5, end_time=5.0),
        VADChunk(index=2, start_time=5.0, end_time=9.0),
    ]
    
    print(f"Committing document with {len(text_segments)} segments and {len(vad_chunks)} VAD chunks...")
    
    # Use await directly (Jupyter supports top-level await)
    result = await graph_service.commit_document_async(
        title="The Art of War - Chapter 1",
        text_segments=text_segments,
        vad_chunks=vad_chunks,
        media_type="audio",
    )
    
    print(f"\nCommit result:")
    print(f"  Document ID: {result['document_id']}")
    print(f"  Segment IDs: {result['segment_ids']}")
    print(f"  Edge count: {result['edge_count']}")

[PluginManager] HTTP Request: POST http://127.0.0.1:57931/execute "HTTP/1.1 200 OK"
[PluginManager] HTTP Request: POST http://127.0.0.1:57931/execute "HTTP/1.1 200 OK"


Committing document with 3 segments and 3 VAD chunks...

Commit result:
  Document ID: 6eeda707-4438-4094-8d44-6272ff5da8a3
  Segment IDs: ['70567737-0663-4f74-8e9b-18e507249e3c', '82e8b884-a3c2-4411-9c5e-d606277e6e83', '8d773f77-9c54-40d6-8a60-3c6e7c45509d']
  Edge count: 6


In [None]:
#| eval: false
# Verify the graph structure (use await directly - Jupyter supports top-level await)
if graph_meta and graph_service.is_available():
    # Get graph schema
    schema = await manager.execute_plugin_async(
        "cjm-graph-plugin-sqlite", 
        action="get_schema"
    )
    print(f"Graph schema:")
    print(f"  Node labels: {schema.get('node_labels', [])}")
    print(f"  Relation types: {schema.get('relation_types', [])}")

[PluginManager] HTTP Request: POST http://127.0.0.1:57931/execute "HTTP/1.1 200 OK"


Graph schema:
  Node labels: ['Document', 'Segment']
  Relation types: []


In [None]:
#| eval: false
# Cleanup: stop the plugin worker(s) launched by the cells above.
# Skipped when the graph plugin was never discovered, since nothing was loaded.
if graph_meta:
    manager.unload_all()
    print("Plugins unloaded")

[PluginManager] HTTP Request: POST http://127.0.0.1:57931/cleanup "HTTP/1.1 200 OK"
[PluginManager] Unloaded plugin: cjm-graph-plugin-sqlite


Plugins unloaded


In [None]:
#| hide
# Regenerate the Python modules from the notebooks' exported cells
import nbdev
nbdev.nbdev_export()