Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 167 additions & 59 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -699,79 +699,187 @@ for result in transcript.auto_highlights.results:

### **Streaming Examples**

[Read more about our streaming service.](https://www.assemblyai.com/docs/streaming/universal-3-pro)
Real-time speech-to-text via WebSocket against the `u3-rt-pro` model. The SDK ships two clients with identical option/event/handler surfaces — `StreamingClient` (threaded) and `AsyncStreamingClient` (asyncio). Pick whichever fits your codebase.

**Handler contract**: every handler is called as `handler(client, event)`. Plain functions and `async def` functions both work; `AsyncStreamingClient` awaits async handlers inline on the read task, so don't block — use `asyncio.create_task(...)` if you need concurrent work.

[Read more about the streaming service.](https://www.assemblyai.com/docs/streaming/universal-3-pro)

<details>
<summary>Stream a local file (sync)</summary>

```python
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent, StreamingClient, StreamingClientOptions, StreamingError,
StreamingEvents, StreamingParameters, TerminationEvent, TurnEvent,
)

def on_begin(client, event: BeginEvent):
    """Report the session id once the streaming session has begun."""
    print(f"Session started: {event.id}")

def on_turn(client, event: TurnEvent):
    """Print each turn's transcript together with its end-of-turn flag."""
    print(f"{event.transcript} (end_of_turn={event.end_of_turn})")

def on_terminated(client, event: TerminationEvent):
    """Report how much audio was processed when the session terminates."""
    print(f"Done: {event.audio_duration_seconds}s of audio processed")

def on_error(client, error: StreamingError):
    """Print a streaming error along with its numeric code."""
    print(f"Error: {error} (code={error.code})")

# Register one handler per event type before connecting.
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)

client.connect(StreamingParameters(
    sample_rate=16000, speech_model="u3-rt-pro", format_turns=True,
))
try:
    client.stream(aai.extras.stream_file(filepath="audio.wav", sample_rate=16000))
finally:
    # Disconnect even if streaming raises, so the session is not left open.
    client.disconnect(terminate=True)
```

</details>

<details>
<summary>Stream your microphone in real-time</summary>
<summary>Stream your microphone (sync)</summary>

`MicrophoneStream` requires PyAudio:

```bash
pip install -U assemblyai
pip install -U "assemblyai[extras]"
```

```python
import logging
from typing import Type

import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
TurnEvent,
TerminationEvent,
StreamingClient, StreamingClientOptions, StreamingEvents, StreamingParameters,
)

api_key = "<YOUR_API_KEY>"
def on_turn(client, event):
print(f"{event.transcript} (end_of_turn={event.end_of_turn})")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
client.on(StreamingEvents.Turn, on_turn)
client.connect(StreamingParameters(sample_rate=16000, speech_model="u3-rt-pro"))

def on_begin(self: Type[StreamingClient], event: BeginEvent):
print(f"Session started: {event.id}")
try:
client.stream(aai.extras.MicrophoneStream(sample_rate=16000))
finally:
client.disconnect(terminate=True)
```

</details>

<details>
<summary>Stream a local file (async)</summary>

`AsyncStreamingClient` mirrors `StreamingClient` with async methods. It's safe to use as an async context manager — `disconnect()` runs on block exit even if user code raises. Don't pass `extras.stream_file` directly: it paces itself with a blocking `time.sleep`, which would stall the event loop. Pace the audio from an async generator instead.

```python
import asyncio
from assemblyai.streaming.v3 import (
AsyncStreamingClient, StreamingClientOptions, StreamingEvents, StreamingParameters,
)

async def stream_file_async(path: str, sample_rate: int, chunk_duration: float = 0.3):
    """Yield paced chunks of bytes read from ``path``.

    Each chunk covers ``chunk_duration`` seconds of audio at 2 bytes per
    sample (i.e. 16-bit mono PCM is assumed). The file is read as-is in
    binary mode, and the generator awaits ``asyncio.sleep(chunk_duration)``
    after every chunk so the audio is not sent faster than the pacing interval.

    Args:
        path: File to read.
        sample_rate: Samples per second of the audio in ``path``.
        chunk_duration: Seconds of audio per yielded chunk, which is also
            the pause between chunks.

    Raises:
        ValueError: If ``sample_rate * chunk_duration`` is less than one
            sample. A zero-sized read would end the stream immediately and
            a negative size would read the whole file as a single chunk.
    """
    bytes_per_chunk = int(sample_rate * chunk_duration) * 2
    if bytes_per_chunk <= 0:
        raise ValueError(
            "sample_rate * chunk_duration must cover at least one sample"
        )
    with open(path, "rb") as f:
        while chunk := f.read(bytes_per_chunk):
            yield chunk
            await asyncio.sleep(chunk_duration)

async def on_turn(client, event):
    """Print each turn's transcript together with its end-of-turn flag."""
    print(f"{event.transcript} (end_of_turn={event.end_of_turn})")

async def main():
    """Connect, stream ``audio.wav`` through the paced generator, and exit."""
    # Leaving the `async with` block disconnects the client, even on error.
    async with AsyncStreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>")) as client:
        client.on(StreamingEvents.Turn, on_turn)
        await client.connect(StreamingParameters(
            sample_rate=16000, speech_model="u3-rt-pro", format_turns=True,
        ))
        await client.stream(stream_file_async("audio.wav", 16000))

asyncio.run(main())
```

</details>

def on_turn(self: Type[StreamingClient], event: TurnEvent):
print(f"{event.transcript} ({event.end_of_turn})")

def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
print(
f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
)

def on_error(self: Type[StreamingClient], error: StreamingError):
print(f"Error occurred: {error}")

def main():
client = StreamingClient(
StreamingClientOptions(
api_key=api_key,
api_host="streaming.assemblyai.com",
)
)

client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)

client.connect(
StreamingParameters(
sample_rate=16000,
speech_model="u3-rt-pro",
)
)

try:
client.stream(
aai.extras.MicrophoneStream(sample_rate=16000)
)
finally:
client.disconnect(terminate=True)

if __name__ == "__main__":
main()
<details>
<summary>Handle errors</summary>

Server-side errors arrive on the `Error` event rather than being raised. The handler receives a `StreamingError` (an `Exception` subclass) whose `.code` attribute is an `int` or `None` — **not** the wire-level `ErrorEvent` class.

`StreamingErrorCodes` is a `dict[int, str]` mapping wire codes to human-readable messages. Use `.get(...)` for lookup:

```python
from assemblyai.streaming.v3 import StreamingErrorCodes

def on_error(client, error):
    """Print a readable message for a streaming error.

    Looks up ``error.code`` in ``StreamingErrorCodes`` and falls back to
    ``str(error)`` when the code has no entry in the mapping.
    """
    message = StreamingErrorCodes.get(error.code, str(error))
    print(f"Streaming error {error.code}: {message}")
```

Common codes: `4001` Not Authorized, `4002` Insufficient Funds, `4029` Client sent audio too fast, `4031` Session idle for too long.

</details>

<details>
<summary>Change settings mid-session</summary>

`set_params` updates an active session. Typical use: enable turn formatting (punctuation, casing) only on confirmed end-of-turn so partial transcripts stay raw:

```python
from assemblyai.streaming.v3 import StreamingSessionParameters

def on_turn(client, event):
    """Enable turn formatting once an unformatted end-of-turn arrives."""
    if event.end_of_turn and not event.turn_is_formatted:
        client.set_params(StreamingSessionParameters(format_turns=True))
```

For voice agents, `force_endpoint()` flushes the current turn — useful when an external signal (UI button, barge-in detection) determines the user has stopped speaking before VAD does:

```python
client.force_endpoint() # ends the current turn immediately
```

</details>

<details>
<summary>Temporary tokens for browser / edge clients</summary>

Don't ship your API key to browsers. Mint a short-lived token server-side and pass it to the client.

**Sync server (Flask / WSGI / scripts):**
```python
client = StreamingClient(StreamingClientOptions(api_key="<YOUR_API_KEY>"))
token = client.create_temporary_token(expires_in_seconds=60)
# Send `token` to the browser, which connects using StreamingClientOptions(token=token).
```

**Async server (FastAPI / asyncio):** always wrap the client in `async with`, even though you never call `connect()` — `create_temporary_token` lazily opens an `httpx.AsyncClient` pool. The context manager closes it on exit; without it, you leak a pool on every request.

```python
from fastapi import FastAPI
from assemblyai.streaming.v3 import AsyncStreamingClient, StreamingClientOptions

app = FastAPI()
MASTER_KEY = "<YOUR_API_KEY>"

@app.get("/streaming-token")
async def streaming_token():
    """Mint a temporary streaming token that expires after 60 seconds."""
    # `async with` closes the client's resources when the request finishes.
    async with AsyncStreamingClient(StreamingClientOptions(api_key=MASTER_KEY)) as client:
        return {"token": await client.create_temporary_token(expires_in_seconds=60)}
```

**Browser / edge client:** pass the token via `StreamingClientOptions(token=...)`:

```python
client = StreamingClient(StreamingClientOptions(token="<TOKEN_FROM_SERVER>"))
client.connect(StreamingParameters(sample_rate=16000, speech_model="u3-rt-pro"))
```

</details>
Expand Down
19 changes: 1 addition & 18 deletions assemblyai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from .__version__ import __version__
from .client import Client
from .lemur import Lemur
from .transcriber import RealtimeTranscriber, Transcriber, Transcript, TranscriptGroup
from .transcriber import Transcriber, Transcript, TranscriptGroup
from .types import (
AssemblyAIError,
AudioEncoding,
AutohighlightResponse,
AutohighlightResult,
Chapter,
Expand Down Expand Up @@ -47,13 +46,6 @@
PIIRedactionPolicy,
PIISubstitutionPolicy,
RawTranscriptionConfig,
RealtimeError,
RealtimeFinalTranscript,
RealtimePartialTranscript,
RealtimeSessionInformation,
RealtimeSessionOpened,
RealtimeTranscript,
RealtimeWord,
RedactPiiAudioOptions,
Sentence,
Sentiment,
Expand Down Expand Up @@ -93,7 +85,6 @@
__all__ = [
# types
"AssemblyAIError",
"AudioEncoding",
"AutohighlightResponse",
"AutohighlightResult",
"Chapter",
Expand Down Expand Up @@ -170,14 +161,6 @@
"Word",
"WordBoost",
"WordSearchMatch",
"RealtimeTranscriber",
"RealtimeError",
"RealtimeFinalTranscript",
"RealtimePartialTranscript",
"RealtimeSessionInformation",
"RealtimeSessionOpened",
"RealtimeTranscript",
"RealtimeWord",
# package globals
"settings",
# packages
Expand Down
2 changes: 1 addition & 1 deletion assemblyai/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.64.2"
__version__ = "0.64.3"
23 changes: 0 additions & 23 deletions assemblyai/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
ENDPOINT_UPLOAD = "/v2/upload"
ENDPOINT_LEMUR_BASE = "/lemur/v3"
ENDPOINT_LEMUR = f"{ENDPOINT_LEMUR_BASE}/generate"
ENDPOINT_REALTIME_WEBSOCKET = "/v2/realtime/ws"
ENDPOINT_REALTIME_TOKEN = "/v2/realtime/token"


def _get_error_message(response: httpx.Response) -> str:
Expand Down Expand Up @@ -415,24 +413,3 @@ def lemur_get_response_data(
return types.LemurQuestionResponse.parse_obj(json_data)

return types.LemurStringResponse.parse_obj(json_data)


def create_temporary_token(
client: httpx.Client,
request: types.RealtimeCreateTemporaryTokenRequest,
http_timeout: Optional[float],
) -> str:
response = client.post(
f"{ENDPOINT_REALTIME_TOKEN}",
json=request.dict(exclude_none=True),
timeout=http_timeout,
)

if response.status_code != httpx.codes.OK:
raise types.AssemblyAIError(
f"Failed to create temporary token: {_get_error_message(response)}",
response.status_code,
)

data = types.RealtimeCreateTemporaryTokenResponse.parse_obj(response.json())
return data.token
4 changes: 4 additions & 0 deletions assemblyai/streaming/v3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .async_client import AsyncStreamingClient
from .client import StreamingClient
from .models import (
BeginEvent,
Expand All @@ -9,6 +10,7 @@
SpeechStartedEvent,
StreamingClientOptions,
StreamingError,
StreamingErrorCodes,
StreamingEvents,
StreamingParameters,
StreamingPiiPolicy,
Expand All @@ -21,6 +23,7 @@
)

__all__ = [
"AsyncStreamingClient",
"BeginEvent",
"Encoding",
"EventMessage",
Expand All @@ -31,6 +34,7 @@
"StreamingClient",
"StreamingClientOptions",
"StreamingError",
"StreamingErrorCodes",
"StreamingEvents",
"StreamingParameters",
"StreamingPiiPolicy",
Expand Down
Loading
Loading