# Structured Agents and Pipelines
> Creating DSPy StructuredAgents for Semantic Web tasks

In [None]:
#| default_exp pipelines

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
import os
# Request DEBUG logging through the environment. This is done before the
# cogitarelink import below — presumably so the package can pick the value up
# at import time (TODO confirm against cogitarelink.core.debug).
os.environ["COG_LOGLEVEL"] = "DEBUG"
# or, programmatically:
from cogitarelink.core.debug import set_loglevel
set_loglevel("DEBUG")

In [None]:
#| export
from typing import List, Dict, Any, Optional
import time
import dspy, hashlib, datetime
from cogitarelink.core.graph import GraphManager
from cogitarelink_dspy.wrappers import get_tools, get_tool_by_name, group_tools_by_layer
from cogitarelink_dspy.components import list_layers
from cogitarelink_dspy.memory import ReflectionStore, REFLECTION_GRAPH
from cogitarelink_dspy.telemetry import TelemetryStore


## Set up MLflow for logging and introspection

In [None]:
import mlflow

# Address of the MLflow tracking server used by this notebook.
TRACKING_URI = "http://localhost:5000"

mlflow.set_tracking_uri(TRACKING_URI)
# Optionally group the runs under a named experiment:
# mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()  # turn on MLflow's automatic logging for DSPy



## Introduction

This notebook implements structured agent pipelines for the Cogitarelink-DSPy integration. We're creating agents that can reason about semantic web data across different layers of abstraction:

1. **Context Layer** - Working with JSON-LD contexts and namespaces
2. **Ontology Layer** - Exploring ontologies and vocabularies
3. **Rules Layer** - Validating data against rules (SHACL, etc.)
4. **Instances Layer** - Working with actual data/triples
5. **Verification Layer** - Verifying and signing data

In addition, we have a **Utility Layer** for cross-cutting concerns like memory and telemetry.

Our approach wraps DSPy's `ReAct` module (the "structured agent" of this notebook's title), which provides a framework for tool selection and execution based on the user's query. We plan two levels of agents:

- `HelloLOD`: A lightweight agent with essential tools for common tasks
- `FullPlanner`: A comprehensive agent with all available tools (planned; not implemented in this notebook)

We'll also integrate memory capabilities to enable the agent to learn from previous experiences.

In [None]:
#| export
# One graph instance is shared by the reflection memory and the telemetry store.
graph     = GraphManager(use_rdflib=True)
mem       = ReflectionStore(graph)
telemetry = TelemetryStore(graph)

# Build a *new* list instead of `+=`-ing onto the object returned by get_tools().
# The in-place form mutates that list; if get_tools() hands back a shared/cached
# list (the repeated add/retrieve/as_prompt entries printed when the wrappers are
# listed later in this notebook suggest it does), every re-run of this cell
# appended the memory tools again.
TOOLS = [*get_tools(), mem.add, mem.retrieve, mem.as_prompt]

## System Prompts

The heart of our agent's reasoning is the system prompt, which explains the semantic web layers and how to select the appropriate tool based on the user's query. Let's define the system prompts for our agents.

In [None]:
# Instructions handed to the ReAct agent through its signature.
SEM_WEB_SYSTEM = "".join((
    "You are a Linked-Data teaching assistant. ",
    "Think step-by-step; choose the highest Cogitarelink layer that solves the task. ",
    "Return only the final answer — never reveal your thought.",
))

# Signature used by the agent: a single `query` string in, a single `answer` string out.
sig = dspy.Signature("query:str -> answer:str", instructions=SEM_WEB_SYSTEM)


In [None]:
# Create the language model and register it as DSPy's default LM.
lm = dspy.LM("openai/o3-mini", max_tokens=20000, temperature=1.0)
dspy.configure(lm=lm)

In [None]:
# Show the configured LM object as the cell output (quick sanity check).
lm

<dspy.clients.lm.LM>

In [None]:
# ReAct agent over the complete TOOLS list; `max_iters=4` bounds the reasoning/tool loop.
agent = dspy.ReAct(sig, tools=TOOLS, max_iters=4)


## HelloLOD: Lightweight Semantic Web Agent

Our `HelloLOD` agent is a minimal implementation that provides basic semantic web functionality. It includes only the essential tools for common tasks, making it faster and more focused than the full agent.

The key design decisions for HelloLOD are:

1. Include one representative tool from each semantic layer
2. Exclude memory tools initially for simplicity
3. Use a straightforward system prompt without complex reflection

This agent serves as both a proof of concept and a starting point for more complex implementations.

In [None]:
#| export
class HelloLOD(dspy.Module):
    """Lightweight wrapper that logs scratch-pad hashes, latency & manual notes.

    Parameters
    ----------
    agent
        Callable invoked as ``agent(query=...)``; its result is returned as-is.
    telemetry
        Object exposing ``log(kind, value, tool_iri=...)``.
    mem
        Object exposing ``add(note, tags=[...])`` for manual reflections.
    """

    def __init__(self, agent, telemetry, mem):
        super().__init__()
        self.agent = agent
        self.telemetry = telemetry
        self.mem = mem

    def forward(self, query: str):
        """Answer ``query`` with the wrapped agent and record telemetry.

        A query starting with ``remember:`` (any letter case) is stored as a
        manual note instead; in that case a dict ``{"answer": "Stored: <note>"}``
        is returned and the agent is not invoked. Otherwise the agent's own
        result object is returned unchanged.
        """
        # Manual reflection is handled first: running the agent for such a query
        # only spent an LLM call whose result was thrown away.
        prefix = "remember:"
        if query[:len(prefix)].lower() == prefix:
            # Slice by length rather than split on the lowercase literal: the
            # check above is case-insensitive, so "Remember: x" must work too.
            note = query[len(prefix):].strip()
            self.mem.add(note, tags=["manual"])
            return {"answer": f"Stored: {note}"}

        # perf_counter is monotonic, so the latency cannot be skewed by
        # wall-clock adjustments.
        started = time.perf_counter()
        result = self.agent(query=query)
        latency_ms = (time.perf_counter() - started) * 1000

        # Hash the hidden chain-of-thought (fallback to empty if unavailable).
        # NOTE(review): nothing in this file sets `last_scratch` on the LM; if the
        # LM never exposes it, the digest is always the hash of the empty string.
        try:
            lm = self.agent.get_lm()
        except Exception:
            lm = None
        scratch = getattr(lm, "last_scratch", "") or ""
        digest = hashlib.sha256(str(scratch).encode()).hexdigest()
        self.telemetry.log("cot", digest, tool_iri="urn:agent:HelloLOD")

        # Log latency (milliseconds)
        self.telemetry.log("latency", latency_ms, tool_iri="urn:agent:HelloLOD")

        return result

In [None]:
 #| export
# Shared HelloLOD instance wired to the notebook's agent, telemetry store and memory.
# NOTE(review): this cell carries an export directive, but `agent` is created in a
# cell without one — confirm the generated module actually defines `agent`.
hello = HelloLOD(agent=agent, telemetry=telemetry, mem=mem)

In [None]:
# Two sample queries used as a smoke test: a prefix expansion and a Wikidata
# count question (they do not cover every layer).
test_queries = [
    "What is the full IRI of dc:title?",
    "How many cats on Wikidata?"
]

In [None]:

separator = "-" * 60
for query in test_queries:
    print(f"Query:    {query}")
    resp = hello(query)
    # The response is either a DSPy Prediction (attributes) or a plain dict
    # (keys); both offer .get(), which supplies the fallback value here.
    answer     = getattr(resp, "answer", resp.get("answer", None))
    trajectory = getattr(resp, "trajectory", resp.get("trace", None))
    print(f"Answer:   {answer}")
    print(f"Trajectory: {trajectory}")
    print(separator)



