In [None]:
import io
import re
import requests
import pdfplumber
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional

from langgraph.graph import StateGraph, START, END
from langchain.schema import BaseMessage
from rapidfuzz import process, fuzz

from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

In [2]:
# Kaiser Permanente sample-fee list PDFs, one per California region.
# route_region() selects NCAL_URL when which_region() returns "ncal" and SCAL_URL otherwise.
NCAL_URL = "https://healthy.kaiserpermanente.org/content/dam/kporg/final/documents/health-education-materials/fact-sheets/1561931445-oe-sample-fees-list-ncal.pdf"
SCAL_URL = "https://healthy.kaiserpermanente.org/content/dam/kporg/final/documents/health-education-materials/fact-sheets/1561930325-oe-sample-fees-list-scal.pdf"

In [3]:
# Zip -> Region heuristic:
# CA ZIPs 90000–93599 ~ SoCal, 93600–96199 ~ NorCal (good-enough for routing the Kaiser PDFs)
def which_region(zipcode: str) -> str:
    """Map a ZIP code to the region key ("scal" or "ncal") used to pick a fee PDF.

    Only the first five digits of the input are considered, so ZIP+4 values
    such as "90210-1234" or "902101234" are classified by their base ZIP
    instead of being read as one nine-digit number that falls outside every
    range.

    Args:
        zipcode: ZIP code text; any non-digit characters are ignored.

    Returns:
        "scal" for 90000-93599, otherwise "ncal". "ncal" is also the default
        for empty, non-numeric, or non-California input.
    """
    # str() tolerates an int ZIP; `or ""` turns None into "no digits".
    digits = re.sub(r"\D", "", str(zipcode or ""))[:5]
    if not digits:
        return "ncal"  # safe default when there is nothing to classify

    z = int(digits)
    if 90000 <= z <= 93599:
        return "scal"
    # 93600-96199 is the NorCal range; everything else (non-CA ZIPs) also
    # defaults to the NCAL document to avoid false negatives.
    return "ncal"

