Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions packages/server/src/utils/buildAgentflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,6 @@ const executeNode = async ({
flowData: JSON.stringify(iterationFlowData)
}

// Initialize array to collect results from iterations
const iterationResults: string[] = []

// Execute sub-flow for each item in the iteration array
Expand Down Expand Up @@ -1334,9 +1333,16 @@ const executeNode = async ({
productId
})

// Store the result
if (subFlowResult?.text) {
iterationResults.push(subFlowResult.text)
if (subFlowResult) {
iterationResults.push(subFlowResult.text || '')
if (subFlowResult.text) {
// Stream each result as it completes rather than batching at the end.
// Sub-flows run with isRecursive=true, so inner nodes (e.g. DirectReply)
// never reach isLastNode=true and never call streamTokenEvent themselves.
if (isLastNode && sseStreamer) {
sseStreamer.streamTokenEvent(chatId, (i > 0 ? '\n' : '') + subFlowResult.text)
}
}
}
Comment on lines +1336 to 1346
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current streaming logic skips empty results, which creates a discrepancy with the final output generated by iterationResults.join('\n') (which includes empty strings as empty lines).

To maintain consistency between the real-time stream and the final persisted message, consider streaming every result. Using the loop index i instead of successfulCount ensures the newline separator correctly matches the join behavior.

                        if (subFlowResult) {
                            const resultText = subFlowResult.text || ''
                            iterationResults.push(resultText)
                            if (isLastNode && sseStreamer) {
                                // Stream each result (including empty ones) to match the final joined output
                                sseStreamer.streamTokenEvent(chatId, (i > 0 ? '\n' : '') + resultText)
                            }
                        }
References
  1. A suggested change, even if valid, may be deferred if it pertains to logic that is not being modified in the current pull request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HenryHengZJ @jocelynlin-wd
There are 2 approaches to resolving the streaming bug:

Approach 1 — Errors excluded from content (Ref: 328d652)
Aligns iteration with every other v2 node (errors go to execution log only, not chat content), but
this changes the non-streaming API response and persisted chat history for existing flows.

Approach 2 — Errors included in content, not in stream (Changes made in this PR)
Preserves the pre-fix API response and chat history (errors visible on reload), but what was streamed
live differs from what gets saved — a user watching live sees fewer results than a user reading
history.

Decision: Should iteration errors be part of content (and therefore streamed to keep live and history
consistent), or excluded from content entirely to match all v2 node behavior ?


// Add executed data from sub-flow to main execution data with appropriate iteration context
Expand Down Expand Up @@ -1392,7 +1398,6 @@ const executeNode = async ({
}
}

// Update the output with combined results
results.output = {
...(results.output || {}),
iterationResults,
Expand Down
Loading