diff --git a/src/pages/docs/ai-transport/messaging/accepting-user-input.mdx b/src/pages/docs/ai-transport/messaging/accepting-user-input.mdx index 9ec4af4b8f..9713595eb8 100644 --- a/src/pages/docs/ai-transport/messaging/accepting-user-input.mdx +++ b/src/pages/docs/ai-transport/messaging/accepting-user-input.mdx @@ -472,7 +472,7 @@ async def stream_response(channel, prompt, prompt_id): # Stream tokens by appending to the message async for token in generate_tokens(prompt): - await channel.append_message( + await channel.append_message(Message( serial=message_serial, data=token, extras={ @@ -480,7 +480,7 @@ async def stream_response(channel, prompt, prompt_id): 'promptId': prompt_id } } - ) + )) ``` ```java void streamResponse(Channel channel, String prompt, String promptId) throws Exception { diff --git a/src/pages/docs/ai-transport/messaging/chain-of-thought.mdx b/src/pages/docs/ai-transport/messaging/chain-of-thought.mdx index fa4cd254b1..c218673536 100644 --- a/src/pages/docs/ai-transport/messaging/chain-of-thought.mdx +++ b/src/pages/docs/ai-transport/messaging/chain-of-thought.mdx @@ -88,7 +88,7 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') async for event in stream: if event['type'] == 'reasoning': # Publish reasoning messages - await channel.publish( + await channel.publish(Message( name='reasoning', data=event['text'], extras={ @@ -96,10 +96,10 @@ async for event in stream: 'responseId': event['responseId'] } } - ) + )) elif event['type'] == 'message': # Publish model output messages - await channel.publish( + await channel.publish(Message( name='message', data=event['text'], extras={ @@ -107,7 +107,7 @@ async for event in stream: 'responseId': event['responseId'] } } - ) + )) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}"); @@ -340,14 +340,14 @@ conversation_channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') async for event in stream: if event['type'] == 'start': # Publish response start control message - await conversation_channel.publish( + await conversation_channel.publish(Message( name='start', extras={ 'headers': { 'responseId': event['responseId'] } } - ) + )) elif event['type'] == 'reasoning': # Publish reasoning to separate reasoning channel reasoning_channel = realtime.channels.get(f"{{{{RANDOM_CHANNEL_NAME}}}}:{event['responseId']}") @@ -358,7 +358,7 @@ async for event in stream: ) elif event['type'] == 'message': # Publish model output to main channel - await conversation_channel.publish( + await conversation_channel.publish(Message( name='message', data=event['text'], extras={ @@ -366,7 +366,7 @@ async for event in stream: 'responseId': event['responseId'] } } - ) + )) ``` ```java Channel conversationChannel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}"); diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx index a921222bd2..891b3f4cd3 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx @@ -262,14 +262,14 @@ responses = {} def on_message(message): action = message.action - if action == 'message.create': + if action == MessageAction.MESSAGE_CREATE: # New response started responses[message.serial] = message.data - elif action == 'message.append': + elif action == MessageAction.MESSAGE_APPEND: # Append token to existing response current = responses.get(message.serial, '') responses[message.serial] = current + message.data - elif action == 'message.update': + elif action == MessageAction.MESSAGE_UPDATE: # Replace entire response content responses[message.serial] = message.data @@ -284,16 +284,16 @@ Map responses = new ConcurrentHashMap<>(); // Subscribe to live messages (implicitly attaches the channel) channel.subscribe(message -> { switch (message.action) { - case "message.create": + case MessageAction.MESSAGE_CREATE: // New response started responses.put(message.serial, (String) message.data); break; - case "message.append": + case MessageAction.MESSAGE_APPEND: // Append token to existing response String current = responses.getOrDefault(message.serial, ""); responses.put(message.serial, current + (String) message.data); break; - case "message.update": + case MessageAction.MESSAGE_UPDATE: // Replace entire response content responses.put(message.serial, (String) message.data); break; @@ -358,14 +358,14 @@ responses = {} def on_message(message): action = message.action - if action == 'message.create': + if action == MessageAction.MESSAGE_CREATE: # New response started responses[message.serial] = message.data - elif action == 'message.append': + elif action == MessageAction.MESSAGE_APPEND: # Append token to existing response current = responses.get(message.serial, '') responses[message.serial] = current + message.data - elif action == 'message.update': + elif action == MessageAction.MESSAGE_UPDATE: # Replace entire response content responses[message.serial] = message.data @@ -384,16 +384,16 @@ Map responses = new ConcurrentHashMap<>(); // which are delivered in order to the subscription channel.subscribe(message -> { switch (message.action) { - case "message.create": + case MessageAction.MESSAGE_CREATE: // New response started responses.put(message.serial, (String) message.data); break; - case "message.append": + case MessageAction.MESSAGE_APPEND: // Append token to existing response String current = responses.getOrDefault(message.serial, ""); responses.put(message.serial, current + (String) message.data); break; - case "message.update": + case MessageAction.MESSAGE_UPDATE: // Replace entire response content responses.put(message.serial, (String) message.data); break; @@ -466,14 +466,14 @@ responses = {} def on_message(message): action = message.action - if action == 'message.create': + if action == MessageAction.MESSAGE_CREATE: # New response started responses[message.serial] = message.data - elif action == 'message.append': + elif action == MessageAction.MESSAGE_APPEND: # Append token to existing response current = responses.get(message.serial, '') responses[message.serial] = current + message.data - elif action == 'message.update': + elif action == MessageAction.MESSAGE_UPDATE: # Replace entire response content responses[message.serial] = message.data @@ -501,16 +501,16 @@ Map responses = new ConcurrentHashMap<>(); // Subscribe to live messages (implicitly attaches the channel) channel.subscribe(message -> { switch (message.action) { - case "message.create": + case MessageAction.MESSAGE_CREATE: // New response started responses.put(message.serial, (String) message.data); break; - case "message.append": + case MessageAction.MESSAGE_APPEND: // Append token to existing response String current = responses.getOrDefault(message.serial, ""); responses.put(message.serial, current + (String) message.data); break; - case "message.update": + case MessageAction.MESSAGE_UPDATE: // Replace entire response content responses.put(message.serial, (String) message.data); break; @@ -708,14 +708,14 @@ def on_message(message): action = message.action - if action == 'message.create': + if action == MessageAction.MESSAGE_CREATE: # New response started in_progress_responses[response_id] = message.data - elif action == 'message.append': + elif action == MessageAction.MESSAGE_APPEND: # Append token to existing response current = in_progress_responses.get(response_id, '') in_progress_responses[response_id] = current + message.data - elif action == 'message.update': + elif action == MessageAction.MESSAGE_UPDATE: # Replace entire response content in_progress_responses[response_id] = message.data @@ -749,16 +749,16 @@ channel.subscribe(message -> { } switch (message.action) { - case "message.create": + case MessageAction.MESSAGE_CREATE: // New response started inProgressResponses.put(responseId, (String) message.data); break; - case "message.append": + case MessageAction.MESSAGE_APPEND: // Append token to existing response String current = inProgressResponses.getOrDefault(responseId, ""); inProgressResponses.put(responseId, current + (String) message.data); break; - case "message.update": + case MessageAction.MESSAGE_UPDATE: // Replace entire response content inProgressResponses.put(responseId, (String) message.data); break; @@ -870,14 +870,14 @@ def on_message(message): action = message.action - if action == 'message.create': + if action == MessageAction.MESSAGE_CREATE: # New response started in_progress_responses[response_id] = message.data - elif action == 'message.append': + elif action == MessageAction.MESSAGE_APPEND: # Append token to existing response current = in_progress_responses.get(response_id, '') in_progress_responses[response_id] = current + message.data - elif action == 'message.update': + elif action == MessageAction.MESSAGE_UPDATE: # Replace entire response content in_progress_responses[response_id] = message.data @@ -933,16 +933,16 @@ channel.subscribe(message -> { } switch (message.action) { - case "message.create": + case MessageAction.MESSAGE_CREATE: // New response started inProgressResponses.put(responseId, (String) message.data); break; - case "message.append": + case MessageAction.MESSAGE_APPEND: // Append token to existing response String current = inProgressResponses.getOrDefault(responseId, ""); inProgressResponses.put(responseId, current + (String) message.data); break; - case "message.update": + case MessageAction.MESSAGE_UPDATE: // Replace entire response content inProgressResponses.put(responseId, (String) message.data); break; diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx index 310bdccb1d..dc9bfbb35e 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-token.mdx @@ -197,7 +197,7 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') # Example: stream returns events like { 'type': 'token', 'text': 'Hello', 'responseId': 'resp_abc123' } async for event in stream: if event['type'] == 'token': - channel.publish( + channel.publish(Message( name='token', data=event['text'], extras={ @@ -205,7 +205,7 @@ async for event in stream: 'responseId': event['responseId'] } } - ) + )) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}"); @@ -362,17 +362,17 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}') async for event in stream: if event['type'] == 'message_start': # Publish response start - channel.publish( + channel.publish(Message( name='start', extras={ 'headers': { 'responseId': event['responseId'] } } - ) + )) elif event['type'] == 'message_delta': # Publish tokens - channel.publish( + channel.publish(Message( name='token', data=event['text'], extras={ @@ -380,17 +380,17 @@ async for event in stream: 'responseId': event['responseId'] } } - ) + )) elif event['type'] == 'message_stop': # Publish response stop - channel.publish( + channel.publish(Message( name='stop', extras={ 'headers': { 'responseId': event['responseId'] } } - ) + )) ``` ```java Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");