Skip to content

feat(streaming-v3) Add AsyncStreamingClient#197

Closed
aurpsis-aai wants to merge 3 commits into
masterfrom
aurpsis/async-streaming-client
Closed

feat(streaming-v3) Add AsyncStreamingClient#197
aurpsis-aai wants to merge 3 commits into
masterfrom
aurpsis/async-streaming-client

Conversation

@aurpsis-aai
Copy link
Copy Markdown

Summary

Adds AsyncStreamingClient — an asyncio-native counterpart to the existing thread-based StreamingClient. The public API mirrors the sync version one-to-one (connect / disconnect / stream / set_params / force_endpoint / on / create_temporary_token) but as coroutines. Zero changes to the sync API.

import asyncio
from assemblyai.streaming.v3 import (
    AsyncStreamingClient, StreamingClientOptions, StreamingEvents,
    StreamingParameters, SpeechModel,
)

async def main():
    client = AsyncStreamingClient(
        StreamingClientOptions(api_key="...", api_host="streaming.assemblyai.com")
    )
    client.on(StreamingEvents.Turn, lambda c, turn: print(turn.transcript))
    await client.connect(StreamingParameters(sample_rate=16000, speech_model=SpeechModel.universal_streaming_english))
    await client.stream(audio_bytes)
    await client.disconnect(terminate=True)

asyncio.run(main())

Handlers may be plain callables or async functions — async handlers are awaited inline by the single internal read task.

Design notes

  • Shared with sync client: URL/header construction, param normalization, message parsing, connection-closed error mapping. Extracted into _build_uri / _build_headers / _emit_param_warnings in client.py.
  • The sync client's _pending_close_error cross-thread hand-off (needed because recv(timeout=1) polls) is deleted entirely in the async version. await ws.recv() raises ConnectionClosed immediately on socket close, so the read task always sees the close naturally and remains the sole handler-dispatch site — same error-dedup behavior, less machinery.
  • disconnect() cancels the read/write tasks after _stop_event is set (and after letting TerminateSession drain when terminate=True).
  • stream() accepts bytes, sync Iterable[bytes], or AsyncIterable[bytes].
  • Imports websockets.asyncio.client.connect when available (websockets >= 13) and falls back to websockets.client.connect for the supported lower bound (websockets>=11.0 per setup.py).
  • Zero new runtime deps; adds pytest-asyncio to tox.ini test deps only.

Files

Change File
Added assemblyai/streaming/v3/async_client.py
Added tests/unit/test_streaming_async.py (15 tests)
Modified assemblyai/streaming/v3/__init__.py (export AsyncStreamingClient)
Modified assemblyai/streaming/v3/client.py (extract shared helpers)
Modified tox.ini (add pytest-asyncio)

Test plan

  • pytest tests/unit/test_streaming_async.py — 15 new tests pass
  • pytest tests/unit/test_streaming.py — 33 existing sync tests pass (regression)
  • ruff format --check + ruff check clean on changed files
  • mypy --follow-imports=silent --ignore-missing-imports clean on changed files (matches CI lint)
  • CI green on Python 3.9 / 3.10 / 3.11

🤖 Generated with Claude Code

aurpsis-aai and others added 2 commits May 11, 2026 15:50
Adds an asyncio-native counterpart to the thread-based StreamingClient.
Mirrors the public API one-to-one (connect / disconnect / stream /
set_params / force_endpoint / on / create_temporary_token) as
coroutines. Event handlers may be plain callables or async functions —
async handlers are awaited inline by the single internal read task.

Shared with the sync client: URL/header construction, parameter
normalization, message parsing, and connection-closed error mapping.
The async client deletes the cross-thread `_pending_close_error`
hand-off — asyncio's `await ws.recv()` raises `ConnectionClosed`
immediately on socket close, so the read task always sees the close
naturally and remains the sole dispatch site.

- New `assemblyai/streaming/v3/async_client.py` (`AsyncStreamingClient`,
  `_AsyncHTTPClient`)
- Exported from `assemblyai.streaming.v3`
- Helper extractions in `client.py` (`_build_uri`, `_build_headers`,
  `_emit_param_warnings`) reused by both clients
- `tests/unit/test_streaming_async.py` covering connect, stream
  (bytes / sync iter / async iter), terminate, dedup, clean close,
  handler errors, sync+async handler mix, and temporary tokens
- `pytest-asyncio` added to tox test deps

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The new asyncio client API (websockets >= 13) takes ``additional_headers``;
the legacy ``websockets.client.connect`` on websockets 11/12 takes
``extra_headers``. Resolve once at import time and route through the
``websocket_connect_async`` wrapper so callers + tests see one entry
point. Required for the websockets-11.0 leg of the tox matrix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an asyncio-native AsyncStreamingClient to assemblyai.streaming.v3 that mirrors the existing thread-based StreamingClient API, while refactoring shared URI/header/warning helpers into client.py and adding async unit tests.

