# Voice Agent for Data Capture

Technical stack with LangGraph:

Orchestrator Agent is GPT-4o

The Data Aggregator reads, retrieves, and writes data in an existing database that it reaches through a connection string.

Tools:
- Tool to retrieve schema from database (use case: LLM needs to locate where in the schema to retrieve or write data to).
- Tool to retrieve data samples from database (use case: LLM needs to few-shot itself or another LLM with data samples for pre-labeling).
- Tool to write data to the database (use case: after locating the part of the schema to add data to and getting the information from the user, the LLM needs to write data to that part of the database).
- Tool to create workflows on the fly (the LLM can do this internally, but it also needs tools that collect data from the user, such as a tool to open a camera modal or a tool to collect a voice recording).
- Tool to prelabel data (use case: few-shot an LLM with retrieved data samples so that it pre-labels new data. This LLM can be the orchestrator agent or a separate, possibly smaller, model.)
- Tool to retrieve all tools available to the LLM.
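
The last tool in the list, one that reports every tool available to the LLM, could be backed by a small registry. The following is a minimal sketch rather than the notebook's implementation: `register_tool`, `list_tools`, `TOOL_REGISTRY`, and the two placeholder tools are hypothetical names introduced only for illustration.

```python
import inspect
from typing import Callable, Dict, List

# Hypothetical registry: maps a tool name to the callable that implements it.
TOOL_REGISTRY: Dict[str, Callable] = {}

def register_tool(fn: Callable) -> Callable:
    """Decorator that records a function as an available tool."""
    TOOL_REGISTRY[fn.__name__] = fn
    return fn

def list_tools() -> List[Dict[str, str]]:
    """Return name, signature, and description for every registered tool."""
    return [
        {
            "name": name,
            "signature": str(inspect.signature(fn)),
            "description": inspect.getdoc(fn) or "",
        }
        for name, fn in sorted(TOOL_REGISTRY.items())
    ]

@register_tool
def retrieve_schema() -> dict:
    """Return the database schema (placeholder body)."""
    return {}

@register_tool
def write_data(rows: list) -> int:
    """Write rows to the database (placeholder body); returns the row count."""
    return len(rows)

catalog = list_tools()
```

Because the catalog is built from each function's signature and docstring, a newly registered tool becomes visible to the LLM without a separate description file.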

Example workflow:

After connecting to a SQL relational database through a connection string, the LLM reads the schema and the data so that it 'understands' the database.

Then, the user asks the LLM to "please record a new ‘bag-dump’ of fine-ware pottery sherds."

The LLM takes in the input, reasons through it, and develops a plan/workflow (just like the o1 reasoning models do). It first locates the part of the schema to add data to by calling the tool that retrieves the schema from the user's existing database. It determines that these sherds need to be entered as 'fine ware' samples and that, to enter them, it needs to gather the type, categorization, and weight of each sherd. From the few-shot prompt, it develops a workflow: prompt the user to take an image of the bag-dump, call the prelabeling tools to run object detection and classification on the image, and have the user do human-in-the-loop (HITL) verification of every prelabel, editing where necessary, before committing to the database.

After it develops that plan, it first explains to the user, by voice, what it is going to do, then asks whether that sounds good and whether it should proceed. If the user responds with anything other than approval, it makes the necessary adjustments. It then executes the plan by calling the tools in order and prompting the user step by step to collect the data, while listening throughout for questions or requested changes to the plan or workflow. After verification, it calls the writing tool to commit the data to the database. It also logs each commit in a separate commitments file, along with the paradata and metadata associated with it, to keep track of all commits and enable instant rollback.
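
The commit log and rollback idea can be sketched with an in-memory SQLite database standing in for the user's database. This is an illustration under stated assumptions, not the project's design: the `commit_log` table, its columns, and the helper names `commit_rows` and `rollback` are hypothetical, and the sketch only undoes inserts (undoing updates would also require storing the previous values).

```python
import json
import sqlite3
import time
import uuid

def open_demo_db() -> sqlite3.Connection:
    """In-memory stand-in for the user's database plus a commit log table."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE samples (id TEXT PRIMARY KEY, class TEXT, weight REAL)")
    db.execute("CREATE TABLE commit_log "
               "(commit_id TEXT PRIMARY KEY, ts REAL, row_ids TEXT, note TEXT)")
    return db

def commit_rows(db: sqlite3.Connection, rows: list, note: str = "") -> str:
    """Insert rows and record which row ids belong to this commit."""
    commit_id = str(uuid.uuid4())
    row_ids = []
    with db:  # one transaction: rows and log entry succeed or fail together
        for row in rows:
            row_id = str(uuid.uuid4())
            db.execute("INSERT INTO samples VALUES (?, ?, ?)",
                       (row_id, row["class"], row["weight"]))
            row_ids.append(row_id)
        db.execute("INSERT INTO commit_log VALUES (?, ?, ?, ?)",
                   (commit_id, time.time(), json.dumps(row_ids), note))
    return commit_id

def rollback(db: sqlite3.Connection, commit_id: str) -> int:
    """Delete the rows written by one commit; return how many were removed."""
    found = db.execute("SELECT row_ids FROM commit_log WHERE commit_id = ?",
                       (commit_id,)).fetchone()
    if found is None:
        raise KeyError(commit_id)
    row_ids = json.loads(found[0])
    with db:
        db.executemany("DELETE FROM samples WHERE id = ?", [(r,) for r in row_ids])
        db.execute("DELETE FROM commit_log WHERE commit_id = ?", (commit_id,))
    return len(row_ids)

db = open_demo_db()
cid = commit_rows(db, [{"class": "rim", "weight": 12.5},
                       {"class": "base", "weight": 30.1}], note="bag-dump")
rows_after_commit = db.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
removed = rollback(db, cid)
rows_after_rollback = db.execute("SELECT COUNT(*) FROM samples").fetchone()[0]
```

Writing the rows and the log entry in one transaction means a commit is either fully recorded or not written at all, which is what makes a later rollback by commit ID reliable.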


### Refined Example Workflow

After connecting to the database and learning its schema:

1) User holds down and speaks: "Can you please record a new ‘bag-dump’ of fine-ware pottery sherds."

2) Agent (reasoning loop):
- Calls introspectSchema to return the cached/hard-coded schema table.
- LLM reasons and locates part of schema to write to + information it needs to collect.
- Agent builds PLAN JSON via few-shotted PLAN templates.

{
  "steps": [
    { "ask": "photo", "prompt": "Please photograph the bag-dump." },

    { "tool": "VisionLabel", "args": { "model": "yolo-nano" } },

    { "ask": "voice",
      "prompt": "I found {{count}} sherds. I’ll show each close-up—confirm rim/base and weight or say 'skip'." },

    { "tool": "WriteRows" }
  ]
}
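
Before the agent reads a plan back to the user, it may be worth checking that the generated PLAN JSON has the expected shape. The following is a minimal sketch, assuming that every step carries exactly one of `ask` or `tool` as in the example above; `validate_plan` and the two allow-lists are hypothetical names, and the allowed values are only those that appear in the example plan.

```python
from typing import Any, Dict, List

ALLOWED_ASKS = {"photo", "voice"}             # input kinds used in the example plan
ALLOWED_TOOLS = {"VisionLabel", "WriteRows"}  # tool names used in the example plan

def validate_plan(plan: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the plan is well formed."""
    steps = plan.get("steps")
    if not isinstance(steps, list) or not steps:
        return ["plan needs a non-empty 'steps' list"]
    problems = []
    for i, step in enumerate(steps):
        if not isinstance(step, dict) or ("ask" in step) == ("tool" in step):
            problems.append(f"step {i}: needs exactly one of 'ask' or 'tool'")
        elif "ask" in step:
            if step["ask"] not in ALLOWED_ASKS:
                problems.append(f"step {i}: unknown ask '{step['ask']}'")
            if not step.get("prompt"):
                problems.append(f"step {i}: ask steps need a prompt")
        elif step["tool"] not in ALLOWED_TOOLS:
            problems.append(f"step {i}: unknown tool '{step['tool']}'")
    return problems

example_plan = {"steps": [
    {"ask": "photo", "prompt": "Please photograph the bag-dump."},
    {"tool": "VisionLabel", "args": {"model": "yolo-nano"}},
    {"ask": "voice", "prompt": "Confirm each sherd or say 'skip'."},
    {"tool": "WriteRows"},
]}
problems = validate_plan(example_plan)
```

A non-empty result could be fed back to the planner as a correction prompt instead of being shown to the user.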

- Agent reads verbal summary to user and asks for confirmation before execution.

3) The agent stays in the loop while it calls the various tools.
- On the front end, the user is prompted to take an image by a modal.
- After the image, the LLM takes the user through each identified and prelabeled sherd in the photo by zooming in and asking for confirmation. It listens for the user's questions and potential changes to the prelabeling.
- Afterward, the agent commits the changes to the database: it calls WriteRows, which returns commit IDs, and VersionedWriter appends {row_id, diff, thought} to audit_log.
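
The per-sherd verification step can be sketched as a function that applies one user reply to each prelabeled detection. This is a simplified illustration: the reply vocabulary ('confirm', 'skip', or a corrected label) and the `verified` field are assumptions, and a real agent would interpret free-form speech rather than exact strings.

```python
from typing import Dict, Iterable, List

def review_detections(detections: List[Dict], replies: Iterable[str]) -> List[Dict]:
    """Apply one user reply per detection: 'confirm', 'skip', or a corrected label."""
    rows = []
    for det, reply in zip(detections, replies):
        answer = reply.strip().lower()
        if answer == "skip":
            continue  # the user rejected this sherd, so nothing is written for it
        label = det["label"] if answer == "confirm" else answer
        rows.append({"class": label, "bbox": det["bbox"], "verified": True})
    return rows

detections = [
    {"bbox": [0.1, 0.1, 0.3, 0.3], "label": "rim", "conf": 0.91},
    {"bbox": [0.4, 0.2, 0.6, 0.5], "label": "rim", "conf": 0.42},
    {"bbox": [0.7, 0.6, 0.9, 0.9], "label": "rim", "conf": 0.55},
]
verified_rows = review_detections(detections, ["confirm", "skip", "base"])
```

Only the rows returned here would be passed on to the write step, so nothing reaches the database without a user decision.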

For initial prototype + demo:
1. Stream mic-to-cloud via Whisper and GPT-4o
2. Hard-code the schema returned by the introspectSchema function for a demo SQL relational database.

Main points of demonstration:
1. A conversational voice agent finds the correct information to record, develops a reasonable human-in-the-loop workflow for it, and is able to write to the database.
2. Audit log exists and can be viewed. Instant rollback is possible.
3. Powerful, accurate, and versatile prelabeling to save time.

For database connection via connection string:

We can use Supabase as a thin proxy/wrapper around another Postgres database. (postgres_fdw and dblink connect only to Postgres; MySQL, SQL Server, etc. would each need a different foreign data wrapper.)

To do so, we will create an empty Supabase project, install the postgres_fdw or dblink extension to bring in the tables and data from the external Postgres, and use the REST/GraphQL API as if the tables were actually local.
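
As a rough sketch of the postgres_fdw route, the helper below turns a Postgres connection string into the SQL statements that would be run once in the Supabase project. The function name `fdw_setup_sql`, the server name `external_pg`, the local schema name `external`, and the example connection string are all hypothetical. It also assumes that the remote tables live in the `public` schema and that the server and schema names are trusted identifiers.

```python
from typing import List
from urllib.parse import unquote, urlparse

def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal (doubles embedded single quotes)."""
    return "'" + value.replace("'", "''") + "'"

def fdw_setup_sql(conn_string: str, server: str = "external_pg",
                  local_schema: str = "external") -> List[str]:
    """Build the statements that expose a remote Postgres schema via postgres_fdw."""
    u = urlparse(conn_string)
    if u.scheme not in ("postgres", "postgresql"):
        raise ValueError("postgres_fdw only connects to PostgreSQL servers")
    host, port = u.hostname or "localhost", str(u.port or 5432)
    dbname = unquote(u.path.lstrip("/"))
    user, password = unquote(u.username or ""), unquote(u.password or "")
    return [
        "CREATE EXTENSION IF NOT EXISTS postgres_fdw;",
        f"CREATE SCHEMA IF NOT EXISTS {local_schema};",
        f"CREATE SERVER {server} FOREIGN DATA WRAPPER postgres_fdw "
        f"OPTIONS (host {sql_literal(host)}, port {sql_literal(port)}, "
        f"dbname {sql_literal(dbname)});",
        f"CREATE USER MAPPING FOR CURRENT_USER SERVER {server} "
        f"OPTIONS (user {sql_literal(user)}, password {sql_literal(password)});",
        f"IMPORT FOREIGN SCHEMA public FROM SERVER {server} INTO {local_schema};",
    ]

statements = fdw_setup_sql("postgresql://reader:s3cret@db.example.org:5432/excavation")
```

Importing the foreign tables into a dedicated schema keeps them separate from any tables that are created locally in the Supabase project.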


**Remember to add the supervisor to the FastAPI app: every user 'utterance' is passed through the small LLM, which classifies it as pass, answer, or replan.**
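
The supervisor's routing can be sketched independently of FastAPI and of the model that does the classification. In this illustration `keyword_classifier` is a crude stand-in for the small LLM, and the meaning given to each label is an assumption: 'pass' forwards the utterance to the current workflow step, 'answer' handles a user question, and 'replan' sends the utterance back to the planner. The handler names are hypothetical.

```python
from typing import Callable, Dict

LABELS = ("pass", "answer", "replan")
ROUTES = {"pass": "runner", "answer": "qa", "replan": "planner"}  # hypothetical handlers

def keyword_classifier(utterance: str) -> str:
    """Stand-in for the small LLM: crude keyword rules, for illustration only."""
    text = utterance.lower()
    if any(phrase in text for phrase in ("instead", "change the plan", "start over")):
        return "replan"
    if text.rstrip().endswith("?"):
        return "answer"
    return "pass"

def supervise(utterance: str,
              classify: Callable[[str], str] = keyword_classifier) -> Dict[str, str]:
    """Classify one utterance and name the handler that should receive it."""
    label = classify(utterance)
    if label not in LABELS:
        label = "pass"  # fall back to the current step on an unexpected label
    return {"label": label, "route": ROUTES[label], "utterance": utterance}

decisions = [
    supervise("This one is a rim sherd"),
    supervise("What does base mean?"),
    supervise("Actually, weigh them first instead"),
]
```

Passing the classifier in as an argument lets the keyword rules be swapped for a real model call without changing the routing logic.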

In [None]:
!pip install langchain-openai langgraph langchain-core openai replicate httpx

In [None]:
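import asyncio
import os
import time
import uuid
from typing import Any, Dict, List, TypedDict

import httpx
import openai
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# Assumed setup (not defined in the original notebook): the tools below reference
# these names, so they are read here from environment variables.
SUPABASE_URL = os.environ["SUPABASE_URL"]
SUPABASE_KEY = os.environ["SUPABASE_KEY"]
REPLICATE_KEY = os.environ["REPLICATE_KEY"]
YOLO_VERSION = os.environ["YOLO_VERSION"]  # Replicate version id of the YOLO model
HEADERS_JSON = {"apikey": SUPABASE_KEY, "Authorization": f"Bearer {SUPABASE_KEY}"}
http = httpx.AsyncClient(timeout=60.0)  # shared async HTTP client

In [None]:
llm = ChatOpenAI(model="gpt-4o", temperature=0)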

Tools

In [None]:
async def introspect_schema() -> Dict[str, Any]:
  """Return hard coded JSON schema file --> swap this for a live Supabase GraphQL wrapper introspection query. """
  return {
      "table": "samples",
      "columns": {
          "id": "uuid",
          "class": "text",
          "weight": "float",
          "image_url": "text",
          "created_at": "timestamptz"
      },
      "primary_key": "id"
  }

async def sample_rows(n: int = 5) -> List[Dict[str, Any]]:
  """Return data samples (for few-shotting)"""
  url = f"{SUPABASE_URL}/rest/v1/samples?limit={n}&select=*"
  r   = await http.get(url, headers=HEADERS_JSON)
  r.raise_for_status()
  return r.json()

class Detection(TypedDict):
  bbox: List[float]
  label: str
  conf: float

async def vision_label(image_url: str,
                       n_fewshot: int = 6,
                       yolo_conf: float = 0.25) -> List[Detection]:
  """Detect sherds with YOLO (via Replicate), then label each box with a few-shot LLM call."""
  # 1) YOLO detect
  yolo_payload = {
      "version": YOLO_VERSION,
      "input": { "image": image_url, "conf": yolo_conf, "iou": 0.5 }
  }
  r = await http.post("https://api.replicate.com/v1/predictions",
                      headers={"Authorization": f"Token {REPLICATE_KEY}",
                                "Content-Type": "application/json"},
                      json=yolo_payload)
  r.raise_for_status()
  pred = r.json()
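  # Poll until the prediction reaches a terminal state ("canceled" would otherwise loop forever).
  while pred["status"] not in ("succeeded", "failed", "canceled"):
    await asyncio.sleep(0.8)
    poll = await http.get(pred["urls"]["get"],
                          headers={"Authorization": f"Token {REPLICATE_KEY}"})
    poll.raise_for_status()
    pred = poll.json()
  if pred["status"] != "succeeded":
    raise RuntimeError(pred.get("error") or f"prediction {pred['status']}")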

  # YOLO output: list of {"x1":..,"y1":..,"x2":..,"y2":..,"confidence":..}
  raw_boxes = pred["output"]

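  # 2) few-shot examples from DB
  examples = await sample_rows(n_fewshot)
  classes  = sorted({row["class"] for row in examples})  # sorted for a stable prompt

  # Build few-shot messages. The bbox/conf text is a fixed placeholder because the
  # sample rows carry no box; only the class label comes from the database.
  shots = []
  for ex in examples[:4]:
      shots.extend([
          {"role":"user",
            "content":"bbox:[0.1,0.2,0.3,0.4] base_conf:0.88"},
          {"role":"assistant","content":ex["class"]}
      ])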

  # 3) classify each detection with gpt-4o-mini (text-only: the model sees the box, not the pixels)
  client = openai.AsyncOpenAI()  # openai>=1.0 client; reads OPENAI_API_KEY from the environment
  out: List[Detection] = []
  for box in raw_boxes:
    prompt = [
        {"role":"system",
          "content": "You are a pottery-sherd classifier. "
                    "Respond with exactly one label from this list:\n"
                    + ", ".join(classes)},
        *shots,
        {"role":"user",
          "content": (f"bbox:[{box['x1']:.2f},{box['y1']:.2f},"
                      f"{box['x2']:.2f},{box['y2']:.2f}] "
                      f"base_conf:{box['confidence']:.2f}") }
    ]
    # openai.ChatCompletion was removed in openai>=1.0; use the client's chat.completions API.
    comp = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    temperature=0,
                    messages=prompt)
    label = comp.choices[0].message.content.strip()
    out.append({"bbox":[box["x1"],box["y1"],box["x2"],box["y2"]],
                "label":label,
                "conf":box["confidence"]})
  return out

async def write_rows(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
  """Commit multiple samples/rows to the database and write an audit entry."""
  commit_id = str(uuid.uuid4())
  ts        = int(time.time())

  # 4.1 bulk insert the samples
  ins = await http.post(f"{SUPABASE_URL}/rest/v1/samples",
                        headers=HEADERS_JSON | {"Prefer": "return=representation"},
                        json=[{**row, "id": str(uuid.uuid4())} for row in rows])
  ins.raise_for_status()
  inserted = ins.json()

  # 4.2 write an audit record
  audit_body = {
      "id": commit_id,
      "timestamp": ts,
      "rows": [r["id"] for r in inserted],
      "diff": rows
  }
  audit = await http.post(f"{SUPABASE_URL}/rest/v1/audit_log",
                          headers=HEADERS_JSON,
                          json=audit_body)
  audit.raise_for_status()  # surface a failed audit write instead of silently losing it

  return {"commit_id": commit_id, "row_ids": [r["id"] for r in inserted]}

#Edit frontend to include these functionalities

async def ask_photo() -> Dict[str, str]:
  """Prompts user to take a picture"""
  return {"prompt": "Please photograph the bag."}

async def user_audio(prompt: str) -> Dict[str, str]:
  """Prompts user to record audio"""
  return {"prompt": prompt}

async def user_info(prompt: str) -> Dict[str, str]:
  """Prompts user to say/type in some information"""
  return {"prompt": prompt}


# bind_tools accepts plain functions and derives each tool's name and argument
# schema from the signature and docstring, so the model sees the function names
# (e.g. write_rows) rather than the CamelCase labels used in the notes above.
tools = [
    introspect_schema,
    sample_rows,
    vision_label,
    write_rows,
    ask_photo,
    user_audio,
    user_info,
]

llm_with_tools = llm.bind_tools(tools)

Nodes

In [None]:
#Converts user message to structured intent; ask follow-up question if unclear
async def intent_node(state):
  user_msg = state["messages"][-1]["content"]
  sys_prompt = "Extract user intent as JSON {task_type,domain}"
  # JSON mode returns a parsed dict; AIMessage.json() would serialize the whole message instead.
  intent = await llm.with_structured_output(method="json_mode").ainvoke(
      [("system", sys_prompt), ("user", user_msg)])
  return {"intent": intent, "thinking": "parsed intent"}

#Takes in structured intent and cached schema and creates a PLAN JSON workflow w/ self-explanation/chain-of-thought
async def planner_node(state):
  schema = state.get("schema") or await introspect_schema()  # retrieves schema
  plan_prompt = ChatPromptTemplate.from_messages([
      ("system", "You are a planner. Develop a workflow according to the user query or intent and the database schema, and respond with a PLAN JSON object. Here's a template to follow (). "),
      ("user", "{intent}"),
      ("system", "{schema}")
  ])
  # Plain llm in JSON mode: the chain returns the plan as a parsed dict.
  chain = plan_prompt | llm.with_structured_output(method="json_mode")
  plan = await chain.ainvoke({"intent": state["intent"], "schema": schema})
  return {"plan": plan, "thinking": "built plan"}

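#Walks through the JSON PLAN steps (add more for future demos)
async def runner_node(state):
  # A LangGraph node returns one state update, so events are accumulated rather than yielded.
  tool_events, rows_pending = [], []
  for step in state["plan"]["steps"]:
    if step.get("ask") == "photo":
      tool_events.append("ask_photo")
    elif step.get("tool") == "VisionDetectLabel":
      # Assumption: the frontend has stored the captured photo's URL in state["image_url"].
      detections = await vision_label(state["image_url"])
      tool_events.append({"VisionDetectLabel": detections})
      # Assumption: one pending row per detection; this mapping stands in for the undefined convert().
      rows_pending = [{"class": d["label"], "image_url": state["image_url"]}
                      for d in detections]
  return {"tool_events": tool_events, "rows_pending": rows_pending}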

#Commits to database
async def writer_node(state):
  commit = await write_rows(state["rows_pending"])  # write_rows is async, so it must be awaited
  return {"commit_ids": commit}

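# StateGraph requires a state schema; the keys below are the ones the nodes read or write.
class AgentState(TypedDict, total=False):
  messages: List[Dict[str, str]]
  intent: Dict[str, Any]
  schema: Dict[str, Any]
  plan: Dict[str, Any]
  image_url: str
  thinking: str
  tool_events: List[Any]
  rows_pending: List[Dict[str, Any]]
  commit_ids: Dict[str, Any]

sg = StateGraph(AgentState)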
sg.add_node("Intent", intent_node)
sg.add_node("Planner", planner_node)
sg.add_node("Runner", runner_node)
sg.add_node("Writer", writer_node)

sg.add_edge(START, "Intent")
sg.add_edge("Intent", "Planner")
sg.add_edge("Planner", "Runner")
sg.add_edge("Runner", "Writer")
sg.add_edge("Writer", END)

agent = sg.compile()