## Data Flow Between Agents

The system's data flow is coordinated by the Task Planner Agent:

1. Initial Flow: Document → Task Planner → Pre-processor
2. Information Extraction: Pre-processor → Context Bank & Task Planner
3. Knowledge Gathering: Task Planner → Knowledge Agent → Context Bank & Task Planner
4. Compliance Analysis: Task Planner → Compliance Checker (accessing Context Bank)
   - If knowledge is insufficient → Knowledge Agent (with the missing fields)
   - If knowledge is sufficient → Check compliance for each clause
5. Conditional Processing:
   - If contradictions: Compliance Checker → Clause Rewriter → Compliance Checker
   - If compliant: Compliance Checker → Task Planner
6. Summarizing Changes: Task Planner → Post-processor
7. Task Completion: Post-processor → Final Output → User
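
The numbered flow above can be read as a small state machine. The following is a minimal sketch of that reading, not the notebook's actual routing (which is left to the LLM and the handoff tools); the outcome labels such as `"contradiction"` and the helper names `next_agent` and `walk` are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical routing table: (current agent, outcome) -> next agent.
# Outcome labels are illustrative and do not appear in the notebook.
ROUTES: Dict[Tuple[str, str], str] = {
    ("Document", "received"): "Task Planner",
    ("Task Planner", "needs_preprocessing"): "Pre-processor",
    ("Pre-processor", "extracted"): "Task Planner",
    ("Task Planner", "needs_knowledge"): "Knowledge Agent",
    ("Knowledge Agent", "retrieved"): "Task Planner",
    ("Task Planner", "needs_compliance_check"): "Compliance Checker",
    ("Compliance Checker", "insufficient_knowledge"): "Knowledge Agent",
    ("Compliance Checker", "contradiction"): "Clause Rewriter",
    ("Clause Rewriter", "rewritten"): "Compliance Checker",
    ("Compliance Checker", "compliant"): "Task Planner",
    ("Task Planner", "needs_summary"): "Post-processor",
    ("Post-processor", "summarized"): "Final Output",
}


def next_agent(current: str, outcome: str) -> str:
    """Return the agent that should run after `current` reports `outcome`."""
    try:
        return ROUTES[(current, outcome)]
    except KeyError:
        raise ValueError(f"No route from {current!r} on outcome {outcome!r}") from None


def walk(start: str, outcomes: List[str]) -> List[str]:
    """Follow a sequence of outcomes and return the agents visited, in order."""
    path = [start]
    for outcome in outcomes:
        path.append(next_agent(path[-1], outcome))
    return path


# The rewrite loop from step 5: a contradiction is rewritten and re-checked.
rewrite_loop = walk("Compliance Checker", ["contradiction", "rewritten", "compliant"])
print(" -> ".join(rewrite_loop))
```

Writing the flow as a table makes the Compliance Checker / Clause Rewriter loop explicit and gives a reference to compare the agents' actual handoffs against.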


In [None]:
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_handoff_tool, create_swarm
from langchain_core.tools import tool


In [2]:
model = ChatOllama(model="llama3.1:latest")

## Context Bank Initialization

Initialize a shared context bank instance that will be used by all agents to store and retrieve document information.


In [None]:
from context_bank import ContextBank

# Initialize a single shared context bank instance
context_bank = ContextBank()

# This context_bank will be passed to all agents that need to store or retrieve information
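
The `context_bank` module itself is not shown in this notebook. As a rough illustration of the interface the cells below rely on (`add_document`, `add_entities`, `add_clauses` and the matching getters), here is a minimal in-memory sketch. The class name `InMemoryContextBank` and its storage layout are assumptions; the real `ContextBank` may behave differently.

```python
from typing import Any, Dict, List, Optional


class InMemoryContextBank:
    """Hypothetical stand-in for ContextBank, keyed by document ID."""

    def __init__(self) -> None:
        self._documents: Dict[str, Dict[str, Any]] = {}
        self._entities: Dict[str, List[Any]] = {}
        self._clauses: Dict[str, List[Any]] = {}

    def add_document(self, document_id: str, text: str,
                     metadata: Optional[Dict[str, Any]] = None) -> None:
        # Copy the metadata so later caller-side mutation does not leak in.
        self._documents[document_id] = {"text": text, "metadata": dict(metadata or {})}

    def add_entities(self, document_id: str, entities: List[Any]) -> None:
        self._entities.setdefault(document_id, []).extend(entities)

    def add_clauses(self, document_id: str, clauses: List[Any]) -> None:
        self._clauses.setdefault(document_id, []).extend(clauses)

    def get_document(self, document_id: str) -> Dict[str, Any]:
        return self._documents.get(document_id, {})

    def get_entities(self, document_id: str) -> List[Any]:
        return list(self._entities.get(document_id, []))

    def get_clauses(self, document_id: str) -> List[Any]:
        return list(self._clauses.get(document_id, []))


# Usage: one shared instance, written by one agent and read by another.
demo_bank = InMemoryContextBank()
demo_bank.add_document("doc-1", "This agreement ...", {"title": "AGREEMENT"})
demo_bank.add_entities("doc-1", [("ACME Corporation", "ORG")])
print(demo_bank.get_document("doc-1")["metadata"]["title"])
```

Because every agent receives the same instance, anything the Pre-processor stores under a document ID is visible to the Compliance Checker without passing the data through messages.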

In [None]:
planner_agent_tools = [
    create_handoff_tool(agent_name="Pre Processor Agent", description="Transfer when pre-processing is needed, it helps to format and clean the input data."),
    create_handoff_tool(agent_name="knowledge_agent", description="Transfer when knowledge is needed, it helps to retrieve knowledge from the web using websearcher."),
    create_handoff_tool(agent_name="Compliance Checker Agent", description="Transfer when compliance checking is needed, it helps to check legal documents for compliance with regulations."),
    create_handoff_tool(agent_name="Post Processor Agent", description="Transfer when post processing is needed, it helps to format and finalize the output."),
]

planner_agent_node = create_react_agent(
    model,
    planner_agent_tools,
    prompt="""
        You are a Task Planner Agent responsible for coordinating a multi-agent system to analyze legal documents for discrepancies and compliance. Your job is to plan and delegate tasks to specialized agents, track task completion, and dynamically adapt the plan based on the current system state.

        You are aware of the capabilities of the agents and can query or instruct them based on the task at hand. After every task execution, you should validate whether the task was completed successfully. If a task fails or the output is insufficient, you should modify the workflow, reassign the task, or create additional subtasks.

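        You have access to the following agents/tools:
        - Pre Processor Agent: Transfer when pre-processing is needed; it formats and cleans the input data.
        - Knowledge Agent: Transfer when knowledge is needed; it retrieves knowledge from the web using a web searcher.
        - Compliance Checker Agent: Transfer when compliance checking is needed; it checks legal documents for compliance with regulations.
        - Post Processor Agent: Transfer when post-processing is needed; it formats and finalizes the output.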

        YOU ARE SUPPOSED TO PLAN WHAT AGENTS AND TOOLS TO CALL AND IN WHAT ORDER.
    """,
    name="Planner Agent"
)

In [None]:
import PyPDF2
import re
import spacy
import uuid

# Load once to avoid redundant loading
_nlp = spacy.load("en_core_web_sm")

def preprocess_document_tool_implementation(file_path: str, context_bank, system_prompt) -> dict:
    """
    Consolidated preprocessing tool to be used as a callable function in a multi-agent system.
    Extracts text, title, named entities, and clause classifications from a PDF document.
    """

    # Step 1: Extract text from PDF
    with open(file_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        # extract_text() can return None for pages without a text layer
        text = "".join(page.extract_text() or "" for page in reader.pages)

    # Step 2: Title Extraction (from first 10 lines)
    def extract_title(text: str) -> str:
        lines = text.split("\n")
        candidates = []
        for line in lines[:10]:
            clean_line = line.strip()
            if not clean_line or len(clean_line) < 5:
                continue
            score = 0
            if re.match(r"^(CONTRACT|AGREEMENT|PETITION|NOTICE|ORDER|BILL|ACT|STATUTE)\b", clean_line, re.IGNORECASE):
                score += 5
            if re.match(r"^[A-Z\s\-]{5,}$", clean_line):
                score += 2
            # Bold markers or noticeable leading whitespace (a rough proxy for a centered line).
            # The previous check compared the stripped line with itself centered, which only matched lines of 80+ characters.
            if "**" in clean_line or len(line) - len(line.lstrip()) >= 4:
                score += 1
            candidates.append((clean_line, score))
        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[0][0] if candidates else "Unknown Title"

    title = extract_title(text)

    # Step 3: Named Entity Recognition
    doc = _nlp(text)
    entities = [(ent.text, ent.label_) for ent in doc.ents]

    # Step 4: Document + Clause Classification via external LLM system
    llm_output = system_prompt.process_document(text)
    document_class = llm_output.get("CLASS", "")
    clause_classes = llm_output.get("CLAUSES", [])

    # Step 5: Context Banking
    document_id = str(uuid.uuid4())
    context_bank.add_document(document_id, text, {
        "title": title,
        "document_type": document_class,
        "source_file": file_path
    })
    context_bank.add_entities(document_id, entities)
    context_bank.add_clauses(document_id, clause_classes)

    # Final structured output
    return {
        "Document ID": document_id,
        "Text Extracted": text,
        "Document Title": title,
        "Document Class": document_class,
        "Important Clauses": {clause["Text"]: clause["Category"] for clause in clause_classes},
        "Named Entities": entities
    }
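
To see how the title heuristic in Step 2 ranks candidate lines, here is a standalone sketch that reuses the two regular expressions from the function above. It leaves out the formatting bonus, and the names `score_title_candidate` and `pick_title` are hypothetical.

```python
import re

# Same patterns as in extract_title above.
DOC_KEYWORD = re.compile(
    r"^(CONTRACT|AGREEMENT|PETITION|NOTICE|ORDER|BILL|ACT|STATUTE)\b", re.IGNORECASE
)
ALL_CAPS = re.compile(r"^[A-Z\s\-]{5,}$")


def score_title_candidate(line: str) -> int:
    """Score one line: +5 if it starts with a document keyword, +2 if it is all caps."""
    clean = line.strip()
    score = 0
    if DOC_KEYWORD.match(clean):
        score += 5
    if ALL_CAPS.match(clean):
        score += 2
    return score


def pick_title(text: str, max_lines: int = 10) -> str:
    """Return the highest-scoring of the first `max_lines` lines (first one wins ties)."""
    candidates = [
        line.strip() for line in text.split("\n")[:max_lines] if len(line.strip()) >= 5
    ]
    if not candidates:
        return "Unknown Title"
    return max(candidates, key=score_title_candidate)


sample = "Page 1\nAGREEMENT FOR SERVICES\nThis agreement is made between ACME and John Smith."
print(pick_title(sample))
```

Note that the keyword pattern is anchored at the start of the line, so a heading such as `EMPLOYMENT AGREEMENT` only earns the all-caps bonus.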

In [None]:

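from langchain_core.tools import StructuredTool

def preprocess_document_tool(description: str) -> StructuredTool:
    """
    Builds the tool that the Pre-processor Agent uses to preprocess legal documents.

    Args:
        description: The description of the tool that is shown to the agent

    Returns:
        A StructuredTool that takes a file path and returns the preprocessing result
    """

    def preprocess_document(file_path: str) -> dict:
        # Imported here so the cell loads even before the agents package is needed
        from agents.utils.system_prompt import SystemPrompt
        system_prompt = SystemPrompt()

        # Call the implementation with the shared context bank
        return preprocess_document_tool_implementation(file_path, context_bank, system_prompt)

    # The factory is deliberately not decorated with @tool: the tool exposed to
    # the agent is the inner function, whose only argument is the file path.
    return StructuredTool.from_function(
        func=preprocess_document,
        name="preprocess_document",
        description=description,
    )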

In [None]:
    # create_handoff_tool(agent_name="Knowledge Agent", description="Transfer when knowledge is needed, it helps to retrieve knowledge from the web using websearcher."),

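# TODO : Add the other tools for Pre-Processor Agent - Document parser, Text Classifier, Clause Extractor, NER, Context Bank setter
# Update the pre-processor agent tools to include the context bank tools
# NOTE: get_context_bank_data and set_context_bank_data are defined in a later cell (the context bank tools); run that cell before this one.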
pre_processor_agent_tools = [
    create_handoff_tool(agent_name="Planner Agent", description="Transfer when pre-processing is completed, it helps to plan the next steps in the workflow and delegate tasks."),
    preprocess_document_tool(description="Preprocess a legal document: extract its text, title, named entities, and clause classifications, and store them in the Context Bank."),
    get_context_bank_data,
    set_context_bank_data
]

pre_processor_agent_node = create_react_agent(
    model,
    pre_processor_agent_tools,
    prompt="""

You are a Preprocessor Agent, a specialized component within a multi-agent legal analysis framework. Your role is to analyze legal documents and extract all relevant information into a structured format for downstream agents. You are the first point of contact in the pipeline and must ensure complete, clean, and accurate extraction.

Core Responsibilities

You are responsible for the following tasks when a legal document is provided:
- Extract the complete text from a legal contract PDF.
- Infer and extract the title of the document from the top sections.
- Classify the document type and its intended purpose using textual understanding.
- Identify named entities such as parties, jurisdictions, laws, organizations, etc.
- Extract important clauses, preserving their content and assigning a semantic category.
- Store all results in a structured, queryable format in the Context Bank.

Available Tools (Abstracted as One Function)

You have access to a single preprocess_document_tool() that integrates the capabilities of multiple tools:

1. PDF Text Extractor
   - Reads and parses PDF documents
   - Preserves structure across pages
   - Supports OCR for scanned files (optional in later versions)

2. Title Inference Module
   - Analyzes top section lines for key patterns and formatting
   - Uses heuristics to determine the most probable title

3. Document Classifier
   - Identifies the overall category of the document using LLM-based classification
   - Supports hierarchical legal types and subtypes

4. Named Entity Recognizer (NER)
   - Detects and labels relevant entities
   - Categories include parties, laws, jurisdictions, deadlines, financials, etc.

5. Clause Extractor
   - Segments the document into clauses
   - Tags clauses by type (e.g., Term, Dispute, Payment, Governing Law)

6. Context Storage
   - Automatically stores the full document, title, entity list, and clause map
   - Associates each element with a unique document ID
   - Enables downstream retrieval and reasoning

Processing Guidelines

- Parse the document thoroughly—do not skip pages, headers, or annexures.
- Prioritize precision in classification and completeness in NER.
- Extract clauses in full, preserving their intent and scope.
- If there is uncertainty in classification or clause type, annotate your result clearly.
- Use document structure and formatting cues (e.g., numbering, headings) to identify sections.

Input:
A legal document in PDF format provided by the user.

Output Format:
{
  "TEXT": "Complete text extracted from the document",
  "TITLE": "Inferred title from document header",
  "CLASS": "Document type classification (e.g., Legal Agreement - Employment Contract)",
  "NER": [
    {"Entity": "ACME Corporation", "Category": "Party-Company"},
    {"Entity": "John Smith", "Category": "Party-Individual"},
    {"Entity": "Americans With Disabilities Act of 1990", "Category": "Law"},
    {"Entity": "California", "Category": "Jurisdiction"}
  ],
  "CLAUSES": [
    {"Text": "Section 3.1: The term of this agreement shall be...", "Category": "Term Clause"},
    {"Text": "Section 7.2: All disputes shall be resolved by...", "Category": "Dispute Resolution"},
    {"Text": "Section 9.5: This agreement shall be governed by...", "Category": "Governing Law"}
  ]
}
    """,
    name="Pre Processor Agent"
)

In [None]:
import requests
import uuid
from typing import List, Dict
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from cleantext import clean
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams
from langchain_ollama import OllamaEmbeddings

# Initialize global components once
_qdrant_url = "http://localhost:6333"
_qdrant_collection = "web_content"
_qdrant_client = QdrantClient(url=_qdrant_url)
_num_results = 3
_embeddings_model = OllamaEmbeddings(model="llama3.1")
_ddgs = DDGS()

# Optional: create collection if it doesn't exist
def _ensure_qdrant_collection():
    existing = _qdrant_client.get_collections().collections
    if _qdrant_collection not in [col.name for col in existing]:
        _qdrant_client.create_collection(
            collection_name=_qdrant_collection,
            vectors_config=VectorParams(size=4096, distance='Cosine')
        )

@tool
def retrieve_knowledge_tool(query: str) -> List[Dict]:
    """
    Searches the web for a legal/policy topic,
    scrapes and cleans page content, generates embeddings with Ollama,
    stores them in Qdrant, and retrieves top relevant results.
    Returns a list of summarized search results.
    """
    _ensure_qdrant_collection()

    # Step 1: DuckDuckGo search
    results = [
        {"title": r["title"], "url": r["href"], "snippet": r.get("body", "")}
        for r in _ddgs.text(query, max_results=_num_results)
    ]

    # Step 2: Scrape + clean + embed + store
    for result in results:
        url = result["url"]
        title = result["title"]

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()  # skip error pages instead of embedding them
            soup = BeautifulSoup(response.content, "html.parser")
            paragraphs = soup.find_all("p")
            content = " ".join(p.get_text() for p in paragraphs)
            content = " ".join(content.split())
            cleaned_content = clean(
                content,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                lang="en"
            )

            embeddings = _embeddings_model.embed_query(text=cleaned_content)
            point_id = str(uuid.uuid5(uuid.NAMESPACE_URL, url))

            _qdrant_client.upsert(
                collection_name=_qdrant_collection,
                points=[{
                    "id": point_id,
                    "vector": embeddings,
                    "payload": {
                        "title": title,
                        "content": cleaned_content,
                        "url": url
                    }
                }]
            )
        except Exception as e:
            print(f"[WARN] Failed to process URL {url}: {e}")

    # Step 3: Search in Qdrant using query embedding
    query_vec = _embeddings_model.embed_query(query)
    search_results = _qdrant_client.query_points(
        collection_name=_qdrant_collection,
        query=query_vec,
        with_payload=True,
        limit=_num_results
    ).points

    return [
        {
            "title": result.payload["title"],
            "content": result.payload["content"][:400] + "...",
            "url": result.payload["url"],
            "score": result.score
        }
        for result in search_results
    ]
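
The point ID in Step 2 is derived from the URL with `uuid.uuid5`, so scraping the same page twice yields the same ID and the upsert replaces the earlier point instead of adding a duplicate. A small standard-library sketch of that property (the helper name `point_id_for` and the example URLs are only illustrative):

```python
import uuid


def point_id_for(url: str) -> str:
    """Deterministic ID: the same URL always maps to the same UUID string."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, url))


first = point_id_for("https://www.ecfr.gov/")
second = point_id_for("https://www.ecfr.gov/")
other = point_id_for("https://www.govinfo.gov/")

print(first == second)  # same URL, same ID
print(first == other)   # different URL, different ID
```

One consequence is that URLs differing only in a trailing slash or query string are treated as different pages; normalizing URLs before hashing would be a possible refinement.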

In [None]:
# TODO : Add the other tools for Knowledge Agent - Web searcher, Context Bank setter and getter

knowledge_agent_tools = [
    retrieve_knowledge_tool,
    create_handoff_tool(agent_name="Planner Agent", description="Transfer to Planner Agent when knowledge has been retrieved and pass a summary back to it.")
]

knowledge_agent_node = create_react_agent(
    model,
    knowledge_agent_tools,
    prompt=
    """
      You are a Knowledge Retrieval Agent specializing in identifying, extracting, and summarizing verified, authoritative legal information from official U.S. government sources.

  Your responsibilities include:
 - Finding relevant, up-to-date statutes, regulations, and policies based on a given topic.
 - Ensuring that all information is accurate, non-fabricated, and clearly organized.
 - Avoiding non-authoritative, outdated, or speculative sources.

  Tools:
 1. Web Content Retriever: Fetches real-time legal documents, policy updates, and regulations from trusted U.S. government sources.
 2. Context Storage: Stores and organizes retrieved content for future access by other agents or systems, allowing continuity and cross-referencing.

 Input Format:
 Use this format for constructing your Search Query:

 site:congress.gov OR site:govinfo.gov OR site:law.cornell.edu OR site:federalregister.gov OR site:ecfr.gov OR site:justice.gov OR site:whitehouse.gov "[TOPIC]" AND "[FOCUS/CONTEXT]" AND ("[KEYWORD 1]" OR "[KEYWORD 2]") after:[YEAR]

  Where:
 - "TOPIC": Broad legal topic (e.g., “Data Privacy”, “Environmental Law”, “AI Regulation”)
 - "FOCUS/CONTEXT": Specific angle (e.g., “mobile apps”, “state regulations”, “employment law”)
 - "KEYWORDS": Relevant clauses, sections, or named legal entities (e.g., “Section 230”, “Title VII”)
 - "after:YEAR": Ensures latest updates (e.g., after:2023)

 Guidelines:
 - Use only trusted U.S. government domains listed above.
 - Do not fabricate or paraphrase content inaccurately. Avoid summarizing content you do not fully understand or that lacks a clear source.
 - If no direct source is found, clearly state that no official information could be retrieved.
 - Self-check using this question: “Is the summary accurate and completely grounded in verifiable information?” (Self Ask Prompting)

 Output Format:

 [Law or Regulation Name] — (Source: {Direct Link to Official Text})  
 Summary: [1–3 sentence summary explaining what this law/regulation covers and how it affects the specified topic.]

 Key Provisions:
 - [Section or clause] — [Explanation or impact]
    """,
    name="knowledge_agent",
)
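
The search query template in the prompt above can also be assembled in code, which keeps the list of trusted domains in one place. This is a sketch only: `build_search_query` and `TRUSTED_SITES` are hypothetical names, and whether the search backend honors the `site:` and `after:` operators is an assumption that is not verified here.

```python
from typing import Optional, Sequence

# Domains copied from the prompt's search query template.
TRUSTED_SITES = (
    "congress.gov",
    "govinfo.gov",
    "law.cornell.edu",
    "federalregister.gov",
    "ecfr.gov",
    "justice.gov",
    "whitehouse.gov",
)


def build_search_query(topic: str, focus: str, keywords: Sequence[str],
                       after_year: Optional[int] = None) -> str:
    """Fill in the template: sites, quoted topic and focus, OR-ed keywords, optional year."""
    parts = [
        " OR ".join(f"site:{site}" for site in TRUSTED_SITES),
        f'"{topic}"',
        "AND",
        f'"{focus}"',
    ]
    if keywords:
        joined = " OR ".join(f'"{keyword}"' for keyword in keywords)
        parts += ["AND", f"({joined})"]
    if after_year is not None:
        parts.append(f"after:{after_year}")
    return " ".join(parts)


query = build_search_query("Data Privacy", "mobile apps", ["Section 230", "Title VII"], 2023)
print(query)
```

A string built this way could be passed to `retrieve_knowledge_tool` as its `query` argument.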

In [None]:
from agents.compliance_checker import check_legal_compliance
from context_bank import ContextBank

# Reuse the shared context bank if an earlier cell already created it, so that
# all agents keep reading from and writing to the same instance.
try:
    context_bank
except NameError:
    context_bank = ContextBank()

# Define a function to get knowledge from the vector database
def get_knowledge_from_vector_db(query, jurisdiction="US", knowledge_type="statutes"):
    """
    Retrieves knowledge from the vector database instead of using the RAGKnowledgeAgent.
    This function will be implemented later to query the vector DB for relevant legal information.
    
    Args:
        query: The query text to search for in the vector database
        jurisdiction: The legal jurisdiction for the search (default: "US")
        knowledge_type: Type of knowledge to retrieve ("statutes" or "precedents")
        
    Returns:
        List of relevant legal information items from the vector database
    """
    # This is a placeholder function that will be implemented later
    # It will query the vector database for relevant legal information
    print(f"Retrieving {knowledge_type} for query: {query} in jurisdiction: {jurisdiction}")
    
    # Return empty list for now - will be replaced with actual vector DB query
    return []


# Create a tool that the ComplianceCheckerAgent can use
@tool
def compliance_check_tool(clauses: list, document_metadata: dict) -> list:
    """
    Tool for checking legal document clauses for compliance issues.
    
    Args:
        clauses: List of clause dictionaries, each containing 'id' and 'text' keys
        document_metadata: Document metadata (jurisdiction, document_type, etc.)
        
    Returns:
        List of non-compliant clauses with detailed analysis
    """
    # Create a knowledge retrieval adapter that mimics a knowledge agent
    # but actually uses the vector DB directly
    class VectorDBKnowledgeAdapter:
        def find_relevant_statutes(self, query, jurisdiction="US"):
            return get_knowledge_from_vector_db(query, jurisdiction, "statutes")
        
        def find_relevant_precedents(self, query, jurisdiction="US"):
            return get_knowledge_from_vector_db(query, jurisdiction, "precedents")
    
    # Create the adapter to pass instead of a full RAGKnowledgeAgent
    vector_db_adapter = VectorDBKnowledgeAdapter()
    
    return check_legal_compliance(
        clauses=clauses,
        document_metadata=document_metadata,
        context_bank=context_bank,
        knowledge_agent=vector_db_adapter,  # Pass the adapter instead of RAGKnowledgeAgent
        use_ollama=True,
        model_name="llama3.1:latest",
        min_confidence=0.75
    )


# # Update the compliance_checker_agent_tools to include our new tool
# compliance_checker_agent_tools = [
#     create_handoff_tool(agent_name="Knowledge Agent", description="Transfer to Knowledge Agent if more knowledge is needed, it helps to retrieve knowledge from the web using websearcher."),
#     create_handoff_tool(agent_name="Clause Rewriter Agent", description="Transfer to Clause Rewriter Agent if a clause has some discrepancy and rewriting is needed, it helps to rewrite non-compliant clauses in the document."),
#     create_handoff_tool(agent_name="Planner Agent", description="Transfer to Planner Agent when compliance checking is completed and all clauses are found to be compliant, it helps to plan the next steps in the workflow and delegate tasks."),
#     compliance_tool_node
# ]

In [None]:
# TODO : Add the other tools for Compliance Checker Agent - Clause Compliance Checker (which contains the Statutory Validator, Precedent Analyzer, Contractual Consistency Engine, Hypergraph Analyzer, Confidence Scorer), Context Bank getter

compliance_checker_agent_tools = [
    create_handoff_tool(agent_name="knowledge_agent", description="Transfer to Knowledge Agent if more knowledge is needed, it helps to retrieve knowledge from the web using websearcher."),
    create_handoff_tool(agent_name="Clause Rewriter Agent", description="Transfer to Clause Rewriter Agent if a clause has some discrepancy and rewriting is needed, it helps to rewrite non-compliant clauses in the document."),
    create_handoff_tool(agent_name="Planner Agent", description="Transfer to Planner Agent when compliance checking is completed and all clauses are found to be compliant, it helps to plan the next steps in the workflow and delegate tasks."),
    compliance_check_tool,  # already a tool via @tool, so pass the object itself instead of calling it
]

compliance_checker_agent_node = create_react_agent(
    model,
    compliance_checker_agent_tools,
    prompt="""
    """,
    name="Compliance Checker Agent",
)

In [7]:
# TODO : Add the other tools for Clause Rewriter Agent -

clause_rewriter_agent_tools = [
    create_handoff_tool(agent_name="Compliance Checker Agent", description="Transfer to Compliance Checker Agent after a non-compliant clause has been rewritten, it helps to check the compliance of the rewritten clause."),
]

clause_rewriter_agent_node = create_react_agent(
    model,
    clause_rewriter_agent_tools,
    prompt="""
    """,
    name="Clause Rewriter Agent",
)

In [8]:
# TODO : Add the other tools for Post-Processor Agent - Process Summarizer, Context Bank getter

post_processor_agent_tools = [
    create_handoff_tool(agent_name="Planner Agent", description="Transfer to Planner Agent when post-processing is completed, it helps to plan the next steps in the workflow and delegate tasks."),
]

post_processor_agent_node = create_react_agent(
    model,
    post_processor_agent_tools,
    prompt="""
    """,
    name="Post Processor Agent",
)

In [9]:
checkpointer = InMemorySaver()
workflow = create_swarm(
    [planner_agent_node, pre_processor_agent_node, knowledge_agent_node, compliance_checker_agent_node, clause_rewriter_agent_node, post_processor_agent_node],
    default_active_agent="Planner Agent"
)
app = workflow.compile(checkpointer=checkpointer)
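
Handoff targets are plain strings, so a target that does not exactly match a registered agent name may only surface as an error when the handoff is attempted. A quick consistency check before building the swarm can catch this early. The sketch below works on plain data; `find_unknown_handoffs` and the example names are hypothetical and not part of `langgraph_swarm`.

```python
from typing import Dict, List


def find_unknown_handoffs(handoffs: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Map each agent to the handoff targets that are not registered agent names.

    `handoffs` maps a registered agent name to the agent names it can hand off to.
    """
    known = set(handoffs)
    unknown: Dict[str, List[str]] = {}
    for agent, targets in handoffs.items():
        missing = [target for target in targets if target not in known]
        if missing:
            unknown[agent] = missing
    return unknown


# Illustrative data: one target uses a space where the registered name has an underscore.
example = {
    "Planner Agent": ["Pre Processor Agent", "knowledge agent"],
    "Pre Processor Agent": ["Planner Agent"],
    "knowledge_agent": ["Planner Agent"],
}
problems = find_unknown_handoffs(example)
print(problems)
```

Running such a check over the `agent_name` values used in the tool lists above is a cheap way to confirm that every handoff points at one of the six agents passed to `create_swarm`.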

In [None]:
config = {"configurable": {"thread_id": "1"}}
turn_1 = app.invoke(
    {"messages": [{"role": "user", "content": "Ask the Knowledge Agent to retrieve information and add the number of letters in the response."}]},
    config,
)
print(turn_1)


In [None]:
@tool
def get_context_bank_data(document_id: str, data_type: str) -> dict:
    """
    Tool for retrieving data from the context bank
    
    Args:
        document_id: The ID of the document to retrieve data for
        data_type: The type of data to retrieve ("document", "entities", "clauses", or "all")
        
    Returns:
        The requested data from the context bank
    """
    if data_type == "document":
        return context_bank.get_document(document_id)
    elif data_type == "entities":
        return context_bank.get_entities(document_id)
    elif data_type == "clauses":
        return context_bank.get_clauses(document_id)
    elif data_type == "all":
        return {
            "document": context_bank.get_document(document_id),
            "entities": context_bank.get_entities(document_id),
            "clauses": context_bank.get_clauses(document_id)
        }
    else:
        return {"error": f"Invalid data_type: {data_type}. Must be 'document', 'entities', 'clauses', or 'all'."}

@tool
def set_context_bank_data(document_id: str, data_type: str, data: dict) -> dict:
    """
    Tool for setting data in the context bank
    
    Args:
        document_id: The ID of the document to set data for
        data_type: The type of data to set ("document", "entities", "clauses")
        data: The data to set
        
    Returns:
        Success message
    """
    if data_type == "document":
        context_bank.add_document(document_id, data.get("text", ""), data.get("metadata", {}))
    elif data_type == "entities":
        context_bank.add_entities(document_id, data.get("entities", []))
    elif data_type == "clauses":
        context_bank.add_clauses(document_id, data.get("clauses", []))
    else:
        return {"error": f"Invalid data_type: {data_type}. Must be 'document', 'entities', or 'clauses'."}
        
    return {"success": True, "message": f"Successfully set {data_type} data for document {document_id}"}