Query:    What is the full IRI of dc:title?
Answer:   http://purl.org/dc/elements/1.1/title
Trajectory: {'thought_0': 'The full IRI for dc:title is "http://purl.org/dc/elements/1.1/title".', 'tool_name_0': 'finish', 'tool_args_0': {}, 'observation_0': 'Completed.'}
------------------------------------------------------------
Query:    How many cats on Wikidata?
Answer:   There isn’t a fixed number—the current count is dynamic. To get the exact number, run the query: SELECT (COUNT(?cat) AS ?count) WHERE { ?cat wdt:P31 wd:Q146 }.
Trajectory: {'thought_0': 'The question asks for the number of cat items on Wikidata. Since Wikidata is continuously updated, a direct number isn’t fixed; instead one would typically determine the count by running a SPARQL query such as:\n  SELECT (COUNT(?cat) AS ?count) WHERE { ?cat wdt:P31 wd:Q146 }\nThis query counts all items that have the instance-of property (P31) pointing to the "cat" entity (Q146). The returned number will reflect the current state of Wi

In [None]:
 # HelloLOD
# Ask the prefix question again and print the trajectory to see which tool was picked.
resp = hello(query="What is the full IRI of dc:title?")
print("TRACE →", resp.trajectory)
# check stderr for any cogitarelink.* DEBUG messages



TRACE → {'thought_0': 'The full IRI for dc:title is "http://purl.org/dc/elements/1.1/title".', 'tool_name_0': 'finish', 'tool_args_0': {}, 'observation_0': 'Completed.'}


In [None]:
# Same inspection for the Wikidata question: trajectory first, then the final answer.
resp = hello(query="How many cats on Wikidata?")
print("TRACE →", resp.trajectory)
print("OBSERVATION →", resp.answer)



TRACE → {'thought_0': 'The question asks for the number of cat items on Wikidata. Since Wikidata is continuously updated, a direct number isn’t fixed; instead one would typically determine the count by running a SPARQL query such as:\n  SELECT (COUNT(?cat) AS ?count) WHERE { ?cat wdt:P31 wd:Q146 }\nThis query counts all items that have the instance-of property (P31) pointing to the "cat" entity (Q146). The returned number will reflect the current state of Wikidata and can vary over time.', 'tool_name_0': 'finish', 'tool_args_0': {}, 'observation_0': 'Completed.'}
OBSERVATION → There isn’t a fixed number—the current count is dynamic. To get the exact number, run the query: SELECT (COUNT(?cat) AS ?count) WHERE { ?cat wdt:P31 wd:Q146 }.


In [None]:
from cogitarelink_dspy.wrappers import get_tools

# Report how many tool wrappers exist and what they are called.
tools = get_tools()
tool_names = [tool.__name__ for tool in tools]
print(f"Total wrappers generated: {len(tools)}")
print(tool_names)

Total wrappers generated: 22
['EchoMessage', 'LoadContext', 'FetchOntology', 'ValidateEntity', 'GraphManager', 'VerifySignature', 'AddReflection', 'RecallReflection', 'ReflectionPrompt', 'LogTelemetry', 'add', 'retrieve', 'as_prompt', 'add', 'retrieve', 'as_prompt', 'add', 'retrieve', 'as_prompt', 'add', 'retrieve', 'as_prompt']


## Memory-Focused Training

Now we'll implement a training pipeline for our agent using a memory-focused development set. This approach follows DSPy's training methodology to optimize the agent's ability to use the memory tools appropriately.

In [None]:
# 1. First, let's load and inspect the memory development set
import json
from pathlib import Path

def load_memory_devset(path="../tests/devset_memory.jsonl"):
    """Load the memory development set from a JSONL file.

    Each non-blank line must be a JSON object with the keys ``q`` and
    ``exp_tool``; ``use_memory`` is optional and defaults to ``False``.

    Parameters
    ----------
    path : str or path-like
        Location of the JSONL file.

    Returns
    -------
    list
        One ``dspy.Example`` per record, with ``q`` marked as the only input.

    Raises
    ------
    OSError
        If the file cannot be opened.
    json.JSONDecodeError
        If a non-blank line is not valid JSON.
    KeyError
        If a record lacks ``q`` or ``exp_tool``.
    """
    examples = []

    # Explicit UTF-8 so the result does not depend on the platform's default encoding.
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline at the end of the file)
                # are not records; json.loads would raise on them.
                continue
            data = json.loads(line)
            # Convert to DSPy Example format
            example = dspy.Example(
                q=data["q"],
                exp_tool=data["exp_tool"],
                use_memory=data.get("use_memory", False)
            ).with_inputs("q")
            examples.append(example)

    return examples

