Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,59 @@ For direct `chat.completions.create(...)`, pass the wrapped OpenAI-style

Use `DotTxt.models.list()` and `AsyncDotTxt.models.list()` for model listing.

## Streaming Fields

`AsyncDotTxt.stream(...)` yields `PatchEvent` objects as the model fills in a
schema-constrained response. The wire format is the gateway's `stream: "patch"`
mode (RFC 6902 JSON Patch over NDJSON).

Each event carries the raw op (`event.op`) and an independent deep copy of the
document so far (`event.snapshot`). For the common case of reacting to one
field at a time, use the demux properties: `event.field` is the JSON Pointer with
the leading `/` stripped (`"intent"`, `"steps/0"`, `"address/city"`), and
`event.value` is the op's value.

```python
import asyncio
from typing import Literal

from pydantic import BaseModel

from dottxt import AsyncDotTxt


class SupportTicket(BaseModel):
# Field order = arrival order. Put what unblocks downstream work first.
intent: Literal["billing", "technical", "account"]
urgency: Literal["low", "medium", "high", "critical"]
reply: str


async def main() -> None:
client = AsyncDotTxt()
stream = client.stream(
model="openai/gpt-oss-20b",
response_format=SupportTicket,
input="I was charged twice this month, please refund the duplicate.",
)
async for event in stream:
match event.field:
case "intent":
print(f"dispatching to {event.value} queue")
case "urgency" if event.value == "critical":
print("paging oncall")
case "reply":
print(f"reply: {event.value}")


asyncio.run(main())
```

The routing decision can fire tens of milliseconds into generation while
`reply` continues to stream. See
[docs/client.md](docs/client.md#streaming-fields-patch-stream) for the full
reference.

## OpenAI-Compatible Usage

Use `DotTxt` when you want an OpenAI-style client surface with
Expand Down Expand Up @@ -224,3 +277,8 @@ The compatibility surface expects the wrapped OpenAI-style
- [Use a Genson schema builder to generate](examples/generate_genson.py)
- [List available models](examples/list_models.py)
- [OpenAI-Compatible chat completions](examples/openai_chat_completions.py)
- [Stream fields as they arrive](examples/stream_field_printer.py)
- [Route on /intent before /reply finishes](examples/stream_early_routing.py)
- [Mid-stream human approval](examples/stream_hitl_approval.py)
- [Fan out research on each /steps/N](examples/stream_fanout.py)
- [Reconstruct the document from raw patch ops](examples/stream_reconstruct.py)
14 changes: 14 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Output rules:
targeted error with guidance to run `dottxt models` and set `DOTTXT_MODEL`
or pass `--model`

<<<<<<< HEAD
### `dottxt schema check`

Validate a schema file as JSON Schema.
Expand All @@ -82,3 +83,16 @@ Validate a schema file as JSON Schema.
- `<schema-file>`: JSON file path to validate
- `--json`: emits structured payload including `status` and `schema_file`
- Errors follow the shared `--json` error envelope when enabled
=======
### `dottxt stream`

Stream one generation as it is produced using JSON Patch RFC 6902.

- `-m, --model TEXT`: model id (required unless `DOTTXT_MODEL` is set)
- `-s, --schema FILE`: schema file path (required)
- `[PROMPT]`: literal prompt text (falls back to stdin, same rules as `generate`)

Output rules:

- stdout: one RFC 6902 `add` op per line (NDJSON), in arrival order.
>>>>>>> d5cc04a (Add JSON Patch streaming support)
100 changes: 98 additions & 2 deletions docs/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Requires Python 3.10+.
The client takes two arguments. Each is read from the constructor first, then
from the environment.

- `api_key` (`str | None`): falls back to `DOTTXT_API_KEY`. Required the
- `api_key` (`str | None`): falls back to `DOTTXT_API_KEY`. Required, the
constructor raises `ValueError` if neither is set.
- `base_url` (`str | None`): falls back to `DOTTXT_BASE_URL`, then to
`https://api.dottxt.ai/v1`.
Expand Down Expand Up @@ -209,11 +209,99 @@ result = client.generate(
print(result) # {'severity': 'high', 'team': 'checkout'}
```

## Streaming Fields (Patch Stream)

`AsyncDotTxt.stream(...)` yields `PatchEvent` objects as the model fills in a
schema-constrained response. It is built on the gateway's `stream: "patch"`
mode, which emits RFC 6902 JSON Patch operations in schema order, so
downstream work can start the moment a field arrives, without waiting for
the closing brace.

Parameters mirror `generate(...)`:

- `model` (`str`)
- `input` (`str | list[dict]`)
- `response_format` (`Any`) — any schema input accepted by `generate(...)`
- `temperature`, `max_tokens`, `seed`, `timeout` — optional
- `extra` (`dict | None`) — extra chat-completions body fields

Each `PatchEvent` carries:

- `event.op` — the raw RFC 6902 operation (`{"op": "add", "path": ..., "value": ...}`).
The dottxt API only ever emits `add` ops; `dottxt.apply_add(doc, path, value)`
folds one into a object in place, returning the (possibly new) root.
- `event.snapshot` — an independent deep copy of the JSON object built up to
and including this op
- `event.field` / `event.value` — `field` is the JSON Pointer with the leading `/` stripped
(`"intent"`, `"steps/0"`, `"address/city"`). `value` contains the current field content,
including empty lists `[]` or dictionary `{}` values.

```python
import asyncio
from typing import Literal
from pydantic import BaseModel
from dottxt import AsyncDotTxt

class SupportTicket(BaseModel):
# Field order = arrival order. Put what unblocks downstream work first.
intent: Literal["billing", "technical", "account"]
urgency: Literal["low", "medium", "high", "critical"]
reply: str

async def main():
client = AsyncDotTxt()
stream = client.stream(
model="openai/gpt-oss-20b",
response_format=SupportTicket,
input="I was charged twice this month, please refund the duplicate.",
)
async for event in stream:
match event.field:
case "intent":
asyncio.create_task(dispatch_to_queue(event.value))
case "urgency" if event.value == "critical":
asyncio.create_task(page_oncall())
case "reply":
await send(event.value)

asyncio.run(main())
```

The routing decision fires the moment `intent` arrives, typically tens of
milliseconds in while `reply` continues to stream. If you need the full
object so far (e.g. to log progress or hand a partial object to another
service), use `event.snapshot`.

To drive your own state from the raw ops instead of the snapshot (e.g. to
mirror the object into a store of your own) fold each `event.op` in with
`apply_add`:

```python
from dottxt import AsyncDotTxt, apply_add

async def main():
client = AsyncDotTxt()
doc = {} # the stream's first op is the root seed
stream = client.stream(
model="openai/gpt-oss-20b",
response_format=SupportTicket,
input="I was charged twice this month, please refund the duplicate.",
)
async for event in stream:
doc = apply_add(doc, event.op["path"], event.op["value"])
print(doc)
```

Errors:

- `dottxt.PatchStreamError`: raised when the gateway returns a non-200
status. Exposes `status_code` and `body`.

## OpenAI-Compatible Text Generation

If you prefer the standard OpenAI SDK surface, you can call
`chat.completions.create(...)` directly. The client passes the call through
unchanged and returns the raw chat completion object parsing and
unchanged and returns the raw chat completion object, parsing and
validation are up to the caller.

For structured output, pass the wrapped OpenAI-style `response_format`
Expand Down Expand Up @@ -268,3 +356,11 @@ Runnable examples live in the [`examples/`](../examples) directory:
- [`list_models.py`](../examples/list_models.py): list available models
- [`openai_chat_completions.py`](../examples/openai_chat_completions.py): use
the OpenAI-compatible `chat.completions.create` surface
- [`stream_field_printer.py`](../examples/stream_field_printer.py): minimal
`stream` demo — print each leaf field and value as it lands
- [`stream_early_routing.py`](../examples/stream_early_routing.py): route on
`/intent` while `/reply` is still streaming
- [`stream_hitl_approval.py`](../examples/stream_hitl_approval.py): approve a
proposed action mid-stream and discard the reply if the operator declines
- [`stream_fanout.py`](../examples/stream_fanout.py): fan research tasks out
on each `/steps/N` as the planner emits them
94 changes: 94 additions & 0 deletions examples/stream_early_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Route on /intent before /reply finishes.

The schema is ordered ``intent`` → ``urgency`` → ``reply``. Because dottxt
streams fields in schema order, the routing decision fires the moment
``intent`` arrives, typically tens of milliseconds in, while the model
continues generating the (much longer) ``reply``.

What to watch in the output: the ``-> dispatched ...`` and
``-> paged oncall ...`` lines arrive well before the final reply line.
The elapsed-time prefix on the reply line is the punchline, how much later
the full message lands compared to when routing was already settled.

Usage:
DOTTXT_API_KEY=sk-... python examples/stream_early_routing.py
"""

import asyncio
import time
from typing import Literal

from pydantic import BaseModel, Field

from dottxt import AsyncDotTxt


class SupportTicket(BaseModel):
"""A triaged support reply.

Field order is significant: earlier fields arrive first and unblock
downstream work that does not depend on later fields.
"""

intent: Literal["billing", "technical", "account", "feedback"]
urgency: Literal["low", "medium", "high", "critical"]
reply: str = Field(max_length=400)


async def route_to_billing(ticket_id: str) -> None:
"""Dispatch the ticket to the billing queue (stub)."""
print(f" -> dispatched {ticket_id} to billing queue")


async def route_to_technical(ticket_id: str) -> None:
"""Dispatch the ticket to the technical queue (stub)."""
print(f" -> dispatched {ticket_id} to technical queue")


async def page_oncall(ticket_id: str) -> None:
"""Page the on-call engineer (stub)."""
print(f" -> paged oncall for {ticket_id}")


async def main() -> None:
"""Run the example."""
ticket_id = "TKT-8821"
user_message = (
"I was charged twice for my subscription this month and the second "
"charge doesn't appear in my invoice list. Please refund the duplicate."
)

client = AsyncDotTxt()
started = time.monotonic()
try:
stream = client.stream(
model="openai/gpt-oss-20b",
response_format=SupportTicket,
input=[
{
"role": "system",
"content": "Triage support tickets and draft a reply.",
},
{"role": "user", "content": user_message},
],
max_tokens=400,
)
async for event in stream:
match event.field:
# Fire-and-forget: routing kicks off while /reply is still
# streaming.
case "intent" if event.value == "billing":
asyncio.create_task(route_to_billing(ticket_id))
case "intent" if event.value == "technical":
asyncio.create_task(route_to_technical(ticket_id))
case "urgency" if event.value == "critical":
asyncio.create_task(page_oncall(ticket_id))
case "reply":
elapsed_ms = int((time.monotonic() - started) * 1000)
print(f"reply ({elapsed_ms}ms): {event.value}")
finally:
await client.close()


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading