# dataset

> RDF Dataset-based session memory for RLM

In [None]:
#| default_exp dataset

## Overview

This module implements RDF Dataset-based memory for RLM sessions using named graphs:

- `onto/<name>` - Read-only ontology graphs
- `mem` - Mutable working memory for current session
- `prov` - Provenance/audit trail
- `work/<task_id>` - Scratch graphs for intermediate results

### Design Principles

- **Session-scoped**: `mem` is working memory for current RLM run
- **Handle-based access**: Model sees bounded views, never raw quads
- **Provenance tracking**: All `mem` changes recorded with timestamp/source/reason
- **Lazy indexing**: Indexes are built on first access and cached; the caches are invalidated on mutation

## Imports

In [None]:
#| export
import uuid
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import partial
from itertools import islice
from pathlib import Path

from rdflib import Dataset, Graph, Namespace, RDF, RDFS, URIRef, Literal, XSD, BNode

# Helper function for URI/Literal conversion
def _to_rdf_term(value):
    """Convert a Python value to an RDF term.

    Args:
        value: An existing RDF term, a string, a number/boolean, or any
            other object.

    Returns:
        - ``URIRef``/``Literal``/``BNode`` inputs are returned unchanged.
        - A string becomes a ``URIRef`` when it looks like a URI: it contains
          no whitespace, does not start with ``_:``, and either starts with a
          known scheme (http, https, urn, ftp, mailto, file, data) or
          contains ``://``. Any other string becomes a ``Literal``.
        - ``int``/``float``/``bool`` become typed ``Literal`` values.
        - Anything else is passed through untouched.
    """
    # Already an RDF term: nothing to convert.
    if isinstance(value, (URIRef, Literal, BNode)):
        return value

    if isinstance(value, str):
        known_schemes = ('http', 'https', 'urn', 'ftp', 'mailto', 'file', 'data')
        # Whitespace is never allowed inside an IRI, so free text that merely
        # starts with a scheme-like word ("data: 42 rows", "file: see notes")
        # must stay a literal instead of becoming an invalid URIRef.
        has_whitespace = any(ch.isspace() for ch in value)
        # Strings starting with '_:' (blank-node syntax) are kept as literals.
        if ':' in value and not value.startswith('_:') and not has_whitespace:
            scheme = value.split(':', 1)[0].lower()
            if scheme in known_schemes or '://' in value:
                return URIRef(value)
        return Literal(value)

    # Numbers and booleans become typed literals.
    if isinstance(value, (int, float, bool)):
        return Literal(value)

    # Pass through anything else (shouldn't happen normally).
    return value

## DatasetMeta

Meta-graph navigation for RDF Dataset with lazy-cached indexes.

In [None]:
#| export
@dataclass
class DatasetMeta:
    """Navigation handle over the named graphs of an RDF Dataset.

    Exposes the session's ``mem`` and ``prov`` graphs, lists scratch
    ``work/*`` graphs, and keeps a lazily computed per-graph triple count.
    The cached values are dropped by ``_invalidate_caches()``.
    """
    dataset: Dataset
    name: str = 'ds'
    session_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])

    # Cached indexes; None means "not computed yet / invalidated".
    _graph_stats: dict = field(default=None, init=False, repr=False)
    _mem_predicates: Counter = field(default=None, init=False, repr=False)
    # Incremented on every cache invalidation.
    _version: int = field(default=0, init=False, repr=False)

    def __post_init__(self):
        """Derive the mem/prov graph URIs and make sure both graphs exist."""
        self._mem_uri = URIRef(f'urn:rlm:{self.name}:mem')
        self._prov_uri = URIRef(f'urn:rlm:{self.name}:prov')

        # Register each graph with the dataset unless it already holds quads.
        for graph_uri in (self._mem_uri, self._prov_uri):
            if (None, None, None, graph_uri) not in self.dataset:
                self.dataset.graph(graph_uri)

    @property
    def mem(self) -> Graph:
        """Working-memory graph of this dataset."""
        return self.dataset.graph(self._mem_uri)

    @property
    def prov(self) -> Graph:
        """Provenance graph of this dataset."""
        return self.dataset.graph(self._prov_uri)

    @property
    def graph_stats(self) -> dict:
        """Mapping of graph URI (as str) to triple count, computed once and cached."""
        if self._graph_stats is None:
            self._graph_stats = {
                str(ctx.identifier): len(ctx) for ctx in self.dataset.contexts()
            }
        return self._graph_stats

    @property
    def work_graphs(self) -> list:
        """URIs (as str) of every ``work/*`` scratch graph in the dataset."""
        prefix = f'urn:rlm:{self.name}:work/'
        identifiers = (str(ctx.identifier) for ctx in self.dataset.contexts())
        return [ident for ident in identifiers if ident.startswith(prefix)]

    def summary(self) -> str:
        """Return a multi-line, human-readable overview of the dataset."""
        mem_count = len(self.mem)
        prov_count = len(self.prov)
        work_count = len(self.work_graphs)
        # Ontology graphs are identified by ':onto/' in their URI.
        onto_count = len([g for g in self.graph_stats if ':onto/' in g])
        return '\n'.join((
            f"Dataset '{self.name}' (session: {self.session_id})",
            f"mem: {mem_count} triples",
            f"prov: {prov_count} events",
            f"work graphs: {work_count}",
            f"onto graphs: {onto_count}",
        ))

    def _invalidate_caches(self):
        """Drop every cached index and bump the version counter."""
        self._graph_stats = None
        self._mem_predicates = None
        self._version += 1

## Setup Function

## Memory Operations

In [None]:
#| export
def mem_add(ds_meta: DatasetMeta, subject, predicate, obj, 
            source: str = 'agent', reason: str = None) -> str:
    """Add a fact to mem and record an AddEvent in the provenance graph.

    Args:
        ds_meta: DatasetMeta containing the dataset
        subject: Subject URI or literal (converted with ``_to_rdf_term``)
        predicate: Predicate URI; a string is wrapped in ``URIRef``
        obj: Object URI or literal (converted with ``_to_rdf_term``)
        source: Source of this fact (default: 'agent')
        reason: Optional reason for adding; stored only when truthy

    Returns:
        Summary string naming the added triple
    """
    # Convert to RDF terms (handles all URI schemes: http, https, urn, etc.)
    s = _to_rdf_term(subject)
    p = URIRef(predicate) if isinstance(predicate, str) else predicate
    o = _to_rdf_term(obj)

    # Add to mem
    ds_meta.mem.add((s, p, o))

    # Record provenance. The full UUID is used for the event URI: a truncated
    # id could collide and silently merge two events under one URI.
    RLM_PROV = Namespace('urn:rlm:prov:')
    event_uri = URIRef(f'urn:rlm:prov:event_{uuid.uuid4().hex}')
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # '+00:00' offset is rewritten to 'Z' to keep the established format.
    timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

    prov = ds_meta.prov
    prov.add((event_uri, RDF.type, RLM_PROV.AddEvent))
    prov.add((event_uri, RLM_PROV.subject, s))
    prov.add((event_uri, RLM_PROV.predicate, p))
    prov.add((event_uri, RLM_PROV.object, o))
    prov.add((event_uri, RLM_PROV.timestamp, Literal(timestamp, datatype=XSD.dateTime)))
    prov.add((event_uri, RLM_PROV.source, Literal(source)))
    prov.add((event_uri, RLM_PROV.session, Literal(ds_meta.session_id)))

    if reason:
        prov.add((event_uri, RLM_PROV.reason, Literal(reason)))

    # Invalidate caches
    ds_meta._invalidate_caches()

    return f"Added triple to mem: ({s}, {p}, {o})"

In [None]:
#| export
def mem_query(ds_meta: DatasetMeta, sparql: str, limit: int = 100) -> list:
    """Query the mem graph and return a bounded list of result rows.

    Args:
        ds_meta: DatasetMeta containing the dataset
        sparql: SPARQL query string
        limit: Maximum results to return; also appended as a LIMIT clause
            when the query does not already contain one

    Returns:
        List of result rows as dicts mapping variable name to the string
        form of the bound value, or None when the variable is unbound
    """
    import re

    # Inject LIMIT only when no "LIMIT <n>" clause is present. A plain
    # substring test would be fooled by a variable such as ?limit or an IRI
    # containing "limit". The clause goes on its own line so that it cannot
    # end up inside a trailing '#' comment.
    if not re.search(r'\bLIMIT\s+\d+', sparql, re.IGNORECASE):
        sparql = sparql.rstrip() + f'\nLIMIT {limit}'

    results = ds_meta.mem.query(sparql)

    # Convert to list of dicts. Compare against None explicitly: bound values
    # such as 0, "" or false are falsy but must not be reported as unbound.
    rows = []
    for row in results:
        rows.append({
            str(var): (str(row[var]) if row[var] is not None else None)
            for var in results.vars
        })

    return rows[:limit]