# Read the devset and preview its first three records.
memory_devset = load_memory_devset()
print(f"Loaded {len(memory_devset)} memory examples")

for number, example in enumerate(memory_devset[:3], start=1):
    print(f"Example {number}:")
    print(f"Query: {example.q}")
    print(f"Expected Tool: {example.exp_tool}")
    print(f"Use Memory: {example.use_memory}")
    print("---")

Loaded 33 memory examples
Example 1:
Query: Remember that wdt:P1476 is title
Expected Tool: AddReflection
Use Memory: False
---
Example 2:
Query: What's the Wikidata title property?
Expected Tool: RecallReflection
Use Memory: True
---
Example 3:
Query: Inject notes into system prompt
Expected Tool: ReflectionPrompt
Use Memory: False
---


In [None]:
# 2. Define a metric function for evaluating tool selection
def tool_match(pred, sample):
    """Return True if the expected tool was used by the prediction.

    Parameters
    ----------
    pred
        Prediction-like object. If it has a non-empty ``trajectory`` mapping,
        tool names are read from it; otherwise its ``trace`` list (if any) is
        used.
    sample
        Object with an ``exp_tool`` attribute naming the expected tool.

    Notes
    -----
    The argument order is ``(pred, sample)``. DSPy optimizers call metrics as
    ``metric(example, pred, trace=None)``, so wrap this function when handing
    it to one.
    """
    trajectory = getattr(pred, 'trajectory', None)
    if trajectory:
        tools_used = []
        for key, value in trajectory.items():
            if isinstance(key, str) and key.startswith('tool_name'):
                # Flat ReAct layout, as printed earlier in this notebook:
                # {'thought_0': ..., 'tool_name_0': 'finish', 'tool_args_0': {}, ...}
                tools_used.append(value)
            elif isinstance(value, dict) and 'tool_name' in value:
                # Nested per-step layout: {'step': {'tool_name': ...}}
                tools_used.append(value['tool_name'])
        return sample.exp_tool in tools_used

    # For other output formats (a missing or None trace counts as "no tools used")
    return sample.exp_tool in (getattr(pred, 'trace', None) or [])

In [None]:
# 3. Define a memory-focused agent for training
class MemoryAgent(dspy.Module):
    """A DSPy agent focused on memory operations.

    Parameters
    ----------
    tools : list, optional
        Candidate tools to pick the memory tools from. Defaults to the
        module-level ``TOOLS`` list.
    """

    # Names of the tool wrappers this agent is allowed to call.
    MEMORY_TOOL_NAMES = ("AddReflection", "RecallReflection", "ReflectionPrompt")

    def __init__(self, tools=None):
        super().__init__()
        # Define submodules for different decisions
        self.decide_memory_action = dspy.ChainOfThought("q -> memory_action")

        # Keep only the memory-related tools; objects without a __name__ are skipped.
        candidates = TOOLS if tools is None else tools
        memory_tools = [tool for tool in candidates
                        if getattr(tool, "__name__", None) in self.MEMORY_TOOL_NAMES]

        # Create a ReAct agent for execution
        self.executor = dspy.ReAct(
            signature=dspy.Signature(
                "query:str -> answer:str",
                instructions="You are a memory assistant that can store and recall information."
            ),
            tools=memory_tools,
            max_iters=3
        )

    @staticmethod
    def _tool_names(trajectory):
        """Collect the tool names recorded in a trajectory mapping, in order."""
        names = []
        for key, value in (trajectory or {}).items():
            if isinstance(key, str) and key.startswith("tool_name"):
                # Flat ReAct layout: 'tool_name_0', 'tool_name_1', ...
                names.append(value)
            elif isinstance(value, dict) and "tool_name" in value:
                # Nested per-step layout
                names.append(value["tool_name"])
        return names

    def forward(self, q):
        """Run the memory executor on ``q``.

        Returns the executor's result with two extra fields: ``trace`` (list of
        tool names found in its trajectory) and ``memory_action`` (output of the
        decision step, or None if that step produced no such field).
        """
        # Decide what memory action to take
        memory_decision = self.decide_memory_action(q=q)

        # Execute with ReAct
        result = self.executor(query=q)

        # Surface the decision rather than dropping the result of that LM call.
        result.memory_action = getattr(memory_decision, "memory_action", None)

        # Add tool trace for evaluation
        result.trace = self._tool_names(getattr(result, "trajectory", None))

        return result

In [None]:
# 4. Set up bootstrap training
from dspy.teleprompt import BootstrapFewShot
from sklearn.model_selection import train_test_split

# Split into train and validation (fixed seed so the split is reproducible)
train_examples, val_examples = train_test_split(
    memory_devset, test_size=0.2, random_state=42
)

print(f"Training on {len(train_examples)} examples, validating on {len(val_examples)} examples")

# Create the base agent
memory_agent = MemoryAgent()

# Start with a small subset for demonstration. The training set is passed to
# `bootstrap.compile(...)`, not to the optimizer's constructor.
bootstrap_trainset = train_examples[:5]


def bootstrap_metric(example, pred, trace=None):
    """Adapt `tool_match(pred, sample)` to DSPy's `metric(example, pred, trace)` call."""
    return tool_match(pred, example)


# Define the bootstrap trainer. BootstrapFewShot's constructor takes the metric
# (plus demo/round limits); it has no `trainset` or `num_threads` arguments.
bootstrap = BootstrapFewShot(metric=bootstrap_metric)

Training on 26 examples, validating on 7 examples


TypeError: BootstrapFewShot.__init__() got an unexpected keyword argument 'trainset'

In [None]:
# 5. Run compilation (commented out to avoid LLM costs during development)
# DSPy optimizers are run through their own `compile` method, which receives the
# program to optimize together with the training examples:
# optimized_agent = bootstrap.compile(memory_agent, trainset=train_examples[:5])

# For demonstration/development, we'll use the unoptimized version
optimized_agent = memory_agent

print("Using unoptimized agent for demonstration. Uncomment to run actual training.")

In [None]:
# 6. Evaluate the agent
def evaluate_agent(agent, examples):
    """Score an agent's tool selection over a collection of examples.

    The agent is called with each example's ``q``; ``tool_match`` decides
    whether the prediction used the expected tool.

    Returns
    -------
    tuple
        ``(accuracy, results)``: the fraction of matching examples (``0`` when
        ``examples`` is empty) and one dict per example with the keys
        ``query``, ``expected``, ``predicted`` and ``correct``.
    """
    results = []

    for example in examples:
        prediction = agent(example.q)
        results.append({
            "query": example.q,
            "expected": example.exp_tool,
            "predicted": getattr(prediction, 'trace', []),
            "correct": tool_match(prediction, example),
        })

    hits = sum(1 for row in results if row["correct"])
    accuracy = hits / len(examples) if examples else 0
    return accuracy, results

# Score the agent on at most five validation examples to keep the demo cheap.
sample_examples = val_examples[:5]
val_accuracy, val_results = evaluate_agent(agent=optimized_agent, examples=sample_examples)
print(f"Validation accuracy on sample: {val_accuracy:.2f}")

