# **Use Case: Building a Compliance-Aware Loan Origination Agent (Multi-Agent Orchestrator)**

In this advanced use case, we expand upon the previous loan agent by implementing a **Multi-Agent System (MAS)** architecture using the **Google Agent Development Kit (ADK)**.

Instead of a single agent doing all the work, an **Orchestrator Agent** manages a team of specialized sub-agents. Each sub-agent has one narrow task and one tool, which separates concerns and makes a failure easier to trace to a specific step.

### **Architecture:**
1.  **Orchestrator Agent:** The FCoT-powered manager that plans the workflow and delegates tasks.
2.  **Specialist Team (Sub-Agents):**
    * `document_validator`: Ensures data integrity.
    * `credit_checker`: Interfaces with credit bureaus.
    * `risk_assessor`: Calculates financial risk scores.
    * `compliance_checker`: Validates against regulations.

### **Robustness Features:**
This implementation includes three robustness patterns:
* **Rate Limiting:** Proactive throttling (via the `ratelimit` library) to respect API quotas.
* **Exponential Backoff:** Retries with exponentially growing waits, using the `tenacity` library.
* **Error Handling:** Readable error reports that distinguish quota failures from other API errors.
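
To make the rate-limiting idea concrete, here is a minimal, standard-library-only sketch of proactive throttling. It is not the implementation used by the `ratelimit` package; the class name `SlidingWindowLimiter` and its injectable `clock` parameter are hypothetical and exist only for illustration.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `calls` acquisitions in any `period`-second window.

    Hypothetical illustration; not the API of the `ratelimit` package.
    """

    def __init__(self, calls, period, clock=time.monotonic):
        self.calls = calls
        self.period = period
        self.clock = clock        # injectable so the logic can be checked without sleeping
        self._stamps = deque()    # timestamps of recent successful acquisitions

    def wait_time(self):
        """Seconds the caller should wait before the next call is allowed."""
        now = self.clock()
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.period:
            self._stamps.popleft()
        if len(self._stamps) < self.calls:
            return 0.0
        return self.period - (now - self._stamps[0])

    def acquire(self):
        """Record a call if allowed; return True on success, False if throttled."""
        if self.wait_time() > 0:
            return False
        self._stamps.append(self.clock())
        return True
```

A caller would typically sleep for `wait_time()` seconds and then try again; in this notebook that role is played by the `@sleep_and_retry` decorator.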

<a target="_blank" href="https://colab.research.google.com/github/PacktPublishing/Agentic-Architectural-Patterns-for-Building-Multi-Agent-Systems/blob/main/Chapter_14/Chapter_14_Multi_Agent.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [1]:
#@title Install dependencies
!pip install google-adk litellm tenacity ratelimit


In [2]:
#@title Imports
from google.api_core import exceptions as api_exceptions
from google.adk.planners import BuiltInPlanner
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools import FunctionTool
from google.adk.tools.agent_tool import AgentTool
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.genai.types import ThinkingConfig

import json
import os
import time
import random
import uuid

In [None]:
#@title API Setup

from getpass import getpass

GEMINI = getpass("Enter your GEMINI API KEY: ")
os.environ["GOOGLE_API_KEY"] = GEMINI
print(f"Google API Key set: {'Yes' if os.environ.get('GOOGLE_API_KEY') and os.environ['GOOGLE_API_KEY'] != 'YOUR_GOOGLE_API_KEY' else 'No (REPLACE PLACEHOLDER!)'}")

model = "gemini-3-flash"

In [4]:
#@title Create and Equip All Specialist Sub-Agents with Tools

from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools import FunctionTool, AgentTool
import json

# --- 1. Define Python Functions to Serve as Tools ---

def validate_document_fields(application_data: str) -> str:
    """
    Validates that the loan application contains all required fields.
    Input is a JSON string of the application.
    """
    print(f"--- TOOL: Validating document fields ---")
    try:
        data = json.loads(application_data)
        required_fields = ["customer_id", "loan_amount", "income", "credit_history"]
        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            return json.dumps({"error": f"Validation failed: Missing required fields: {', '.join(missing_fields)}"})
        return json.dumps({"status": "validated", "data": data})
    except json.JSONDecodeError:
        return json.dumps({"error": "Invalid JSON format in application data."})

def query_credit_bureau_api(customer_id: str) -> str:
    """Simulates a call to a credit bureau API to retrieve a credit score."""
    print(f"--- TOOL: Calling Credit Bureau API for customer: {customer_id} ---")
    mock_credit_scores = {
        "CUST-12345": 810,  # Happy Path
        "CUST-55555": 680,  # High Risk Path
        "CUST-00700": 0    # Compliance Path (represents 'No History')
    }
    score = mock_credit_scores.get(customer_id)
    if score is not None:
        return json.dumps({"customer_id": customer_id, "credit_score": score})
    return json.dumps({"error": "Customer ID not found."})

def calculate_risk_score(loan_amount: int, income: str, credit_score: int) -> str:
    """Calculates a more realistic risk score based on annual income."""
    print(f"--- TOOL: Calculating risk score ---")
    try:
        # A simple parser to handle inputs like "USD 5000 a month" -> 60000 annually
        income_value = int(''.join(filter(str.isdigit, income)))
        annual_income = income_value * 12 if "month" in income.lower() else income_value
    except (ValueError, TypeError):
        annual_income = 0 # Default to 0 if parsing fails

    if annual_income == 0:
        risk_score = 10
    else:
        # A more forgiving loan-to-annual-income ratio
        loan_to_income_ratio = loan_amount / annual_income
        risk_score = 1 # Start with a very low base risk for everyone

        # Add risk for lower credit scores
        if credit_score < 650:
            risk_score += 4
        elif credit_score < 720:
            risk_score += 2

        # Add risk for high loan-to-income ratios
        if loan_to_income_ratio > 0.8: # e.g., loan is 80% of annual income
            risk_score += 5
        elif loan_to_income_ratio > 0.5: # e.g., loan is 50% of annual income
            risk_score += 2

    return json.dumps({"risk_score": min(risk_score, 10)})

def check_lending_compliance(credit_history: str, risk_score: int) -> str:
    """Checks the application against internal policies, INCLUDING risk score."""
    print(f"--- TOOL: Checking compliance rules (including risk score) ---")
    if credit_history == "No History":
        return json.dumps({"is_compliant": False, "reason": "Policy violation: No credit history is an automatic denial."})
    if risk_score >= 8:
        return json.dumps({"is_compliant": False, "reason": f"Policy violation: Risk score of {risk_score} is too high for approval."})
    return json.dumps({"is_compliant": True, "reason": "Application meets all internal policy guidelines."})


# --- 2. Wrap Functions in ADK FunctionTools ---

validation_tool = FunctionTool(func=validate_document_fields)
credit_tool = FunctionTool(func=query_credit_bureau_api)
risk_tool = FunctionTool(func=calculate_risk_score)
compliance_tool = FunctionTool(func=check_lending_compliance)


# --- 3. Update Agent Instructions with Explicit Input Requirements ---

doc_validator_instructions = """
You are a Document Validation Agent.
Your ONLY task is to format loan application information and call the `validate_document_fields` tool.
**INPUT REQUIREMENT:** You must receive the complete, original loan application, consisting of the fields:
 ["customer_id", "loan_amount", "income", "credit_history"]
If you receive the required input, format the data as a JSON string, call the tool, and return its exact output.
If any fields are missing from the input, request the missing items.
"""

credit_check_instructions = """
You are a Credit Check Agent.
Your ONLY task is to call the `query_credit_bureau_api` tool.
**INPUT REQUIREMENT:** You must receive the applicant's 'customer_id'.
If you receive the customer_id, extract it and call the tool. Return the tool's exact output.
If the 'customer_id' is not provided, respond with an error: 'ERROR: Missing customer_id input.'
"""

risk_assessor_instructions = """
You are a Risk Assessment Agent.
Your ONLY task is to call the `calculate_risk_score` tool.
**INPUT REQUIREMENTS:** You must receive the 'loan_amount', 'income', and 'credit_score'.
If you receive all required inputs, extract them and call the tool. Return the tool's exact output.
If any inputs are missing, respond with an error specifying what is missing, for example: 'ERROR: Missing required inputs: income, credit_score.'
"""

compliance_instructions = """
You are a Compliance Agent.
Your ONLY task is to call the `check_lending_compliance` tool.
**INPUT REQUIREMENTS:** You must receive the 'credit_history' and 'risk_score'.
If you receive all required inputs, extract them and call the tool. Return the tool's exact output.
If any inputs are missing, respond with an error specifying what is missing, for example: 'ERROR: Missing required inputs: risk_score.'
"""


# --- 4. Create Specialist Agents ---

document_validation_agent = LlmAgent(
    model=model,
    instruction=doc_validator_instructions,
    name="document_validator",
    description="Use this agent to validate the structure and content of a new loan application document.",
    tools=[validation_tool]
)
credit_check_agent = LlmAgent(
    model=model,
    instruction=credit_check_instructions,
    name="credit_checker",
    description="Use this agent to run a credit check on an applicant.",
    tools=[credit_tool]
)
risk_assessment_agent = LlmAgent(
    model=model,
    instruction=risk_assessor_instructions,
    name="risk_assessor",
    description="Use this agent to assess the financial risk of a loan.",
    tools=[risk_tool]
)
compliance_agent = LlmAgent(
    model=model,
    instruction=compliance_instructions,
    name="compliance_checker",
    description="Use this agent for the final step to check the application against all regulatory compliance rules.",
    tools=[compliance_tool]
)

# --- 5. Wrap Agents in AgentTools ---

validator_agent_tool = AgentTool(agent=document_validation_agent)
credit_checker_agent_tool = AgentTool(agent=credit_check_agent)
risk_assessor_agent_tool = AgentTool(agent=risk_assessment_agent)
compliance_checker_agent_tool = AgentTool(agent=compliance_agent)

print("All specialist agents have been created correctly.")

All specialist agents have been created correctly.
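
As a sanity check on the scoring rules above, the following condensed restatement of `calculate_risk_score` and `check_lending_compliance` works through the two test applications used later in this notebook. The helper names `risk_score` and `is_compliant` are hypothetical shorthand, and incomes are passed as annual integers instead of being parsed from text.

```python
def risk_score(loan_amount, annual_income, credit_score):
    """Condensed restatement of the scoring rules in calculate_risk_score."""
    if annual_income == 0:
        return 10
    score = 1  # base risk
    if credit_score < 650:
        score += 4
    elif credit_score < 720:
        score += 2
    ratio = loan_amount / annual_income
    if ratio > 0.8:
        score += 5
    elif ratio > 0.5:
        score += 2
    return min(score, 10)


def is_compliant(credit_history, score):
    """Condensed restatement of check_lending_compliance."""
    return credit_history != "No History" and score < 8


# CUST-12345: USD 5000 a month -> 60000 a year; 50000 / 60000 is about 0.83 (> 0.8)
happy = risk_score(50_000, 5_000 * 12, 810)
# CUST-55555: USD 1000 a month -> 12000 a year; ratio is about 83; credit score 680 (< 720)
risky = risk_score(1_000_000, 1_000 * 12, 680)

print(happy, is_compliant("Very Solid", happy))       # 6 True
print(risky, is_compliant("Presenting Gaps", risky))  # 8 False
```

Note that the happy-path applicant still scores 6, because the loan exceeds 80% of annual income; the application passes only because the denial threshold is 8.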


In [5]:
#@title Orchestrator Instructions

orchestrator_instructions = """
You are an FCoT-powered Orchestrator Agent managing a team of specialist agents for a Loan Origination Pipeline. Your primary role is to plan the workflow, delegate tasks, and intelligently handle exceptions.

**INSTRUCTION CONTRACT (IC)**

* **Mission:** Originate, evaluate, and approve a loan with full policy compliance, factual grounding, and fairness by delegating tasks to your team of specialist agents.
* **Deliverables:** A final, comprehensive summary of the loan decision and the justification, synthesized from the findings of your specialist agents.
* **Failure Handling Policy:**
    1.  **Reflect:** If a specialist agent returns an error, first analyze the error message. Is the cause of the failure clear?
    2.  **Resolve:** If the error is due to missing information (e.g., a missing field), review the original user request and the outputs from previous steps to see if you can find the missing data yourself. If you can, re-attempt the delegation with the complete information.
    3.  **Escalate:** Only if you cannot resolve the error on your own should you escalate the issue to the user. When you escalate, be specific about what information is needed to proceed.

* **Your Specialist Team & Data Contracts:**
    * **`document_validator`:** Validates the application.
        * **INPUT:** Expects the full, original application data as a single JSON string.
        * **OUTPUT:** Returns the validated data if successful.
    * **`credit_checker`:** Runs a credit check.
        * **INPUT:** Expects the `customer_id` from the validated data.
        * **OUTPUT:** Returns the `credit_score`.
    * **`risk_assessor`:** Assesses financial risk.
        * **INPUT:** Expects `loan_amount`, `income`, and the `credit_score` from the previous step.
        * **OUTPUT:** Returns the `risk_score`.
    * **`compliance_checker`:** Performs the final compliance check.
        * **INPUT:** Expects the `credit_history` from the original data AND the `risk_score` from the previous step.
        * **OUTPUT:** Returns the final compliance status.

**FCoT RECURSIVE LOOP (Delegation-Focused)**

**Iteration 1 (Planning):**
* **RECAP:** Review the user's request and your team's data contracts.
* **REASON:** Formulate a step-by-step plan that correctly sequences the delegation of tasks AND prepares the specific inputs required for each agent based on the data contracts.
* **VERIFY:** Ensure the plan follows the required workflow and correctly maps outputs from one step to the inputs of the next.

**Iteration 2 (Execution & Synthesis):**
* **RECAP:** Execute the plan by delegating to each specialist agent with its required inputs. Handle any failures according to the Failure Handling Policy.
* **REASON:** Collect the results from each specialist. Synthesize their findings into a coherent final summary.
* **VERIFY:** Check that all steps were completed successfully before formulating the final answer.
"""
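
The VERIFY step of the planning iteration asks the orchestrator to confirm that each specialist's inputs are available from the original request or from an earlier step. The sketch below expresses that check in plain Python. It is only an illustration of the data contracts listed in the prompt: the `CONTRACTS` table and the `check_plan` helper are hypothetical and are not part of ADK or of the orchestrator itself.

```python
# Hypothetical contract table: (agent name, keys it needs, keys it adds).
CONTRACTS = [
    ("document_validator", {"customer_id", "loan_amount", "income", "credit_history"}, set()),
    ("credit_checker", {"customer_id"}, {"credit_score"}),
    ("risk_assessor", {"loan_amount", "income", "credit_score"}, {"risk_score"}),
    ("compliance_checker", {"credit_history", "risk_score"}, {"is_compliant"}),
]


def check_plan(initial_keys, contracts=CONTRACTS):
    """Walk the plan in order and report (agent, missing_keys) for every unmet input."""
    available = set(initial_keys)
    problems = []
    for name, needs, adds in contracts:
        missing = needs - available
        if missing:
            problems.append((name, sorted(missing)))
        # Assume the step would produce its outputs so later steps are still checked.
        available |= adds
    return problems


complete = {"customer_id", "loan_amount", "income", "credit_history"}
print(check_plan(complete))               # []
print(check_plan(complete - {"income"}))  # the two steps that need 'income'
```

An empty result means every step's inputs can be traced to the request or to a previous step; a non-empty result is the kind of finding the Failure Handling Policy would try to resolve before escalating to the user.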

In [6]:
#@title Agent Initialization

# 1. Configure the agent's reasoning engine (Planner)
thinking_config = ThinkingConfig(
    include_thoughts=True,
    thinking_budget=1024
)
planner = BuiltInPlanner(
    thinking_config=thinking_config
)

# 2. Build the list of tools that will be given to the parent Orchestrator agent
delegation_tools = [
    validator_agent_tool,
    credit_checker_agent_tool,
    risk_assessor_agent_tool,
    compliance_checker_agent_tool,
]

# 3. Instantiate the Orchestrator LlmAgent
agent = LlmAgent(
    model="gemini-2.5-flash",
    name="LoanProcessingAgent",
    instruction=orchestrator_instructions,
    planner=planner,
    tools=delegation_tools
)

print("Loan Processing Orchestrator has been created.")

Loan Processing Orchestrator has been created.


In [7]:
#@title Session init
# Define unique IDs for our test user and session
USER_ID = "loan_officer_01"
SESSION_ID = str(uuid.uuid4()) # Generate a new session ID for this run
APP_NAME = "Loan_Agent"

session_service = InMemorySessionService()
session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service)

print(f"Runner is set up. Using Session ID: {SESSION_ID}")

Runner is set up. Using Session ID: 7f1d3e2c-47e5-4614-b4c3-dd0ab8a238d0


In [8]:
#@title Agent Execution
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
from ratelimit import limits, sleep_and_retry
from google.genai import types
import time

# --- CONFIGURATION ---
CALLS = 15
PERIOD = 60

# --- HELPER: ERROR FILTER ---
def is_rate_limit_error(e):
    msg = str(e)
    return "RESOURCE_EXHAUSTED" in msg or "429" in msg or "ServiceUnavailable" in msg

# --- 1. THE ROBUST RUNNER ---
# runner.run() returns a lazy generator: the model calls happen while the events
# are consumed, not when run() is called. The events are therefore collected
# inside the decorated function so that throttling and retries can cover them.
# Caveat: a retry re-sends the same message to the same session.
@sleep_and_retry
@limits(calls=CALLS, period=PERIOD)
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=2, min=4, max=30),
    retry=retry_if_exception(is_rate_limit_error),
    reraise=True
)
def start_agent_run(runner, user_id, session_id, content):
    print(f"  >> [Clock {time.strftime('%X')}] Initiating request...")
    return list(runner.run(user_id=user_id, session_id=session_id, new_message=content))

# --- 2. THE CLEAN CALLER ---
def call_agent(query: str):
    print(f"\n>>>> USER REQUEST: {query.strip()}\n")
    content = types.Content(role='user', parts=[types.Part(text=query)])

    try:
        # Step 1: Run the agent to completion (protected by retry and throttling)
        events = start_agent_run(runner, USER_ID, SESSION_ID, content)

        print("--- Agent Activity Log ---")

        # Step 2: Print the collected events
        for event in events:
            if event.content:
                for part in event.content.parts:
                    if part.thought and part.text:
                        print(f"\n🧠 THOUGHT:\n{part.text.strip()}")

                    if part.function_call:
                        tool_name = part.function_call.name
                        tool_args = dict(part.function_call.args)
                        print(f"\n🛠️ TOOL CALL: {tool_name}({tool_args})")

                    if part.function_response:
                        tool_name = part.function_response.name
                        tool_output = dict(part.function_response.response)
                        print(f"\n↩️ TOOL OUTPUT from {tool_name}:\n{tool_output}")

            if event.is_final_response() and event.content:
                final_text = ""
                for part in event.content.parts:
                    if part.text and not part.thought:
                        final_text = part.text.strip()
                        break

                if final_text:
                    print("\n---------------------------------")
                    print("✅ FINAL RESPONSE:")
                    print(final_text)
                    print("---------------------------------")

    # --- FAILURE: Professional Error Handling ---
    except Exception as e:
        error_msg = str(e)

        # Determine the cause
        is_quota = "RESOURCE_EXHAUSTED" in error_msg or "429" in error_msg
        is_free_tier = "FreeTier" in error_msg or "limit: 20" in error_msg

        print("\n" + "━" * 60)
        print("SYSTEM CRITICAL ERROR")
        print("━" * 60)

        if is_quota:
            print("  ⚠️   CAUSE:    QUOTA EXCEEDED (API Refusal)")
            print("  🔍   CONTEXT:  The LLM provider rejected the request.")

            if is_free_tier:
                print("\n  📉   DIAGNOSIS: FREE TIER LIMIT REACHED")
                print("       You have hit the hard cap (approx. 20 requests/day).")
                print("       Retry Logic cannot bypass this daily limit.")
                print("\n  🛠️   ACTION:    [1] Wait 24 Hours")
                print("                  [2] Enable Billing (Pay-As-You-Go)")
            else:
                print(f"\n  📝   DETAILS:   {error_msg}")
        else:
            print("  ⚠️   CAUSE:    UNEXPECTED EXCEPTION")
            print(f"  📝   DETAILS:  {error_msg}")

        print("━" * 60 + "\n")
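
To see what the retry settings above imply in practice, the sketch below computes the waits between attempts for `multiplier=2, min=4, max=30` and five attempts. It assumes a clamped doubling schedule, `clamp(multiplier * 2 ** (n - 1), min, max)` after the n-th failed attempt, which is my reading of how recent `tenacity` releases implement `wait_exponential`; the exact formula may differ between versions. `backoff_waits` is a hypothetical helper, not a `tenacity` function.

```python
def backoff_waits(attempts, multiplier=2, min_wait=4, max_wait=30):
    """Waits (in seconds) between consecutive attempts under a clamped doubling schedule.

    Assumption: the wait after the n-th failed attempt is
    clamp(multiplier * 2 ** (n - 1), min_wait, max_wait); no wait follows the last attempt.
    """
    waits = []
    for n in range(1, attempts):
        raw = multiplier * 2 ** (n - 1)
        waits.append(max(min_wait, min(raw, max_wait)))
    return waits


waits = backoff_waits(attempts=5)
print(waits, sum(waits))  # [4, 4, 8, 16] 32
```

Under that assumption, a request that keeps hitting a rate limit spends about 32 seconds waiting before the fifth and final attempt, after which `reraise=True` surfaces the original exception to `call_agent`.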

In [9]:
# Define our user request for the "happy path" scenario
user_request = """
Please process this new loan application:
    "customer_id": "CUST-12345"
    "income": "USD 5000 a month"
    "loan_amount": 50000,
    "credit_history": "Very Solid",
    "loan_documents": ["doc_1.pdf","doc_2.pdf","credit_history.pdf","loan_application.docx"]
"""

# Call the agent
call_agent(user_request)


>>>> USER REQUEST: Please process this new loan application:
    "customer_id": "CUST-12345"
    "income": "USD 5000 a month"
    "loan_amount": 50000,
    "credit_history": "Very Solid",
    "loan_documents": ["doc_1.pdf","doc_2.pdf","credit_history.pdf","loan_application.docx"]

  >> [Clock 22:09:56] Initiating request...
--- Agent Activity Log ---

🧠 THOUGHT:
**Initial Assessment & Strategy for Loan Application Processing**

Okay, so I've got this loan application to process, and it looks pretty straightforward. The user wants me to handle it using my specialized agents. Let me break down my plan to ensure a smooth workflow. First, I need to get the user's data and my team organized.

**Initial Data Understanding:**
I have this initial data:
*   `customer_id`: "CUST-12345"
*   `income`: "USD 5000 a month"
*   `loan_amount`: 50000
*   `credit_history`: "Very Solid"
*   `loan_documents`: ["doc_1.pdf", "doc_2.pdf", "credit_history.pdf", "loan_application.docx"]

**Defining my Team and their Roles:**
I will utilize:
*   `document_validator`: Input is a complete loan application in JSON. It outputs validated data.
*   `credit_checker`: Input is `customer_id` from the validated data. It outputs `credit_score`.
*   `risk_assessor`: Input is `loan_amount`, `income`, and `credit_score`. It outputs `risk_score`.
*   `compliance_checker

In [10]:
# Define our user request for the "not so happy path" scenario
user_request = """
Please process the loan application:

    "customer_id": "CUST-55555"
    "income": "USD 1000 a month"
    "loan_amount": 1000000,
    "credit_history": "Presenting Gaps",
    "loan_documents": ["drivers_license.pdf"]

"""

# Call the agent
call_agent(user_request)


>>>> USER REQUEST: Please process the loan application:

    "customer_id": "CUST-55555"
    "income": "USD 1000 a month"
    "loan_amount": 1000000,
    "credit_history": "Presenting Gaps",
    "loan_documents": ["drivers_license.pdf"]

  >> [Clock 22:11:35] Initiating request...
--- Agent Activity Log ---

🧠 THOUGHT:
**Here's how I'm thinking about this new loan application:**

Okay, another one in the pipeline. My primary goal here is to get this application processed efficiently and correctly. I need to make sure I adhere strictly to our loan origination procedures. I'll have to start by following the planned steps, which involves delegating specific tasks to the appropriate specialists.

**The plan is straightforward:** First, I'll need to validate the documents associated with this application. This validation will be done by the `document_validator` tool, which will assess the loan application data. After that, I'll extract the `customer_id` and run a credit check through th