In [4]:
# ---------- PDF fetch & parse
def fetch_pdf_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request uses a 20-second timeout. Whatever ``raise_for_status()``
    raises for an error response propagates to the caller.
    """
    response = requests.get(url, timeout=20)
    response.raise_for_status()
    body = response.content
    return body

def normalize(s: str) -> str:
    """Collapse whitespace runs to single spaces, trim the ends, and lowercase."""
    single_spaced = re.sub(r"\s+", " ", s)
    trimmed = single_spaced.strip()
    return trimmed.lower()

def extract_services_from_pdf(pdf_bytes: bytes) -> List[Tuple[str, str, bool]]:
    """
    Scan every page of the PDF for lines that end in a dollar amount.

    Each hit becomes a (service_text, fee_text, has_star) tuple, where
    has_star is True when the service text ends with '*'. Plain text
    extraction is used instead of table parsing. Rows are returned in
    first-seen order with duplicates (same normalized service and same fee)
    removed.
    """
    # Everything before a trailing "$12" / "$1,234.50" is treated as the service name.
    fee_at_line_end = re.compile(r"(.+?)\s+(\$[\d,]+(?:\.\d{2})?)\s*$")

    parsed: List[Tuple[str, str, bool]] = []
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as document:
        for page in document.pages:
            page_text = page.extract_text() or ""
            for raw_line in page_text.splitlines():
                line = raw_line.strip()
                if not line:
                    continue
                hit = fee_at_line_end.search(line)
                if hit is None:
                    continue
                service_text = hit.group(1).strip()
                fee_text = hit.group(2).strip()
                parsed.append((service_text, fee_text, service_text.endswith("*")))

    # Drop repeats, keeping the first occurrence of each (service, fee) pair.
    unique_rows = []
    seen_keys = set()
    for row in parsed:
        service_text, fee_text, _has_star = row
        dedupe_key = (normalize(service_text), fee_text)
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        unique_rows.append(row)
    return unique_rows

In [5]:
def build_index(rows: List[Tuple[str, str, bool]]) -> Dict[str, Dict]:
    """
    Build a lookup of normalized service name -> row details.

    Keys are the service text with trailing '*' removed and then normalized.
    Values are {'service': original, 'fee': fee, 'star': bool}. When several
    rows share a key, the one with the numerically lowest fee is kept; on a
    tie the earlier row stays.
    """
    def numeric_fee(text):
        # Unparseable fees sort last so they never displace a real amount.
        try:
            return float(text.replace("$", "").replace(",", ""))
        except Exception:
            return 1e12

    by_name: Dict[str, Dict] = {}
    for service, fee, star in rows:
        name_key = normalize(service.rstrip("*"))
        current = by_name.get(name_key)
        if current is None or numeric_fee(fee) < numeric_fee(current["fee"]):
            by_name[name_key] = {"service": service, "fee": fee, "star": star}
    return by_name

In [6]:
def fuzzy_lookup(
    service_query: str,
    idx: Dict[str, Dict],
    min_score: float = 60.0,
) -> Tuple[Optional[str], Optional[Dict], List[str]]:
    """Find the index entry that best matches ``service_query``.

    Args:
        service_query: Free-text service name from the user.
        idx: Index produced by ``build_index`` (normalized name -> payload).
        min_score: Lowest ``fuzz.WRatio`` score (0-100) accepted as a match.
            Below it the lookup reports "not found", so an unrelated query
            no longer returns an arbitrary service and fee.

    Returns:
        ``(match_key, payload, alternates)``. ``match_key`` and ``payload``
        are ``None`` when the index is empty or nothing reaches
        ``min_score``. ``alternates`` holds up to five of the closest keys,
        best first, and is empty only when the index is empty.
    """
    choices = list(idx.keys())
    if not choices:
        return None, None, []

    # Index keys have any trailing '*' removed, so treat the query the same way.
    query = normalize(service_query.strip().rstrip("*"))

    # One scoring pass serves both the best match and the alternates list.
    ranked = process.extract(query, choices, scorer=fuzz.WRatio, limit=5)
    alts = [choice for choice, _score, _pos in ranked]
    if not ranked:
        return None, None, alts

    best_key, best_score, _ = ranked[0]
    if best_score < min_score:
        return None, None, alts
    return best_key, idx.get(best_key), alts

In [8]:
# ---------- LangGraph state
@dataclass
class AgentState:
    """State object passed from node to node in the fee-lookup graph.

    ``zipcode`` and ``service`` are supplied by the caller; every other field
    starts empty and is filled in by one of the node functions in this cell.
    """
    zipcode: str  # caller input; route_region passes it to which_region
    service: str  # caller input; the free-text query used by find_service
    region: Optional[str] = None  # set by route_region from which_region's result
    pdf_url: Optional[str] = None  # set by route_region to NCAL_URL or SCAL_URL
    rows: List[Tuple[str, str, bool]] = field(default_factory=list)  # (service_text, fee_text, has_star) rows set by fetch_and_parse
    index: Dict[str, Dict] = field(default_factory=dict)  # build_index(rows), set by fetch_and_parse
    match_key: Optional[str] = None  # first element of fuzzy_lookup's result
    match_payload: Optional[Dict] = None  # second element; craft_answer reads its 'service', 'fee' and 'star' keys
    alternates: List[str] = field(default_factory=list)  # third element: nearby index keys
    message: Optional[str] = None  # final text, written by craft_answer

# ---------- Nodes
def route_region(state: AgentState) -> AgentState:
    """Record the region for ``state.zipcode`` and the matching fee-PDF URL."""
    state.region = which_region(state.zipcode)
    if state.region == "ncal":
        state.pdf_url = NCAL_URL
    else:
        state.pdf_url = SCAL_URL
    return state

def fetch_and_parse(state: AgentState) -> AgentState:
    """Download the PDF at ``state.pdf_url`` and store its rows and lookup index."""
    state.rows = extract_services_from_pdf(fetch_pdf_bytes(state.pdf_url))
    state.index = build_index(state.rows)
    return state

def find_service(state: AgentState) -> AgentState:
    """Run the fuzzy lookup for ``state.service`` and store its three results."""
    lookup = fuzzy_lookup(state.service, state.index)
    state.match_key, state.match_payload, state.alternates = lookup
    return state

def craft_answer(state: AgentState) -> AgentState:
    """Write the final Markdown reply into ``state.message``.

    With a match payload, the reply lists region, matched service, sample fee,
    a sentence about the '*' marker, and the source URL. Without one, it lists
    up to five alternates instead. Both variants end with the general '*' note.
    """
    if state.region == "ncal":
        region_label = "Northern California"
    else:
        region_label = "Southern California"

    base_note = (
        "Note: In the Kaiser sample-fee list, a trailing '*' on a service means "
        "**it may be covered at no cost depending on your plan/eligibility**."
    )

    payload = state.match_payload
    if payload:
        svc = payload["service"]
        fee = payload["fee"]
        if payload["star"]:
            star_msg = "This service has a '*' and **may be covered/no cost** on some plans."
        else:
            star_msg = "This service **does not** have a '*', so the sample fee typically applies."
        state.message = (
            f"**Region:** {region_label}\n"
            f"**Service (matched):** {svc}\n"
            f"**Sample fee:** {fee}\n\n"
            f"{star_msg}\n\n"
            f"{base_note}\n\n"
            f"Source: {state.pdf_url}"
        )
    else:
        # Nothing matched: offer the nearest index keys instead.
        if state.alternates:
            alt_str = ", ".join(state.alternates[:5])
        else:
            alt_str = "No close matches."
        state.message = (
            f"Couldn’t find an exact match for **{state.service}** in the {region_label} sample-fee list.\n\n"
            f"Closest matches: {alt_str}\n\n{base_note}"
        )
    return state

def build_graph():
    """Assemble and compile the linear graph: START -> route_region ->
    fetch_and_parse -> find_service -> craft_answer -> END."""
    pipeline = [
        ("route_region", route_region),
        ("fetch_and_parse", fetch_and_parse),
        ("find_service", find_service),
        ("craft_answer", craft_answer),
    ]

    builder = StateGraph(AgentState)
    for node_name, node_fn in pipeline:
        builder.add_node(node_name, node_fn)

    # Chain each stop to the next one, bracketed by START and END.
    stops = [START] + [node_name for node_name, _ in pipeline] + [END]
    for src, dst in zip(stops, stops[1:]):
        builder.add_edge(src, dst)
    return builder.compile()

# ---------- Convenience function
def get_sample_fee(zipcode: str, service: str) -> str:
    """Run the fee-lookup graph once and return the text written by craft_answer.

    Args:
        zipcode: ZIP code used to choose the regional fee PDF.
        service: Free-text service name to look up.

    Returns:
        The ``message`` value of the final graph state.
    """
    graph = build_graph()
    final_state = graph.invoke(AgentState(zipcode=zipcode, service=service))
    # A compiled StateGraph typically returns the final state as a dict-like
    # mapping of field name -> value rather than an AgentState instance, so
    # plain attribute access would raise AttributeError. Accept either shape.
    if isinstance(final_state, dict):
        return final_state["message"]
    return final_state.message

# ---------- Example (uncomment to try)
# if __name__ == "__main__":
#     print(get_sample_fee("94110", "Tdap vaccine*"))
#     print(get_sample_fee("90210", "chlamydia screening"))

In [14]:
# ---- Define the tool
# Registers the function below under the tool name "kaiser_fee_lookup".
# NOTE(review): return_direct=True presumably makes the executor return this
# tool's output as the final answer without another model turn — confirm
# against the installed LangChain version.
@tool("kaiser_fee_lookup", return_direct=True)
def kaiser_fee_lookup(zipcode: str, service: str) -> str:
    """Return the sample fee for a given Kaiser Permanente service name and ZIP code."""
    # NOTE(review): the docstring above is presumably what LangChain exposes to
    # the model as the tool description, so treat it as runtime text.
    # Thin wrapper: routing, PDF parsing and matching all happen in get_sample_fee.
    return get_sample_fee(zipcode, service)

# ---- System prompt
# NOTE: this cell relies on names from the notebook's imports cell
# (ChatOpenAI, ChatPromptTemplate, create_tool_calling_agent, AgentExecutor).
# Run that cell first (Restart & Run All); otherwise this cell fails with a NameError.
system_prompt = """You are a helpful health-plan cost assistant.
When a user asks about prices for Kaiser Permanente services, 
call the `kaiser_fee_lookup` tool with their ZIP and service name.
If the result includes an asterisk note, explain that it may be covered at no cost."""

# ---- Build LLM and agent
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools = [kaiser_fee_lookup]

# ---- Prompt template (for flexible queries)
# create_tool_calling_agent requires an `agent_scratchpad` placeholder in the
# prompt; it is where the agent's intermediate tool calls and results are
# inserted. Without it, agent construction is rejected.
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])

agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# ---- Run it
executor.invoke({"input": "Tdap vaccine cost in 90210"})

NameError: name 'create_tool_calling_agent' is not defined