# Domain 2 Survey Bot - Quick Test - Clear
This notebook demonstrates the Pydantic AI survey bot for Domain 2: WASH (Water, Sanitation, Handwashing).

In [3]:
import sys
# Print the path of the Python interpreter running this kernel, so the reader can
# confirm the notebook is executing in the intended environment.
print(sys.executable)

/Users/ruigangjiang/anaconda3/envs/riskprof/bin/python


In [1]:
import os
from dotenv import load_dotenv

# Read key/value pairs from the .env file in the current working directory into
# the process environment.
load_dotenv(dotenv_path=".env")

# Stop here if the key is absent or empty, rather than failing later inside an agent call.
assert os.environ.get("OPENAI_API_KEY"), (
    "OPENAI_API_KEY is not loaded. Check .env path / working directory."
)
print("OPENAI_API_KEY loaded ✅")

OPENAI_API_KEY loaded ✅


In [2]:
from agents.domain2_agent import (
    get_domain2_conversation_agent,
    get_domain2_extraction_agent,
)
from models.domain2 import Domain2SurveyDeps, Domain2Data

### Survey Simulation

In [7]:
# Scripted respondent replies, fed to the conversation agent one per turn, in list order.
# In the run recorded in this notebook they answered, in order: drinking-water source,
# whether the water is treated, the treatment method, toilet type, handwashing place,
# and handwashing frequency after toilet use. The agent picks each question itself, so
# this mapping is an assumption about question order, not something the list enforces.
user_answers = [
    "piped water",
    "yes",
    "boil",
    "pit latrine",
    "yes, with soap and water",
    "sometimes",
]

def _strip_speaker_label(text, label="Agent:"):
    """Return `text` as a stripped string with one leading speaker label removed.

    The transcript sent to the model prefixes every interviewer line with
    "Agent: ", and the model sometimes copies that label into its reply. Without
    this cleanup the reply is printed as "AGENT: Agent: ..." and stored in the
    history as "Agent: Agent: ...".

    Args:
        text: Reply text from the model; may be None or empty.
        label: Speaker label to remove (matched case-insensitively).

    Returns:
        The cleaned reply; "" when `text` is None or empty.
    """
    cleaned = (text or "").strip()
    if cleaned[: len(label)].lower() == label.lower():
        cleaned = cleaned[len(label):].lstrip()
    return cleaned


async def simulate_domain2_convo(user_answers):
    """Run a scripted Domain 2 interview against the conversation agent.

    The agent is first asked to greet the respondent and pose Q1. Each scripted
    answer is then appended to `deps.conversation_history`, and the agent is
    re-prompted with the full transcript so it can choose the next question.
    Every turn is printed as it happens.

    Args:
        user_answers: Iterable of respondent replies, consumed in order, one
            per turn.

    Returns:
        The `Domain2SurveyDeps` instance whose `conversation_history` holds the
        whole transcript as "Agent: ..." / "User: ..." lines.
    """
    agent = get_domain2_conversation_agent()
    deps = Domain2SurveyDeps()

    # Opening turn: greeting plus the first question.
    result = await agent.run("Start the survey by greeting the respondent and then ask Q1.", deps=deps)
    reply = _strip_speaker_label(result.response.text)
    print("AGENT:", reply)
    deps.conversation_history.append(f"Agent: {reply}")

    # Feed the scripted answers one at a time, always passing the full history.
    for ans in user_answers:
        deps.conversation_history.append(f"User: {ans}")
        conversation_text = "\n".join(deps.conversation_history)

        prompt = f"""
You are continuing the SAME survey interview.
Use the conversation history below to determine what question to ask next.
Do NOT restart the survey. Do NOT ask Q1 again if it was already answered.
Ask exactly ONE next question (or the Q2a follow-up if needed).

Conversation history:
{conversation_text}

Now respond as the interviewer with the next turn:
""".strip()

        result = await agent.run(prompt, deps=deps)
        # Drop an echoed "Agent:" label so the transcript keeps exactly one label per line.
        reply = _strip_speaker_label(result.response.text)
        print("\nUSER:", ans)
        print("AGENT:", reply)
        deps.conversation_history.append(f"Agent: {reply}")

    return deps

# Top-level `await` relies on the IPython/Jupyter kernel's autoawait support; in a
# plain Python script this call would need to be wrapped, e.g. with asyncio.run(...).
deps = await simulate_domain2_convo(user_answers)

AGENT: Hello! Thank you for participating in this survey. Could you please tell me your main source of drinking water?

USER: piped water
AGENT: Do you treat your drinking water in any way to make it safer?

USER: yes
AGENT: What method do you use to treat your drinking water?

USER: boil
AGENT: What type of toilet facility do you use?

USER: pit latrine
AGENT: Agent: Do you have a designated place for handwashing, and if so, does it have soap and water?

USER: yes, with soap and water
AGENT: How often do you wash your hands after using the toilet?

USER: sometimes
AGENT: Thank you for your responses. That concludes our survey. Have a great day!


### Extract Result

In [12]:
import json

def _answers_from_payload(raw):
    """Decode `raw` into an answers dict, or return None if that is not possible.

    `raw` may be JSON text (str/bytes) or an already-decoded object. A dict
    shaped like {"response": {...}} is unwrapped to its inner dict; any other
    dict is returned unchanged. Anything that is not (or does not decode to) a
    dict yields None.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        try:
            raw = json.loads(raw)
        except (ValueError, RecursionError):
            # Malformed or pathologically nested JSON: treat as "no payload here".
            return None
    if not isinstance(raw, dict):
        return None
    inner = raw.get("response")
    return inner if isinstance(inner, dict) else raw


def coerce_extraction_to_dict(er_response):
    """
    Best-effort conversion of an extraction response into a plain dict.

    Handles:
    1) dict - returned unchanged
    2) an object with `.parts`, where a part exposes `.tool_name` and `.args`
       (e.g. ToolCallPart(tool_name='final_result', args=...)); `args` may be
       a JSON string or an already-decoded dict
    3) plain text containing json (from `.text`, else `str(er_response)`);
       the span from the first "{" to the last "}" is parsed

    In cases 2 and 3 a {"response": {...}} wrapper is unwrapped.

    Returns a dict with the final 6 keys (water_source, treats_water, ...),
    or {} when nothing usable is found.
    """
    # Case 1: already dict
    if isinstance(er_response, dict):
        return er_response

    # Case 2: response parts carrying tool-call arguments
    for part in getattr(er_response, "parts", None) or ():
        if hasattr(part, "tool_name") and hasattr(part, "args"):
            answers = _answers_from_payload(part.args)
            if answers is not None:
                return answers

    # Case 3: fallback to string parse
    s = getattr(er_response, "text", None) or str(er_response)
    start, end = s.find("{"), s.rfind("}")
    if start != -1 and end > start:
        answers = _answers_from_payload(s[start:end + 1])
        if answers is not None:
            return answers

    return {}


async def extract_domain2_cell_d(deps):
    """Extract structured Domain 2 answers from a finished conversation.

    The transcript in `deps.conversation_history` is joined into one text and
    sent to the extraction agent. The agent's response is coerced to a plain
    dict, and that dict is passed to `Domain2Data.from_answers`.

    Args:
        deps: Object exposing `conversation_history`, a sequence of transcript
            lines.

    Returns:
        A `(answers, data)` tuple: the coerced dict and the `Domain2Data`
        built from it.
    """
    extractor = get_domain2_extraction_agent()
    transcript = "\n".join(deps.conversation_history)

    extraction = await extractor.run(
        f"Return ONLY the JSON object with the required keys. No extra text.\n\nConversation:\n{transcript}"
    )

    raw_answers = coerce_extraction_to_dict(extraction.response)
    return raw_answers, Domain2Data.from_answers(raw_answers)

answers, data = await extract_domain2_cell_d(deps)

print("RAW EXTRACTED DICT:")
print(json.dumps(answers, indent=2, ensure_ascii=False))

print("\nNORMALIZED Domain2Data:")
print(data)

print("\nSUMMARY:")
print(json.dumps(data.get_risk_summary(), indent=2, ensure_ascii=False))

RAW EXTRACTED DICT:
{
  "water_source": "Piped water",
  "treats_water": true,
  "water_treatment_method": "Boil",
  "toilet_type": "Pit latrine",
  "handwashing_station": "Yes, with soap and water",
  "washes_after_toilet": "Sometimes"
}

NORMALIZED Domain2Data:
water_source=<WaterSource.PIPED: 'Piped water'> treats_water=True water_treatment_method=<WaterTreatmentMethod.BOIL: 'Boil'> toilet_type=<ToiletType.PIT: 'Pit latrine'> handwashing_station=<HandwashingStation.SOAP_AND_WATER: 'Yes, with soap and water'> washes_after_toilet=<HandwashFrequency.SOMETIMES: 'Sometimes'>

SUMMARY:
{
  "domain": "WASH (Water, Sanitation, Handwashing)",
  "domain_weight": 0.2,
  "water_source": "Piped water",
  "treats_water": true,
  "water_treatment_method": "Boil",
  "toilet_type": "Pit latrine",
  "handwashing_station": "Yes, with soap and water",
  "washes_after_toilet": "Sometimes",
  "wash_risk_score": 4.2,
  "weighted_score": 0.84
}