In [None]:
# 7. Show the first three evaluation records
print("\nExample predictions:")
for result in val_results[:3]:
    print(f"Query: {result['query']}")
    print(f"Expected: {result['expected']}")
    print(f"Predicted: {result['predicted']}")
    print(f"Correct: {result['correct']}")
    print("---")

In [None]:
# 8. Save the optimized agent (commented out for demonstration)
# NOTE(review): a pickle file runs arbitrary code when loaded, so only load
# files you produced yourself. DSPy modules also appear to offer their own
# save()/load() methods — confirm and prefer those if available.
# import pickle
# import os
# 
# def save_agent(agent, path):
#     """Save the optimized agent to disk."""
#     os.makedirs(os.path.dirname(path), exist_ok=True)
#     with open(path, 'wb') as f:
#         pickle.dump(agent, f)
#     return path
# 
# # saved_path = save_agent(optimized_agent, "../cogitarelink_dspy/optimized/memory_agent.pkl")
# # print(f"Saved optimized agent to {saved_path}")

print("Model saving is commented out for demonstration purposes.")

In [None]:
# 9. Integration with HelloLOD
class HelloLODWithMemory(HelloLOD):
    """HelloLOD agent with integrated memory capabilities.

    Parameters
    ----------
    agent, telemetry, mem
        Passed through to ``HelloLOD``.
    memory_agent : optional
        Callable used for memory-related queries. When omitted, the
        module-level ``optimized_agent`` is looked up at call time.
    """

    # A query containing any of these substrings (case-insensitive) is treated
    # as a memory operation.
    MEMORY_KEYWORDS = ("remember", "recall", "what did", "inject")

    def __init__(self, agent, telemetry, mem, memory_agent=None):
        super().__init__(agent, telemetry, mem)
        self.memory_agent = memory_agent

    def forward(self, query: str):
        """Route memory queries to the memory agent, everything else to HelloLOD.

        Memory queries return ``{"answer": ...}``; other queries return whatever
        ``HelloLOD.forward`` returns.
        """
        # First check if this is a memory operation (lower-case the query once)
        lowered = query.lower()
        if any(kw in lowered for kw in self.MEMORY_KEYWORDS):
            # Use the memory agent for memory-specific operations
            memory_agent = self.memory_agent if self.memory_agent is not None else optimized_agent
            memory_result = memory_agent(query)

            # Extract the answer for the user
            answer = getattr(memory_result, "answer", "Memory operation completed")

            # Log memory usage telemetry
            self.telemetry.log("memory_use", 1, tool_iri="urn:agent:HelloLODWithMemory")

            return {"answer": answer}

        # Otherwise, use the standard HelloLOD behavior
        return super().forward(query)

In [None]:
# Build the memory-aware agent from the same agent, telemetry store and memory.
hello_with_memory = HelloLODWithMemory(agent=agent, telemetry=telemetry, mem=mem)

# Try it with a query that should be routed to the memory path.
memory_query = "Remember that rdfs:label is used for human-readable labels"
print(f"Query: {memory_query}")
resp = hello_with_memory(query=memory_query)
print(f"Response: {resp.get('answer', '')}")

## Conclusion

In this notebook, we've implemented a layered approach to semantic web agents using DSPy's structured agent framework. The key components we've created are:

1. **System prompts** that explain the semantic web layers and guide tool selection
2. **Agent implementations** at different capability levels (HelloLOD, HelloLODWithMemory)
3. **Memory integration** with a training pipeline based on a curriculum dataset
4. **Evaluation metrics** to measure performance on memory tasks

This implementation follows Jeremy Howard's step-by-step approach:
- Starting with simple, working demonstrations
- Building complexity incrementally
- Documenting each step clearly
- Testing components in isolation before integration

The next steps would be to:
1. Run the training with a real LLM
2. Expand the curriculum dataset to cover more semantic web tasks
3. Implement advanced techniques like reflection garbage collection
4. Create a full evaluation suite for comprehensive testing