Changes:

  • Added AsyncStreamingClient implementation (async connect/disconnect/stream/etc.) with mixed sync+async event handlers.
  • Extracted shared _build_uri, _build_headers, and _emit_param_warnings helpers from the sync client for reuse.
  • Added async-focused unit tests and included pytest-asyncio in the tox test dependencies.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
tox.ini Adds pytest-asyncio to support running the new async unit tests.
tests/unit/test_streaming_async.py Introduces async unit tests for the new client (connection, streaming, handlers, error paths).
assemblyai/streaming/v3/client.py Refactors shared URI/header/warning helpers for sync+async clients.
assemblyai/streaming/v3/async_client.py Implements the new asyncio-native streaming client and async token helper.
assemblyai/streaming/v3/__init__.py Exports AsyncStreamingClient from the package.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +116 to +152
async def connect(self, params: StreamingParameters) -> None:
_emit_param_warnings(params)

uri = _build_uri(self._options.api_host, params)
headers = _build_headers(self._options)

try:
self._websocket = await asyncio.wait_for(
websocket_connect_async(uri, additional_headers=headers),
timeout=15,
)
except websockets.exceptions.InvalidStatus as exc:
status_code = getattr(getattr(exc, "response", None), "status_code", None)
await self._report_connection_closed(
StreamingError(
message=f"WebSocket handshake rejected (HTTP {status_code})",
code=status_code,
)
)
return
except (
websockets.exceptions.InvalidHandshake,
websockets.exceptions.ConnectionClosed,
OSError,
asyncio.TimeoutError,
TimeoutError,
) as exc:
await self._report_connection_closed(exc)
return

self._read_task = asyncio.create_task(
self._read_loop(), name="AsyncStreamingClient._read_loop"
)
self._write_task = asyncio.create_task(
self._write_loop(), name="AsyncStreamingClient._write_loop"
)

Comment on lines +433 to +450
class _AsyncHTTPClient:
def __init__(self, api_host: str, api_key: Optional[str] = None):
vi = sys.version_info
python_version = f"{vi.major}.{vi.minor}.{vi.micro}"
user_agent = (
f"{httpx._client.USER_AGENT} AssemblyAI/1.0 "
f"(sdk=Python/{__version__} runtime_env=Python/{python_version})"
)

headers = {"User-Agent": user_agent}

if api_key:
headers["Authorization"] = api_key

self._http_client = httpx.AsyncClient(
base_url="https://" + api_host,
headers=headers,
)
Comment thread tests/unit/test_streaming_async.py Outdated
Comment on lines +86 to +94
deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
read_done = client._read_task is None or client._read_task.done()
write_done = client._write_task is None or client._write_task.done()
if read_done and write_done and client._stop_event.is_set():
return
await asyncio.sleep(0.01)


Comment on lines +265 to +267
if self._stop_event.is_set():
return

Four fixes from the Copilot review of PR #197:

1. Read-loop race: the write task only set ``_stop_event`` on
   ``ConnectionClosed`` and trusted the read task's next ``recv()`` to
   raise. But the read task checks stop at the top of its loop, and if
   it processed a buffered message between recv calls and saw stop
   before the next ``recv()``, it would exit silently with the close
   undispatched. The write task now calls
   ``await self._report_connection_closed(exc)`` directly — that
   method's flag check + set is synchronous (no ``await`` between
   them), so the read task's parallel call is a safe no-op.

2. Reconnect guard: ``connect()`` now raises ``RuntimeError`` if the
   client already has an active websocket or read task, matching the
   sync client's "single-use" lifecycle.

3. HTTP client cleanup: ``_AsyncHTTPClient`` gains ``aclose()`` and
   ``disconnect()`` calls it, so ``httpx.AsyncClient`` doesn't leak
   its connection pool and trigger "unclosed client" warnings.

4. Test helper: ``_wait_for_tasks`` now raises ``AssertionError`` on
   timeout instead of returning silently (so stalls fail tests
   deterministically), and uses ``asyncio.get_running_loop().time()``.

Adds three regression tests covering each behavioral fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aurpsis-aai
Copy link
Copy Markdown
Author

Closing — the SDK repo is a Copybara mirror of assemblyai/developer_tools/python/sdk/ in the DeepLearning monorepo (PRs here are auto-generated by the sync_out workflow and titled chore: sync sdk code with DeepLearning repo). This work was redone at the source of truth in https://github.com/AssemblyAI/DeepLearning/pull/16402, and will land here automatically via the next Copybara sync after that merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants