Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -472,15 +472,15 @@ async def stream_response(channel, prompt, prompt_id):

# Stream tokens by appending to the message
async for token in generate_tokens(prompt):
await channel.append_message(
await channel.append_message(Message(
serial=message_serial,
data=token,
extras={
'headers': {
'promptId': prompt_id
}
}
)
))
```
```java
void streamResponse(Channel channel, String prompt, String promptId) throws Exception {
Expand Down
16 changes: 8 additions & 8 deletions src/pages/docs/ai-transport/messaging/chain-of-thought.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,26 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
async for event in stream:
if event['type'] == 'reasoning':
# Publish reasoning messages
await channel.publish(
await channel.publish(Message(
name='reasoning',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
elif event['type'] == 'message':
# Publish model output messages
await channel.publish(
await channel.publish(Message(
name='message',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down Expand Up @@ -340,14 +340,14 @@ conversation_channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
async for event in stream:
if event['type'] == 'start':
# Publish response start control message
await conversation_channel.publish(
await conversation_channel.publish(Message(
name='start',
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
elif event['type'] == 'reasoning':
# Publish reasoning to separate reasoning channel
reasoning_channel = realtime.channels.get(f"{{{{RANDOM_CHANNEL_NAME}}}}:{event['responseId']}")
Expand All @@ -358,15 +358,15 @@ async for event in stream:
)
elif event['type'] == 'message':
# Publish model output to main channel
await conversation_channel.publish(
await conversation_channel.publish(Message(
name='message',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
```
```java
Channel conversationChannel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,14 @@ responses = {}
def on_message(message):
action = message.action

if action == 'message.create':
if action == MessageAction.MESSAGE_CREATE:
# New response started
responses[message.serial] = message.data
elif action == 'message.append':
elif action == MessageAction.MESSAGE_APPEND:
# Append token to existing response
current = responses.get(message.serial, '')
responses[message.serial] = current + message.data
elif action == 'message.update':
elif action == MessageAction.MESSAGE_UPDATE:
# Replace entire response content
responses[message.serial] = message.data

Expand All @@ -284,16 +284,16 @@ Map<String, String> responses = new ConcurrentHashMap<>();
// Subscribe to live messages (implicitly attaches the channel)
channel.subscribe(message -> {
switch (message.action) {
case "message.create":
case MessageAction.MESSAGE_CREATE:
// New response started
responses.put(message.serial, (String) message.data);
break;
case "message.append":
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = responses.getOrDefault(message.serial, "");
responses.put(message.serial, current + (String) message.data);
break;
case "message.update":
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
responses.put(message.serial, (String) message.data);
break;
Expand Down Expand Up @@ -358,14 +358,14 @@ responses = {}
def on_message(message):
action = message.action

if action == 'message.create':
if action == MessageAction.MESSAGE_CREATE:
# New response started
responses[message.serial] = message.data
elif action == 'message.append':
elif action == MessageAction.MESSAGE_APPEND:
# Append token to existing response
current = responses.get(message.serial, '')
responses[message.serial] = current + message.data
elif action == 'message.update':
elif action == MessageAction.MESSAGE_UPDATE:
# Replace entire response content
responses[message.serial] = message.data

Expand All @@ -384,16 +384,16 @@ Map<String, String> responses = new ConcurrentHashMap<>();
// which are delivered in order to the subscription
channel.subscribe(message -> {
switch (message.action) {
case "message.create":
case MessageAction.MESSAGE_CREATE:
// New response started
responses.put(message.serial, (String) message.data);
break;
case "message.append":
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = responses.getOrDefault(message.serial, "");
responses.put(message.serial, current + (String) message.data);
break;
case "message.update":
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
responses.put(message.serial, (String) message.data);
break;
Expand Down Expand Up @@ -466,14 +466,14 @@ responses = {}
def on_message(message):
action = message.action

if action == 'message.create':
if action == MessageAction.MESSAGE_CREATE:
# New response started
responses[message.serial] = message.data
elif action == 'message.append':
elif action == MessageAction.MESSAGE_APPEND:
# Append token to existing response
current = responses.get(message.serial, '')
responses[message.serial] = current + message.data
elif action == 'message.update':
elif action == MessageAction.MESSAGE_UPDATE:
# Replace entire response content
responses[message.serial] = message.data

Expand Down Expand Up @@ -501,16 +501,16 @@ Map<String, String> responses = new ConcurrentHashMap<>();
// Subscribe to live messages (implicitly attaches the channel)
channel.subscribe(message -> {
switch (message.action) {
case "message.create":
case MessageAction.MESSAGE_CREATE:
// New response started
responses.put(message.serial, (String) message.data);
break;
case "message.append":
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = responses.getOrDefault(message.serial, "");
responses.put(message.serial, current + (String) message.data);
break;
case "message.update":
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
responses.put(message.serial, (String) message.data);
break;
Expand Down Expand Up @@ -708,14 +708,14 @@ def on_message(message):

action = message.action

if action == 'message.create':
if action == MessageAction.MESSAGE_CREATE:
# New response started
in_progress_responses[response_id] = message.data
elif action == 'message.append':
elif action == MessageAction.MESSAGE_APPEND:
# Append token to existing response
current = in_progress_responses.get(response_id, '')
in_progress_responses[response_id] = current + message.data
elif action == 'message.update':
elif action == MessageAction.MESSAGE_UPDATE:
# Replace entire response content
in_progress_responses[response_id] = message.data

Expand Down Expand Up @@ -749,16 +749,16 @@ channel.subscribe(message -> {
}

switch (message.action) {
case "message.create":
case MessageAction.MESSAGE_CREATE:
// New response started
inProgressResponses.put(responseId, (String) message.data);
break;
case "message.append":
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = inProgressResponses.getOrDefault(responseId, "");
inProgressResponses.put(responseId, current + (String) message.data);
break;
case "message.update":
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
inProgressResponses.put(responseId, (String) message.data);
break;
Expand Down Expand Up @@ -870,14 +870,14 @@ def on_message(message):

action = message.action

if action == 'message.create':
if action == MessageAction.MESSAGE_CREATE:
# New response started
in_progress_responses[response_id] = message.data
elif action == 'message.append':
elif action == MessageAction.MESSAGE_APPEND:
# Append token to existing response
current = in_progress_responses.get(response_id, '')
in_progress_responses[response_id] = current + message.data
elif action == 'message.update':
elif action == MessageAction.MESSAGE_UPDATE:
# Replace entire response content
in_progress_responses[response_id] = message.data

Expand Down Expand Up @@ -933,16 +933,16 @@ channel.subscribe(message -> {
}

switch (message.action) {
case "message.create":
case MessageAction.MESSAGE_CREATE:
// New response started
inProgressResponses.put(responseId, (String) message.data);
break;
case "message.append":
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = inProgressResponses.getOrDefault(responseId, "");
inProgressResponses.put(responseId, current + (String) message.data);
break;
case "message.update":
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
inProgressResponses.put(responseId, (String) message.data);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
# Example: stream returns events like { 'type': 'token', 'text': 'Hello', 'responseId': 'resp_abc123' }
async for event in stream:
if event['type'] == 'token':
channel.publish(
channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down Expand Up @@ -362,35 +362,35 @@ channel = realtime.channels.get('{{RANDOM_CHANNEL_NAME}}')
async for event in stream:
if event['type'] == 'message_start':
# Publish response start
channel.publish(
channel.publish(Message(
name='start',
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
elif event['type'] == 'message_delta':
# Publish tokens
channel.publish(
channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
elif event['type'] == 'message_stop':
# Publish response stop
channel.publish(
channel.publish(Message(
name='stop',
extras={
'headers': {
'responseId': event['responseId']
}
}
)
))
```
```java
Channel channel = realtime.channels.get("{{RANDOM_CHANNEL_NAME}}");
Expand Down