diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts
index a0cea2f5cc..68c0142ce6 100644
--- a/src/data/nav/aitransport.ts
+++ b/src/data/nav/aitransport.ts
@@ -27,6 +27,15 @@ export default {
         },
       ],
     },
+    {
+      name: 'Guides',
+      pages: [
+        {
+          name: 'Using the OpenAI SDK',
+          link: '/docs/guides/ai-transport/openai-sdk',
+        },
+      ],
+    },
   ],
   api: [],
 } satisfies NavProduct;
diff --git a/src/pages/docs/guides/ai-transport/openai-sdk.mdx b/src/pages/docs/guides/ai-transport/openai-sdk.mdx
new file mode 100644
index 0000000000..5ad939bf12
--- /dev/null
+++ b/src/pages/docs/guides/ai-transport/openai-sdk.mdx
@@ -0,0 +1,532 @@
---
title: "Guide: Token streaming using Ably and the OpenAI SDK"
meta_description: "Stream AI model responses from the OpenAI SDK over Ably with real-time token delivery."
meta_keywords: "AI, token streaming, OpenAI, Responses API, AI transport, Ably, realtime"
---

This guide demonstrates how to stream token-by-token responses from OpenAI's Responses API over Ably's realtime messaging platform. This pattern enables you to broadcast AI model responses to multiple clients simultaneously, with clear message boundaries for handling the response lifecycle.

By using Ably to distribute tokens from the OpenAI SDK, you can scale to thousands of concurrent subscribers, decouple your AI inference from client connections, and maintain reliable message delivery with ordering guarantees. This guide implements the [message-per-token with explicit start/stop events](/docs/ai-transport/features/token-streaming/message-per-token#explicit-events) pattern, which provides clear lifecycle management for each AI response.

## Prerequisites

To follow this guide, you'll need:

**Software requirements:**
- Node.js 20 or higher

**Account requirements:**
- An OpenAI API key
- An Ably API key

**Useful links:**
- [OpenAI developer quickstart](https://platform.openai.com/docs/quickstart)
- [Ably JavaScript SDK getting started](/docs/getting-started/javascript)

## Setting up your environment

Create a new npm package. This will contain our publisher and subscriber code:

```shell
mkdir ably-openai-example && cd ably-openai-example
npm init -y
```

Install the required packages using npm:

```shell
npm install openai@^4 ably@^2
```

Export your OpenAI API key to the environment; the OpenAI SDK will automatically read it later:

```shell
export OPENAI_API_KEY="your_api_key_here"
```

## Initializing the clients

### Publisher: Initializing the Ably and OpenAI clients

Initialize both the Ably and OpenAI clients. The Ably Realtime client maintains a persistent connection to the Ably service, allowing you to publish tokens at high message rates with low latency. The OpenAI client provides access to the Responses API.

Create a new file `publisher.mjs` with the following contents:

```javascript
import Ably from 'ably';
import OpenAI from 'openai';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: '{{API_KEY}}' });

// Initialize OpenAI client
const openai = new OpenAI();

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}');
```
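While developing, you may find it useful to confirm that the publisher has connected to Ably before streaming anything. A minimal optional sketch using ably-js connection state listeners (not part of the publisher code above; safe to omit):

```javascript
// Optional: log Ably connection state changes while developing.
realtime.connection.on('connected', () => console.log('Connected to Ably'));
realtime.connection.on('disconnected', () =>
  console.log('Disconnected from Ably; the SDK will retry automatically'),
);
```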
### Subscriber: Initializing the Ably client

Initialize the Ably client in the same way as you did for the publisher. The Ably subscriber will receive streamed AI response events in realtime.

Create a new file `subscriber.mjs` with the following contents:

```javascript
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: '{{API_KEY}}' });

// Create a channel for subscribing to streamed AI responses
const channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}');
```

## Understanding Responses API events

OpenAI's Responses API streams model output as a series of events when you set `stream: true` ([OpenAI - streaming API responses](https://platform.openai.com/docs/guides/streaming-responses)). Each streaming event includes a `type` property identifying the kind of event, and a full response is made up of the following events:

1. **Stream start**: the `response.created` event signals the start of the stream
2. **Output item start**: the response will be sent in one or more output item blocks. The start of an output item is signalled by the `response.output_item.added` event
3. **Content part**: `response.content_part.added` signals the start of a piece of content within the output item
4. **Text delta**: multiple `response.output_text.delta` events contain the incremental text of the response (tokens)
5. **Content completion**: the `response.output_text.done`, `response.content_part.done` and `response.output_item.done` events signal the end of each block in the response
6. **Stream completion**: the `response.completed` event signals that the stream is complete

For more details on streaming events, see the [OpenAI Responses API streaming documentation](https://platform.openai.com/docs/api-reference/responses-streaming/response).
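To see this event sequence for yourself before involving Ably, you can log the type of each streaming event from a short response. A minimal standalone sketch, assuming `OPENAI_API_KEY` is exported as in the setup above:

```javascript
import OpenAI from 'openai';

const openai = new OpenAI();

// Request a streamed response and log each event's type as it arrives.
const stream = await openai.responses.create({
  model: 'gpt-5',
  input: 'Say hello',
  stream: true,
});

for await (const event of stream) {
  console.log(event.type); // e.g. response.created, response.output_text.delta, ...
}
```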
## Publishing tokens to Ably

To stream OpenAI responses over Ably, your publisher iterates through the streaming events received from OpenAI and publishes each token to an Ably channel. This implementation follows the [explicit start/stop events pattern](/docs/ai-transport/features/token-streaming/message-per-token#explicit-events), which provides clear response boundaries.

The pattern works as follows:
- **First event**: Publish a `start` message to signal response beginning
- **Each delta**: Publish a `token` message with the incremental text content
- **Stream end**: Publish a `stop` message to signal response completion

### Mapping OpenAI events to Ably messages

The publisher maps events from the OpenAI Responses API to Ably messages, including the OpenAI response ID in the Ably message extras so that subscribers can correlate tokens belonging to the same response.

1. `response.created`: This gives us the response ID, which we'll include in all messages that we publish to Ably for this response. When we receive this event, we'll publish an Ably message named `start`.

```json
{
  "type": "response.created",
  "response": {
    "id": "resp_abc123",
    …
  },
  …
}
```

2. `response.output_item.added`: This indicates the start of the output message. We'll extract its `output_index` and only publish `token` events to Ably for matching output items.

```json
{
  "type": "response.output_item.added",
  "output_index": 1,
  "item": {
    "type": "message",
    …
  },
  …
}
```

3. `response.content_part.added`: This indicates the start of the output text. We'll extract its `content_index` and only publish `token` events to Ably for matching content parts.

```json
{
  "type": "response.content_part.added",
  "output_index": 1,
  "content_index": 0,
  "part": {
    "type": "output_text",
    …
  },
  …
}
```

4. `response.output_text.delta`: This indicates that some tokens have been generated. When we receive such an event, we'll publish an Ably message named `token`, containing the text delta.

```json
{
  "type": "response.output_text.delta",
  "output_index": 1,
  "content_index": 0,
  "delta": "a bit of text that",
  …
}
```

5. `response.completed`: This indicates that the response is complete and will not emit any further events. When we receive this event, we'll publish an Ably message named `stop`.

```json
{
  "type": "response.completed",
  "response": {
    "id": "resp_abc123",
    …
  },
  …
}
```
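Putting this mapping together, these are the three Ably message shapes the publisher produces for the example events above (illustrative only; `resp_abc123` is the placeholder response ID from the events shown):

```javascript
// The three Ably message shapes produced by the mapping above (illustrative).
const startMessage = { name: 'start', extras: { headers: { responseId: 'resp_abc123' } } };
const tokenMessage = { name: 'token', data: 'a bit of text that', extras: { headers: { responseId: 'resp_abc123' } } };
const stopMessage = { name: 'stop', extras: { headers: { responseId: 'resp_abc123' } } };
```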
### Publisher code

On the publisher side, send a user prompt to the OpenAI Responses API and map the stream of response events into messages on the Ably channel using the pattern described above.

Add the following code to `publisher.mjs`:

```javascript
import { randomUUID } from "crypto";

async function streamOpenAIResponse(userInput) {
  // Create streaming response from OpenAI
  const stream = await openai.responses.create({
    model: "gpt-5",
    input: userInput,
    stream: true,
  });

  // Create a random request ID for correlating log messages when there are concurrent requests
  const requestId = randomUUID().slice(0, 7);
  console.log(`${requestId}: Created stream`);

  // In this simple example, we'll only stream tokens for a single
  // content_part of a single output_item (specifically, the first
  // content_part of type "output_text" of the first output_item of type
  // "message"). These variables contain the output_index and content_index
  // event properties that correspond to these entities.
  let outputIndex = null;
  let contentIndex = null;

  // Populated from the "response.created" OpenAI event and then included in
  // all messages published to Ably for this response.
  let responseId = null;

  // Iterate through streaming events
  for await (const event of stream) {
    console.log(`${requestId}: Got event ${event.type}`);

    if (event.type === "response.created") {
      responseId = event.response.id;

      console.log(
        `${requestId}: Publishing 'start' event for response ${responseId}`,
      );
      channel.publish({
        name: "start",
        extras: {
          headers: {
            responseId: responseId,
          },
        },
      });
    } else {
      if (responseId === null) {
        throw new Error(
          "OpenAI SDK behavior not as expected (initial response.created event not received)",
        );
      }

      if (
        event.type === "response.output_item.added" &&
        event.item.type === "message" &&
        outputIndex === null
      ) {
        outputIndex = event.output_index;
      } else if (
        event.type === "response.content_part.added" &&
        event.part.type === "output_text" &&
        event.output_index === outputIndex &&
        contentIndex === null
      ) {
        contentIndex = event.content_index;
      } else if (
        event.type === "response.output_text.delta" &&
        event.output_index === outputIndex &&
        event.content_index === contentIndex
      ) {
        const delta = event.delta;

        console.log(
          `${requestId}: Publishing 'token' event with delta "${delta}" for response ${responseId}`,
        );

        channel.publish({
          name: "token",
          data: delta,
          extras: {
            headers: {
              responseId: responseId,
            },
          },
        });
      } else if (event.type === "response.completed") {
        console.log(
          `${requestId}: Publishing 'stop' event for response ${responseId}`,
        );

        channel.publish({
          name: "stop",
          extras: {
            headers: {
              responseId: responseId,
            },
          },
        });
      } else {
        console.log(
          `${requestId}: Ignoring OpenAI SDK event ${event.type} for response ${responseId}`,
        );
      }
    }
  }
}

// Usage example: Multiple concurrent responses
streamOpenAIResponse("Tell me a short joke");
streamOpenAIResponse("Give me one interesting fact about Spain");
streamOpenAIResponse("Write a one-line poem about carrot cake");
```

**Important implementation notes:**

- **Don't await publish calls**: As shown in the code above, `channel.publish()` is called without `await`. This maximizes throughput by allowing Ably to batch acknowledgments. Messages are still published in order. For more details, see [publishing tokens](/docs/ai-transport/features/token-streaming/message-per-token#publishing) in the message-per-token guide.
- **Response ID correlation**: The response ID from OpenAI (`event.response.id`) uniquely identifies each response, allowing subscribers to correctly handle multiple concurrent responses.
- **Event filtering**: Only `response.output_text.delta` events for the tracked output item and content part are published as `token` messages; all other stream events are logged and ignored.

For more information about channels and publishing, see the [channels documentation](/docs/channels) and [message properties](/docs/messages#properties).
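One consequence of not awaiting `channel.publish()` is that a failed publish would otherwise fail silently. To surface publish errors without serializing publishes, you can attach a rejection handler to each call. A sketch (not part of the publisher above; `delta` and `responseId` come from its scope):

```javascript
// Fire-and-forget publish with error logging: the promise is deliberately
// not awaited, preserving throughput, but rejections are still reported.
channel
  .publish({ name: "token", data: delta, extras: { headers: { responseId } } })
  .catch((err) => console.error(`Failed to publish token: ${err.message}`));
```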
## Subscribing to tokens

On the client side, subscribe to all three event types (`start`, `token`, and `stop`) to handle the complete response lifecycle. Use a `Map` to track multiple concurrent responses by their `responseId`.

Add the following code to `subscriber.mjs`:

```javascript
// Track responses by ID
const responses = new Map();

// Subscribe to response start events
await channel.subscribe("start", (message) => {
  const responseId = message.extras?.headers?.responseId;

  if (!responseId) {
    console.warn("Start event missing responseId");
    return;
  }

  console.log(`${responseId}: Response started`);

  // Initialize empty response
  responses.set(responseId, {
    text: "",
    startTime: Date.now(),
  });
});

// Subscribe to token events
await channel.subscribe("token", (message) => {
  const responseId = message.extras?.headers?.responseId;
  const token = message.data;

  if (!responseId) {
    console.warn("Token event missing responseId");
    return;
  }

  if (!responses.has(responseId)) {
    console.warn(`${responseId}: Received token for unknown response`);
    return;
  }

  // Append token to response
  const response = responses.get(responseId);
  response.text += token;
  responses.set(responseId, response);

  // Update UI or process token as it arrives
  console.log(`${responseId}: Received token: "${token}"`);
});

// Subscribe to response stop events
await channel.subscribe("stop", (message) => {
  const responseId = message.extras?.headers?.responseId;

  if (!responseId) {
    console.warn("Stop event missing responseId");
    return;
  }

  const response = responses.get(responseId);

  if (response) {
    const duration = Date.now() - response.startTime;
    console.log(
      `${responseId} Response completed in ${duration}ms. Full text: ${response.text}`,
    );

    // Clean up completed response
    responses.delete(responseId);
  }
});

console.log("Waiting to receive events from Ably");
```

**Key implementation points:**

- **Handle all event types**: Subscribe to `start`, `token`, and `stop` to manage the complete response lifecycle
- **Track concurrent responses**: Use a `Map` to handle multiple responses streaming simultaneously
- **Progressive updates**: Update your UI or process tokens as they arrive in the `token` handler

For more information about subscribing to channels, see the [channel subscription documentation](/docs/channels/subscribe).
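As a design alternative, you could register a single catch-all listener and switch on the message name. A functionally equivalent sketch using the same `responses` map (an assumed variant, not from the subscriber code above):

```javascript
// Alternative: one catch-all listener instead of three named subscriptions.
await channel.subscribe((message) => {
  const responseId = message.extras?.headers?.responseId;
  if (!responseId) return;

  switch (message.name) {
    case "start":
      responses.set(responseId, { text: "", startTime: Date.now() });
      break;
    case "token": {
      const response = responses.get(responseId);
      if (response) response.text += message.data;
      break;
    }
    case "stop":
      responses.delete(responseId);
      break;
  }
});
```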
## Run the subscriber and publisher

We're going to run the publisher and two subscribers. This will allow us to see that:

- messages published by a single publisher are successfully delivered simultaneously to multiple subscribers
- subscribers are able to handle multiple AI responses being received simultaneously

Open two new terminal tabs and run the subscriber in each of them:

```shell
node subscriber.mjs
```

Now, in the original terminal tab (the one in which you exported `OPENAI_API_KEY`), run the publisher:

```shell
node publisher.mjs
```

You should see publisher output similar to the following:

```text
f03945a: Created stream
f03945a: Got event response.created
f03945a: Publishing 'start' event for response resp_097628d5ede953e800693c497c30148194adc300e4ee412171
f03945a: Got event response.in_progress
f03945a: Ignoring OpenAI SDK event response.in_progress for response resp_097628d5ede953e800693c497c30148194adc300e4ee412171
de6fbd3: Created stream
de6fbd3: Got event response.created
de6fbd3: Publishing 'start' event for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
de6fbd3: Got event response.in_progress
de6fbd3: Ignoring OpenAI SDK event response.in_progress for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
38b6c91: Created stream
38b6c91: Got event response.created
38b6c91: Publishing 'start' event for response resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c

...

de6fbd3: Got event response.output_text.delta
de6fbd3: Publishing 'token' event with delta "I" for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
de6fbd3: Got event response.output_text.delta
de6fbd3: Publishing 'token' event with delta " told" for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
de6fbd3: Got event response.output_text.delta
de6fbd3: Publishing 'token' event with delta " my" for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
38b6c91: Got event response.output_text.delta
38b6c91: Publishing 'token' event with delta "Spain" for response resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c
38b6c91: Got event response.output_text.delta
38b6c91: Publishing 'token' event with delta "’s" for response resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c
38b6c91: Got event response.output_text.delta
38b6c91: Publishing 'token' event with delta " national" for response resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c
38b6c91: Got event response.output_text.delta

...
38b6c91: Got event response.completed
38b6c91: Publishing 'stop' event for response resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c
de6fbd3: Got event response.completed
de6fbd3: Publishing 'stop' event for response resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5
f03945a: Got event response.completed
f03945a: Publishing 'stop' event for response resp_097628d5ede953e800693c497c30148194adc300e4ee412171
```

And you should see subscriber output similar to the following, in both of your subscriber terminal tabs:

```text
Waiting to receive events from Ably
resp_097628d5ede953e800693c497c30148194adc300e4ee412171: Response started
resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5: Response started
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Response started
resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5: Received token: "I"
resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5: Received token: " told"
resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5: Received token: " my"
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Received token: "Spain"
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Received token: "’s"
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Received token: " national"
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Received token: " anthem"
resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c: Received token: ","
resp_097628d5ede953e800693c497c30148194adc300e4ee412171: Received token: "Car"
resp_097628d5ede953e800693c497c30148194adc300e4ee412171: Received token: "rot"

...

resp_053c6147aba7289e00693c497c2ec88195ac8b2dcacef1231c Response completed in 1555ms. Full text: Spain’s national anthem, the “Marcha Real,” is one of the few in the world with no official lyrics.
resp_0f89f403f4f5f71800693c497c319c8195acdf3676dbe32cf5 Response completed in 1651ms. Full text: I told my suitcase we’re not going on vacation this year. Now I’m dealing with emotional baggage.
resp_097628d5ede953e800693c497c30148194adc300e4ee412171 Response completed in 3253ms. Full text: Carrot cake: garden gold grated into spice, dreaming beneath cream-cheese snow.
```

**Key points:**

- **Multiple concurrent responses are handled correctly**: The subscriber receives interleaved tokens for three concurrent AI responses, and correctly pieces together the three separate messages:
  - "Spain’s national anthem, the “Marcha Real,” is one of the few in the world with no official lyrics."
  - "I told my suitcase we’re not going on vacation this year. Now I’m dealing with emotional baggage."
  - "Carrot cake: garden gold grated into spice, dreaming beneath cream-cheese snow."
- **Multiple subscribers receive the same token stream**: Both subscribers receive exactly the same token stream that was generated by a single publisher.

## Next steps

You now have a working implementation of token streaming from OpenAI's Responses API over Ably. Here are some suggestions for taking this further:

**Explore advanced patterns:**
- Learn about [client hydration strategies](/docs/ai-transport/features/token-streaming/message-per-token#hydration) for handling late joiners and reconnections

**Production readiness:**
- Implement [token authentication](/docs/auth/token) for secure client connections
- Add error handling for OpenAI API failures and rate limits (see the sketch below)

**Learn more:**
- [Token streaming](/docs/ai-transport/features/token-streaming) - Complete guide to token streaming patterns
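As a starting point for the error handling suggested above, the publisher could retry stream creation when OpenAI returns a rate-limit error. A sketch (the `createStreamWithRetry` name, attempt count, and backoff delays are illustrative, not part of this guide; it assumes the publisher's `openai` client):

```javascript
// Sketch: retry OpenAI stream creation on rate-limit errors (HTTP 429)
// with exponential backoff before giving up.
async function createStreamWithRetry(input, attempts = 3) {
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await openai.responses.create({ model: "gpt-5", input, stream: true });
    } catch (err) {
      // Rethrow anything that isn't a rate limit, or if we're out of attempts.
      if (err?.status !== 429 || attempt === attempts - 1) throw err;
      await new Promise((resolve) => setTimeout(resolve, 2 ** attempt * 1000));
    }
  }
}
```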