In [None]:
#| export
def mem_retract(ds_meta: DatasetMeta, subject=None, predicate=None, obj=None,
                source: str = 'agent', reason: str = None) -> str:
    """Remove matching triples from mem, recording one RetractEvent per triple.

    Args:
        ds_meta: DatasetMeta containing the dataset
        subject: Subject URI or None (wildcard)
        predicate: Predicate URI or None (wildcard)
        obj: Object URI/literal or None (wildcard)
        source: Source of this retraction
        reason: Optional reason for removing; stored only when truthy

    Returns:
        Summary string with the number of removed triples
    """
    # Convert to RDF terms or None (handles all URI schemes)
    s = _to_rdf_term(subject) if subject is not None else None
    p = URIRef(predicate) if predicate and isinstance(predicate, str) else predicate
    o = _to_rdf_term(obj) if obj is not None else None

    # Snapshot the matches first so removal does not interfere with iteration.
    to_remove = list(ds_meta.mem.triples((s, p, o)))

    RLM_PROV = Namespace('urn:rlm:prov:')
    mem, prov = ds_meta.mem, ds_meta.prov
    for triple in to_remove:
        mem.remove(triple)

        # Full UUID for the event URI: a truncated id could collide and
        # silently merge two events under one URI.
        event_uri = URIRef(f'urn:rlm:prov:event_{uuid.uuid4().hex}')
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the '+00:00' offset is rewritten to 'Z' to keep the format.
        timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

        prov.add((event_uri, RDF.type, RLM_PROV.RetractEvent))
        prov.add((event_uri, RLM_PROV.subject, triple[0]))
        prov.add((event_uri, RLM_PROV.predicate, triple[1]))
        prov.add((event_uri, RLM_PROV.object, triple[2]))
        prov.add((event_uri, RLM_PROV.timestamp, Literal(timestamp, datatype=XSD.dateTime)))
        prov.add((event_uri, RLM_PROV.source, Literal(source)))
        prov.add((event_uri, RLM_PROV.session, Literal(ds_meta.session_id)))

        if reason:
            prov.add((event_uri, RLM_PROV.reason, Literal(reason)))

    # Invalidate caches
    ds_meta._invalidate_caches()

    return f"Removed {len(to_remove)} triples from mem"

In [None]:
#| export
def mem_describe(ds_meta: DatasetMeta, uri: str, limit: int = 20) -> dict:
    """Get a bounded description of an entity from mem.

    Args:
        ds_meta: DatasetMeta containing the dataset
        uri: URI of entity to describe
        limit: Maximum triples to include per direction; values below zero
            are treated as zero

    Returns:
        Dict with 'uri' plus 'as_subject' and 'as_object' lists of
        (s, p, o) string tuples
    """
    entity = URIRef(uri)
    mem = ds_meta.mem
    bound = max(limit, 0)

    def _bounded(pattern):
        # islice stops after `bound` matches instead of materializing every
        # matching triple only to discard most of them.
        return [(str(s), str(p), str(o))
                for s, p, o in islice(mem.triples(pattern), bound)]

    return {
        'uri': uri,
        'as_subject': _bounded((entity, None, None)),
        'as_object': _bounded((None, None, entity))
    }

## Scratch Graph Operations

In [None]:
#| export
def work_create(ds_meta: DatasetMeta, task_id: str = None) -> tuple:
    """Create (or fetch) the scratch graph for a task.

    Args:
        ds_meta: DatasetMeta containing the dataset
        task_id: Task identifier; when None a random ``task_<8 hex>`` id is
            generated

    Returns:
        (graph_uri, graph) tuple, where graph_uri is a string
    """
    task_name = f"task_{uuid.uuid4().hex[:8]}" if task_id is None else task_id
    scratch_uri = URIRef(f'urn:rlm:{ds_meta.name}:work/{task_name}')
    scratch_graph = ds_meta.dataset.graph(scratch_uri)
    return (str(scratch_uri), scratch_graph)

In [None]:
#| export
def work_cleanup(ds_meta: DatasetMeta, task_id: str = None, all: bool = False) -> str:
    """Remove scratch graph(s).

    Args:
        ds_meta: DatasetMeta containing the dataset
        task_id: Specific task to clean up, or None
        all: If True, remove all work/* graphs (takes precedence over task_id)

    Returns:
        Summary string with the number of graphs actually removed
    """
    existing = ds_meta.work_graphs

    if all:
        targets = existing
    elif task_id:
        candidate = f'urn:rlm:{ds_meta.name}:work/{task_id}'
        # Only count the graph when it really exists, so the summary does not
        # claim a removal for an unknown task id.
        targets = [candidate] if candidate in existing else []
    else:
        targets = []

    for graph_uri in targets:
        ds_meta.dataset.remove_graph(URIRef(graph_uri))

    ds_meta._invalidate_caches()

    return f"Removed {len(targets)} work graph(s)"

In [None]:
#| export
def work_to_mem(ds_meta: DatasetMeta, task_id: str, 
                source: str = 'work', reason: str = None) -> str:
    """Promote triples from a scratch graph to mem with provenance.

    Every triple of ``work/<task_id>`` is added to mem, and a single
    PromoteEvent describing the promotion is written to the provenance graph.

    Args:
        ds_meta: DatasetMeta containing the dataset
        task_id: Task identifier for work graph
        source: Source label for provenance
        reason: Optional reason for promotion; stored only when truthy

    Returns:
        Summary string with the number of promoted triples
    """
    graph_uri = URIRef(f'urn:rlm:{ds_meta.name}:work/{task_id}')
    work_graph = ds_meta.dataset.graph(graph_uri)

    # Snapshot the work graph's triples, then copy them into mem.
    triples = list(work_graph.triples((None, None, None)))
    mem = ds_meta.mem
    for triple in triples:
        mem.add(triple)

    # Record a single provenance event for the promotion. The full UUID is
    # used for the event URI: a truncated id could collide and silently merge
    # two events under one URI.
    RLM_PROV = Namespace('urn:rlm:prov:')
    event_uri = URIRef(f'urn:rlm:prov:event_{uuid.uuid4().hex}')
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # '+00:00' offset is rewritten to 'Z' to keep the established format.
    timestamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

    prov = ds_meta.prov
    prov.add((event_uri, RDF.type, RLM_PROV.PromoteEvent))
    prov.add((event_uri, RLM_PROV.fromGraph, graph_uri))
    prov.add((event_uri, RLM_PROV.tripleCount, Literal(len(triples))))
    prov.add((event_uri, RLM_PROV.timestamp, Literal(timestamp, datatype=XSD.dateTime)))
    prov.add((event_uri, RLM_PROV.source, Literal(source)))
    prov.add((event_uri, RLM_PROV.session, Literal(ds_meta.session_id)))

    if reason:
        prov.add((event_uri, RLM_PROV.reason, Literal(reason)))

    # Invalidate caches
    ds_meta._invalidate_caches()

    return f"Promoted {len(triples)} triples from work/{task_id} to mem"

## Snapshot Functions

In [None]:
#| export
def snapshot_dataset(ds_meta: DatasetMeta, path: str = None, 
                     format: str = 'trig') -> str:
    """Serialize dataset to TriG/N-Quads for debugging.

    Args:
        ds_meta: DatasetMeta to snapshot
        path: Output path. When None, a name of the form
            ``snapshot_<name>_<YYYYmmdd_HHMMSS>.<ext>`` (UTC time) is
            generated, relative to the current directory.
        format: 'trig' or 'nquads'; passed to ``Dataset.serialize``

    Returns:
        Summary string of the form ``"Snapshot saved to <path>"``
    """
    if path is None:
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        # Any format other than 'trig' gets the N-Quads extension.
        ext = 'trig' if format == 'trig' else 'nq'
        path = f"snapshot_{ds_meta.name}_{timestamp}.{ext}"

    ds_meta.dataset.serialize(destination=path, format=format)

    return f"Snapshot saved to {path}"

In [None]:
#| export
def load_snapshot(path: str, ns: dict, name: str = 'ds') -> str:
    """Load dataset from TriG/N-Quads snapshot.

    Useful for debugging/replay. The snapshot keeps the original dataset name
    inside its graph URIs (``urn:rlm:<name>:mem``). That name is detected and
    used for the new ``DatasetMeta`` so its mem/prov graphs point at the
    restored data, while the handles are stored in ``ns`` under ``name``.

    Args:
        path: Path to snapshot file; the format is chosen from the extension
            (.trig, .ttl, .nq/.nquads; anything else is parsed as TriG)
        ns: Namespace dict where Dataset will be stored as ``ns[name]`` and
            its DatasetMeta as ``ns[f"{name}_meta"]``
        name: Variable name for the Dataset handle

    Returns:
        Summary string
    """
    # Detect format from extension. Turtle is single-graph and must not be
    # parsed as TriG; unknown extensions default to TriG (multi-graph).
    ext = Path(path).suffix.lower()
    if ext == '.ttl':
        format = 'turtle'
    elif ext in ['.nq', '.nquads']:
        format = 'nquads'
    else:
        format = 'trig'

    # Load dataset
    ds = Dataset()
    ds.parse(path, format=format)

    # Detect the original name from a graph URI of the exact form
    # urn:rlm:<name>:mem. Matching the full prefix and suffix (instead of
    # looking for ':mem' anywhere and splitting on ':') avoids misreading
    # unrelated graph IRIs and keeps names that themselves contain ':'.
    prefix, suffix = 'urn:rlm:', ':mem'
    original_name = None
    for ctx in ds.contexts():
        uri = str(ctx.identifier)
        if (uri.startswith(prefix) and uri.endswith(suffix)
                and len(uri) > len(prefix) + len(suffix)):
            original_name = uri[len(prefix):-len(suffix)]
            break

    # Use detected name or provided name
    detected_name = original_name if original_name else name

    # Create meta with detected name so URIs match
    ds_meta = DatasetMeta(ds, name=detected_name)

    # Store in namespace with provided name
    ns[name] = ds
    ns[f"{name}_meta"] = ds_meta

    # Count graphs
    graph_count = len(list(ds.contexts()))

    msg = f"Loaded snapshot from {path}: {graph_count} graphs"
    if original_name and original_name != name:
        msg += f" (original name '{original_name}' preserved in graph URIs)"

    return msg

## Bounded View Functions

In [None]:
#| export
def res_head(result, n: int = 10) -> list:
    """Return the leading rows of a result set.

    Args:
        result: ResultTable, list of dicts, or list of tuples
        n: Number of rows to return

    Returns:
        The first ``n`` rows; for a ResultTable these are taken from its
        ``rows`` attribute, otherwise the input itself is sliced
    """
    rows = result.rows if isinstance(result, ResultTable) else result
    return rows[:n]


def res_where(result, column: str, pattern: str = None, value: str = None) -> list:
    """Keep only the rows whose column matches a value or regex.

    Cells are compared by their string form, with None treated as an empty
    string. When ``value`` is given it wins over ``pattern``; when neither is
    given nothing matches.

    Args:
        result: ResultTable or list of dicts
        column: Column name to filter on; rows lacking it are skipped
        pattern: Optional regex, searched case-insensitively
        value: Optional exact value to match (compared as a string)

    Returns:
        List of matching rows
    """
    import re

    def _keep(cell: str) -> bool:
        # Exact match takes priority over the regex.
        if value is not None:
            return cell == str(value)
        if pattern is not None:
            return re.search(pattern, cell, re.IGNORECASE) is not None
        return False

    source_rows = result.rows if isinstance(result, ResultTable) else result
    return [
        row for row in source_rows
        if column in row
        and _keep('' if row[column] is None else str(row[column]))
    ]


def res_group(result, column: str, limit: int = 20) -> list:
    """Count rows per distinct value of a column.

    Rows that lack the column, or hold None in it, are ignored. Values are
    counted by their string form.

    Args:
        result: ResultTable or list of dicts
        column: Column to group by
        limit: Maximum groups to return

    Returns:
        List of (value, count) tuples, sorted by count descending
    """
    from collections import Counter

    source_rows = result.rows if isinstance(result, ResultTable) else result

    tally = Counter()
    for row in source_rows:
        if column in row and row[column] is not None:
            tally[str(row[column])] += 1

    return tally.most_common(limit)


def res_distinct(result, column: str, limit: int = 50) -> list:
    """Collect the distinct values found in a column.

    Values are compared by their string form; None cells and rows without
    the column are skipped. Scanning stops once ``limit`` distinct values
    have been collected.

    Args:
        result: ResultTable or list of dicts
        column: Column to get distinct values from
        limit: Maximum distinct values to return

    Returns:
        Sorted list of distinct values (at most ``limit``)
    """
    source_rows = result.rows if isinstance(result, ResultTable) else result

    seen = set()
    for row in source_rows:
        cell = row[column] if column in row else None
        if cell is not None:
            seen.add(str(cell))

        # Checked after every row, whether or not it contributed a value.
        if len(seen) >= limit:
            break

    return sorted(seen)[:limit]

In [None]:
#| export
@dataclass
class ResultTable:
    """Container for SPARQL query results used by the bounded view helpers."""
    # Result rows, each a dict
    rows: list
    # Column names
    columns: list
    # Original SPARQL query text
    query: str
    # Row count before any limit was applied
    total_rows: int

    def __len__(self):
        """Number of rows held in the table."""
        return len(self.rows)

    def __repr__(self):
        """Compact representation: row count and column names only."""
        row_count = len(self.rows)
        return f"ResultTable({row_count} rows, columns={self.columns})"

## Result Table Views

Bounded view operations over SPARQL query results.

In [None]:
#| export
def dataset_stats(ds_meta: DatasetMeta) -> str:
    """Return the dataset's summary text, as produced by ``ds_meta.summary()``."""
    summary_text = ds_meta.summary()
    return summary_text

In [None]:
#| export
def list_graphs(ds_meta: DatasetMeta, pattern: str = None) -> list:
    """List the dataset's graphs with their sizes.

    Args:
        ds_meta: DatasetMeta containing the dataset
        pattern: Optional substring; only graphs whose URI contains it are
            listed

    Returns:
        List of (graph_uri, triple_count) tuples, ordered by graph URI
    """
    matching = [
        (str(ctx.identifier), len(ctx))
        for ctx in ds_meta.dataset.contexts()
        if pattern is None or pattern in str(ctx.identifier)
    ]
    matching.sort(key=lambda entry: entry[0])
    return matching

In [None]:
#| export
def graph_sample(ds_meta: DatasetMeta, graph_uri: str, limit: int = 10) -> list:
    """Get sample triples from a graph.

    Args:
        ds_meta: DatasetMeta containing the dataset
        graph_uri: URI of graph to sample
        limit: Maximum triples to return; values below zero are treated as
            zero

    Returns:
        List of (s, p, o) tuples as strings
    """
    graph = ds_meta.dataset.graph(URIRef(graph_uri))
    # islice stops after `limit` triples instead of materializing the whole
    # graph just to keep a handful of rows.
    sampled = islice(graph.triples((None, None, None)), max(limit, 0))
    return [(str(s), str(p), str(o)) for s, p, o in sampled]

## Ontology Integration

In [None]:
#| export
def mount_ontology(ds_meta: DatasetMeta, ns: dict, path: str, ont_name: str) -> str:
    """Parse an ontology file into the dataset's ``onto/<ont_name>`` graph.

    Args:
        ds_meta: DatasetMeta containing the dataset
        ns: Namespace dict (accepted for compatibility with
            setup_ontology_context; not used here)
        path: Path to ontology file
        ont_name: Name for the ontology

    Returns:
        Summary string with the graph's triple count after parsing
    """
    onto_uri = URIRef(f'urn:rlm:{ds_meta.name}:onto/{ont_name}')
    onto_graph = ds_meta.dataset.graph(onto_uri)

    # Load the file's triples into the named graph, then drop stale caches.
    onto_graph.parse(path)
    ds_meta._invalidate_caches()

    triple_count = len(onto_graph)
    file_name = Path(path).name
    return f"Mounted {triple_count} triples from {file_name} into onto/{ont_name}"

In [None]:
#| export
def setup_dataset_context(ns: dict, name: str = 'ds') -> str:
    """Create a fresh Dataset and publish it, plus its helpers, into ``ns``.

    Args:
        ns: Namespace dict that receives the Dataset (under ``name``), its
            DatasetMeta (under ``<name>_meta``) and the helper functions
        name: Variable name for the Dataset handle

    Returns:
        Summary string describing what was created
    """
    ds_meta = DatasetMeta(Dataset(), name=name)

    # Dataset handle and its meta object
    ns[name] = ds_meta.dataset
    ns[f"{name}_meta"] = ds_meta

    # Helpers pre-bound to this dataset, followed by the result-table views,
    # which take no dataset argument and are exposed as-is.
    helpers = {
        'mem_add': partial(mem_add, ds_meta),
        'mem_query': partial(mem_query, ds_meta),
        'mem_retract': partial(mem_retract, ds_meta),
        'mem_describe': partial(mem_describe, ds_meta),
        'dataset_stats': partial(dataset_stats, ds_meta),
        'work_create': partial(work_create, ds_meta),
        'work_cleanup': partial(work_cleanup, ds_meta),
        'work_to_mem': partial(work_to_mem, ds_meta),
        'snapshot_dataset': partial(snapshot_dataset, ds_meta),
        'mount_ontology': partial(mount_ontology, ds_meta, ns),
        'list_graphs': partial(list_graphs, ds_meta),
        'graph_sample': partial(graph_sample, ds_meta),
        'res_head': res_head,
        'res_where': res_where,
        'res_group': res_group,
        'res_distinct': res_distinct,
    }
    for key, helper in helpers.items():
        ns[key] = helper

    return f"Created dataset '{name}' with session_id={ds_meta.session_id}"

In [None]:
# Test result table views: exercises res_head, res_where, res_group and
# res_distinct on rows returned by mem_query, then on a ResultTable wrapper.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add test data: three age facts (two of them '30') and two city facts
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/age', '25')
mem_add(ds_meta, 'http://ex.org/charlie', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/city', 'Boston')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/city', 'NYC')

# Query and get results as list (only the three age triples match)
results = mem_query(ds_meta, 'SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }')

# Test res_head
head = res_head(results, n=2)
assert len(head) == 2
print(f"✓ res_head works: {len(head)} rows")

# Test res_where with exact value
filtered = res_where(results, 'age', value='30')
assert len(filtered) == 2
print(f"✓ res_where (exact) works: {len(filtered)} rows with age=30")

# Test res_where with pattern (regex search on the subject URI)
filtered_pattern = res_where(results, 's', pattern='alice')
assert len(filtered_pattern) == 1
print(f"✓ res_where (pattern) works: {len(filtered_pattern)} rows matching 'alice'")

# Test res_group: results are ordered by count, most frequent first
groups = res_group(results, 'age')
assert len(groups) == 2  # Two distinct ages
assert groups[0][1] == 2  # Age '30' appears twice
print(f"✓ res_group works: {groups}")

# Test res_distinct
distinct_ages = res_distinct(results, 'age')
assert len(distinct_ages) == 2
assert '25' in distinct_ages and '30' in distinct_ages
print(f"✓ res_distinct works: {distinct_ages}")

# Test ResultTable wrapper around the same rows
result_table = ResultTable(
    rows=results,
    columns=['s', 'age'],
    query='SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }',
    total_rows=len(results)
)
assert len(result_table) == 3
print(f"✓ ResultTable works: {result_table}")

# Test result table views work with ResultTable (not just plain lists)
head_from_table = res_head(result_table, n=2)
assert len(head_from_table) == 2
print(f"✓ res_head works with ResultTable")

In [None]:
# Test dataset creation: setup_dataset_context must store the Dataset and its
# DatasetMeta under the requested name, with an 8-character session id.
test_ns = {}
result = setup_dataset_context(test_ns, name='test_ds')
assert 'test_ds' in test_ns
assert 'test_ds_meta' in test_ns
assert len(test_ns['test_ds_meta'].session_id) == 8
print("✓ Dataset creation works")
print(result)

In [None]:
# Test mem_add with provenance: one triple lands in mem and at least one
# provenance triple is written.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

result = mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob', 
                 source='test', reason='Testing')
assert len(ds_meta.mem) == 1
assert len(ds_meta.prov) > 0
print("✓ mem_add works")
print(result)

In [None]:
# Test mem_query: rows come back as dicts keyed by the SELECT variable names.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
mem_add(ds_meta, 'http://ex.org/bob', 'http://ex.org/age', '25')

results = mem_query(ds_meta, 'SELECT ?s ?age WHERE { ?s <http://ex.org/age> ?age }')
assert len(results) == 2
assert all('s' in r and 'age' in r for r in results)
print("✓ mem_query works")
print(results)

In [None]:
# Test mem_retract: retracting by predicate only (subject/object wildcards)
# removes the single matching triple.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
assert len(ds_meta.mem) == 1

result = mem_retract(ds_meta, predicate='http://ex.org/age', source='test', reason='Correction')
assert len(ds_meta.mem) == 0
assert 'Removed 1 triples' in result
print("✓ mem_retract works")
print(result)

In [None]:
# Test mem_describe: alice is the subject of both added triples.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob')
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

desc = mem_describe(ds_meta, 'http://ex.org/alice')
assert 'as_subject' in desc
assert 'as_object' in desc
assert len(desc['as_subject']) == 2
print("✓ mem_describe works")
print(desc)

In [None]:
# Test index invalidation: a mutation through mem_add must bump the
# DatasetMeta version counter.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Access cached property (populates the graph_stats cache)
initial_version = ds_meta._version
_ = ds_meta.graph_stats

