In [22]:
import os
import json
from typing import TypedDict, List, Dict, Literal
from langgraph.graph import StateGraph, END
from openai import AsyncOpenAI

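# The OpenAI client below reads the API key from the OPENAI_API_KEY environment variable.
# Set it outside the notebook (e.g. `export OPENAI_API_KEY=...` in your shell), or for a
# one-off session use: os.environ["OPENAI_API_KEY"] = getpass.getpass() -- never commit a literal key.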

## 1. Define Agent State

This state will be passed between the nodes in our graph. It contains the question, the router's decision, the final response, and any errors.

In [23]:
class ReasoningAgentState(TypedDict, total=False):
    """State dictionary passed between the nodes of the reasoning graph.

    Declared with ``total=False``, so every key is optional; each node returns
    only the keys it sets.
    """
    # The user's question; read by the router and by both tool nodes.
    question: str
    # Name of the tool selected by the router node.
    route: Literal["general_qa", "sbvr_extraction"]
    # Final answer: text from the QA tool, or a list of rule dicts from the SBVR tool.
    response: str | List[Dict]
    # Short failure message set by a tool node when its call fails.
    error: str

## 2. Build the Toolbox

We will use our new `generic_engine_builder.py` to create two distinct search engine instances.
- `sbvr_engine`: Configured with a specific prompt to extract JSON rules.
- `qa_engine`: Configured with a general prompt for conversational answers.

In [24]:
from generic_engine_builder import build_search_engine

# --- Paths and response-type text shared by the two tools ---
INDEX_ROOT = os.path.join("..", "kyc_workflow", "output")
QA_PROMPT_PATH = "cust_local_search_system_prompt.txt"
SBVR_PROMPT_PATH = "sbvr_local_search_system_prompt.txt"

# Split over two literals for readability; the concatenated text is unchanged.
sbvr_response_type = (
    "Return SBVR-style business rules as a single JSON object "
    "with a top-level 'rules' array."
)

# --- Tool A: SBVR extractor (SBVR system prompt, JSON response type) ---
print("Building SBVR Extraction tool...")
sbvr_engine = build_search_engine(
    response_type=sbvr_response_type,
    system_prompt_path=SBVR_PROMPT_PATH,
    index_root=INDEX_ROOT,
)
print("...SBVR tool ready.")

# --- Tool B: general QA (QA system prompt, free-text response type) ---
print("\nBuilding General QA tool...")
qa_engine = build_search_engine(
    response_type="multiple paragraphs",
    system_prompt_path=QA_PROMPT_PATH,
    index_root=INDEX_ROOT,
)
print("...QA tool ready.")

print("\n‚úÖ Toolbox is ready with 2 tools.")

Building SBVR Extraction tool...
...SBVR tool ready.

Building General QA tool...
...QA tool ready.

‚úÖ Toolbox is ready with 2 tools.


## 3. Create Tool Nodes

These are the nodes that will execute the tools. Each node calls its respective search engine.

In [25]:
# SBVR Extraction Node
def _parse_sbvr_rules(raw) -> list:
    """Convert the SBVR engine's raw response into a list of rule objects.

    Accepts JSON text (optionally wrapped in a Markdown ``` code fence), JSON
    bytes, or an already-decoded dict.

    Args:
        raw: The ``response`` attribute of the engine's search result.

    Returns:
        The value stored under the top-level "rules" key, or ``[]`` when the
        key is absent or null.

    Raises:
        TypeError: ``raw`` is not a str, bytes, bytearray or dict.
        ValueError: the text is not valid JSON (``json.JSONDecodeError`` is a
            subclass), the payload is not a JSON object, or "rules" is not a list.
    """
    if isinstance(raw, dict):
        data = raw
    elif isinstance(raw, (str, bytes, bytearray)):
        text = raw if isinstance(raw, str) else bytes(raw).decode("utf-8")
        text = text.strip()
        # Chat models frequently wrap JSON in ```json ... ``` fences; unwrap them.
        if len(text) >= 6 and text.startswith("```") and text.endswith("```"):
            text = text[3:-3]
            first_line, newline, remainder = text.partition("\n")
            # A first line such as "json" is a language tag, not part of the payload.
            if newline and not first_line.lstrip().startswith(("{", "[")):
                text = remainder
        data = json.loads(text)
    else:
        raise TypeError(
            f"SBVR response must be JSON text or a dict, not {type(raw).__name__}"
        )

    if not isinstance(data, dict):
        raise ValueError("SBVR response is not a JSON object with a 'rules' array.")

    rules = data.get("rules", [])
    if rules is None:
        return []
    if not isinstance(rules, list):
        raise ValueError("The 'rules' value in the SBVR response is not a list.")
    return rules


async def sbvr_extraction_node(state: ReasoningAgentState) -> ReasoningAgentState:
    """Run the SBVR search engine on the question and return the extracted rules.

    Args:
        state: Graph state; ``state["question"]`` is sent to ``sbvr_engine``.

    Returns:
        ``{"response": rules}`` on success. On failure, ``{"response": [],
        "error": <message>}``; the message distinguishes a failed engine call
        from a response that could not be parsed as the expected JSON.
    """
    print("ü§ñ Executing SBVR Extraction tool...")

    # Engine failures are reported through the state (as the QA node does)
    # instead of propagating out of the graph.
    try:
        result = await sbvr_engine.search(state["question"])
    except Exception as e:
        print(f"Error in SBVR node: {e}")
        return {"response": [], "error": "SBVR tool failed."}

    try:
        rules = _parse_sbvr_rules(result.response)
    except (AttributeError, TypeError, ValueError) as e:
        print(f"Error in SBVR node: {e}")
        return {"response": [], "error": "SBVR tool failed to produce valid JSON."}

    return {"response": rules}

# General QA Node
async def general_qa_node(state: ReasoningAgentState) -> ReasoningAgentState:
    """Answer the question with the general QA search engine.

    Returns ``{"response": <engine response>}`` on success, or an empty
    response plus an ``error`` message if anything in the call raises.
    """
    print("ü§ñ Executing General QA tool...")
    try:
        answer = (await qa_engine.search(state["question"])).response
    except Exception as exc:
        # Report the failure through the state instead of raising.
        print(f"Error in QA node: {exc}")
        return {"response": "", "error": "General QA tool failed."}
    return {"response": answer}

## 4. Create the Brain (Router)

This node decides which tool to use based on the user's question. It calls an LLM with a specific prompt to make this decision.

In [26]:
# Async OpenAI client used by the router node. The key is looked up in the
# OPENAI_API_KEY environment variable; None is passed if it is unset.
client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Prompt template for the routing decision. `{question}` is filled in with
# str.format by router_node; the model is asked to reply with the bare tool name.
ROUTER_PROMPT = """You are an expert router. Your job is to determine which tool to use based on the user's question.
You have two tools available:
1. `general_qa`: Use this for general questions, definitions, or summaries about a policy.
2. `sbvr_extraction`: Use this ONLY when the user explicitly asks to "extract rules", "find obligations", "list requirements", or other similar phrases that imply structured, rule-based output.

User question: "{question}"

Based on the user question, which tool should be used?
Return ONLY the name of the tool: `general_qa` or `sbvr_extraction`.
"""

async def router_node(state: ReasoningAgentState) -> ReasoningAgentState:
    """Ask the routing model which tool should handle the question.

    Args:
        state: Graph state; ``state["question"]`` is substituted into
            ``ROUTER_PROMPT``.

    Returns:
        ``{"route": name}`` where ``name`` is "general_qa" or
        "sbvr_extraction". Any reply that does not normalize to one of those
        names (including an empty reply) falls back to "general_qa".

    Raises:
        Exceptions raised by the chat-completions call are not caught here.
    """
    valid_routes = ("general_qa", "sbvr_extraction")

    # A real newline: the previous "\\n" printed a literal backslash-n.
    print("\nüß† Brain is thinking... Which tool to use?")
    prompt = ROUTER_PROMPT.format(question=state["question"])

    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # Using a fast and cheap model for routing
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )

    # The content may be None and the choice list may be empty; treat both as "no answer".
    choices = response.choices or []
    raw_route = ((choices[0].message.content if choices else None) or "").strip()

    # The prompt shows the tool names in backticks, so the model often echoes
    # them that way; also tolerate quotes, a trailing period and case changes.
    route = raw_route.strip("`'\".").strip().lower()

    if route not in valid_routes:
        print(f"‚ö†Ô∏è Router made an invalid choice: '{raw_route}'. Defaulting to 'general_qa'.")
        route = "general_qa"

    print(f"üí° Decision: Use tool '{route}'")
    return {"route": route}

## 5. Build the Graph

Now we wire everything together using LangGraph's `add_conditional_edges`.

In [27]:
def build_reasoning_graph():
    """Assemble and compile the router -> tool -> END graph.

    The router node runs first; its ``route`` value selects which tool node
    runs next, and each tool node then leads straight to END.
    """

    def decide_next_node(state: ReasoningAgentState):
        # The conditional edge keys off the route chosen by the router node.
        return state["route"]

    node_table = (
        ("router", router_node),
        ("sbvr_extractor", sbvr_extraction_node),
        ("general_qa", general_qa_node),
    )
    # Maps the router's route value to the name of the node that handles it.
    route_to_node = {
        "sbvr_extraction": "sbvr_extractor",
        "general_qa": "general_qa",
    }

    builder = StateGraph(ReasoningAgentState)

    for node_name, node_fn in node_table:
        builder.add_node(node_name, node_fn)

    builder.set_entry_point("router")
    builder.add_conditional_edges("router", decide_next_node, route_to_node)

    # Both tool nodes are terminal.
    for tool_node in ("sbvr_extractor", "general_qa"):
        builder.add_edge(tool_node, END)

    return builder.compile()


reasoning_agent = build_reasoning_graph()
print("‚úÖ Reasoning Agent is compiled and ready.")

‚úÖ Reasoning Agent is compiled and ready.


## 6. Run the Agent

Let's test the agent with two different types of questions to see the router in action.

In [29]:
# --- Test Question 1: Should trigger SBVR Extraction ---
question_1 = "Extract all CDD rules that apply to non-account holders"
print(f"--- Running Agent for: '{question_1}' ---")
result_1 = await reasoning_agent.ainvoke({"question": question_1})

print("\\n--- Result for Question 1 ---")
if result_1.get("error"):
    print("Error:", result_1.get("error"))
else:
    print(json.dumps(result_1.get("response"), indent=2, ensure_ascii=False))

print("\\n" + "="*50 + "\\n")

# --- Test Question 2: Should trigger General QA ---
question_2 = "What is customer due diligence according to HKMA?"
print(f"--- Running Agent for: '{question_2}' ---")
result_2 = await reasoning_agent.ainvoke({"question": question_2})

print("\\n--- Result for Question 2 ---")
if result_2.get("error"):
    print("Error:", result_2.get("error"))
else:
    print(result_2.get("response"))

--- Running Agent for: 'Extract all CDD rules that apply to non-account holders' ---
\nüß† Brain is thinking... Which tool to use?
üí° Decision: Use tool 'sbvr_extraction'
ü§ñ Executing SBVR Extraction tool...
üí° Decision: Use tool 'sbvr_extraction'
ü§ñ Executing SBVR Extraction tool...


Reached token limit - reverting to previous context state
Reached token limit - reverting to previous context state


\n--- Result for Question 1 ---
[
  {
    "id": "CDD-R-NAH-001",
    "source_reference": "g33a: SUPPLEMENT TO THE GUIDELINE ON PREVENTION OF MONEY LAUNDERING, Section 3. Customer due diligence.",
    "clause_reference": "3.10, 3.11",
    "data_support": "[Data: TextUnits (26)]",
    "subject": "authorized institution",
    "modality": "obligation",
    "condition": "WHEN carrying out transactions exceeding HK$120,000 on behalf of a non-account holder",
    "action": "perform customer due diligence measures on the non-account holder",
    "action_steps": [
      "identify and verify the direct customer",
      "identify and verify any natural persons representing the customer, including their authority to act",
      "enquire if any beneficial owner exists and take reasonable measures to verify their identity",
      "take reasonable measures to understand the ownership structure if the customer is a corporate",
      "ascertain the intended nature and purpose of the transaction unless 

Reached token limit - reverting to previous context state
Reached token limit - reverting to previous context state


\n--- Result for Question 2 ---
### Definition of Customer Due Diligence (CDD) According to HKMA

Customer Due Diligence (CDD) is a comprehensive policy framework and risk management process mandated by the Hong Kong Monetary Authority (HKMA) for Authorized Institutions (AIs). It involves verifying the identity of customers and beneficial owners, assessing their risk profiles, and implementing measures to prevent financial crimes such as money laundering and terrorist financing. The process requires AIs to identify and verify customers through reliable means, including face-to-face interviews or trusted intermediaries, and to understand ownership structures to ensure transparency. CDD also involves ongoing monitoring of transactions to detect suspicious activities and manage risks effectively [Data: Entities (57, 78, 325); Relationships (574, 86)] (Source: g33a: SUPPLEMENT TO THE GUIDELINE ON PREVENTION OF MONEY LAUNDERING, Section 3. Customer due diligence).

### Key Components of CDD