# Mutate
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

# Check version incremented
assert ds_meta._version > initial_version
print("✓ Index invalidation works")

In [None]:
# Test work graph lifecycle: create -> fill -> promote to mem -> cleanup.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Create work graph (it is listed in work_graphs even while still empty)
uri, graph = work_create(ds_meta, task_id='test_task')
assert 'work/test_task' in uri
assert len(ds_meta.work_graphs) == 1

# Add some triples to work graph
graph.add((URIRef('http://ex.org/alice'), URIRef('http://ex.org/temp'), Literal('value')))
assert len(graph) == 1

# Promote to mem
result = work_to_mem(ds_meta, 'test_task', reason='Test promotion')
assert len(ds_meta.mem) == 1
assert 'Promoted 1 triples' in result

# Cleanup
result = work_cleanup(ds_meta, task_id='test_task')
assert 'Removed 1 work' in result
assert len(ds_meta.work_graphs) == 0

print("✓ Work graph lifecycle works")

In [None]:
# Test snapshot: write the dataset to a temporary .trig file and load it back
# under a different and under the same handle name.
import tempfile
import os

test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add some data
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')

# Take snapshot. The temp file is only used to reserve a unique .trig path;
# delete=False keeps it on disk so it can be written by name below.
with tempfile.NamedTemporaryFile(mode='w', suffix='.trig', delete=False) as f:
    snapshot_path = f.name

result = snapshot_dataset(ds_meta, path=snapshot_path)
assert os.path.exists(snapshot_path)
assert 'Snapshot saved' in result

# Load snapshot (let it auto-detect the name 'ds' from graph URIs)
test_ns2 = {}
result = load_snapshot(snapshot_path, test_ns2, name='restored')
assert 'restored' in test_ns2
assert 'restored_meta' in test_ns2
# Should auto-detect original name 'ds' and use it for URIs
assert len(test_ns2['restored_meta'].mem) == 1

# Also test loading with same name
test_ns3 = {}
result = load_snapshot(snapshot_path, test_ns3, name='ds')
assert 'ds' in test_ns3
assert 'ds_meta' in test_ns3
assert len(test_ns3['ds_meta'].mem) == 1

# Cleanup (skipped if an assertion above fails)
os.unlink(snapshot_path)

print("✓ Snapshot roundtrip works")

In [None]:
# Test bounded view functions: dataset_stats, list_graphs and graph_sample.
test_ns = {}
setup_dataset_context(test_ns)
ds_meta = test_ns['ds_meta']

# Add some data: one mem triple and two (empty) work graphs
mem_add(ds_meta, 'http://ex.org/alice', 'http://ex.org/age', '30')
work_create(ds_meta, 'task1')
work_create(ds_meta, 'task2')

# Test dataset_stats
stats = dataset_stats(ds_meta)
assert 'mem: 1 triples' in stats
assert 'work graphs: 2' in stats

# Test list_graphs
graphs = list_graphs(ds_meta)
assert len(graphs) >= 4  # mem, prov, work/task1, work/task2

# Test list_graphs with pattern (substring match on the graph URI)
work_graphs = list_graphs(ds_meta, pattern='work/')
assert len(work_graphs) == 2

# Test graph_sample
mem_uri = f'urn:rlm:{ds_meta.name}:mem'
sample = graph_sample(ds_meta, mem_uri)
assert len(sample) == 1

print("✓ Bounded view functions work")

## Tests

## Usage Examples

In [None]:
#| eval: false
# Basic usage in RLM context: the helpers stored in `ns` are already bound to
# the dataset, so they are called without a ds_meta argument.
ns = {}
setup_dataset_context(ns)

# RLM can now use: mem_add, mem_query, mem_describe, etc.
ns['mem_add']('http://ex.org/alice', 'http://ex.org/knows', 'http://ex.org/bob')
results = ns['mem_query']('SELECT ?s ?p ?o WHERE { ?s ?p ?o }')
print(results)

In [None]:
#| eval: false
# Integration with ontology: the same namespace holds both the dataset
# helpers and the ontology context.
from rlm.ontology import setup_ontology_context

ns = {}
setup_dataset_context(ns)
setup_ontology_context('ontology/prov.ttl', ns, name='prov')

# Mount ontology into dataset (the bound helper takes only path and name)
ns['mount_ontology']('ontology/prov.ttl', 'prov')

# Now ontology is in dataset as onto/prov graph
graphs = ns['list_graphs']()
print(graphs)