fix(auditlog): reconstruct streamed tool calls#317
Conversation
📝 WalkthroughWalkthroughBuilds richer ChatCompletion responses from SSE events: captures metadata, supports multiple choices, accumulates per-choice content and tool calls with truncation-aware appends, sorts choices/tool_calls deterministically, and adds unit tests for streamed reconstructions. ChangesStreaming Chat Completion Response Enhancement
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/auditlog/stream_observer.go`:
- Around line 211-218: The current defaultChatChoiceIndex uses len(states) which
can collide with sparse provider indexes; change the logic to return the single
existing key when len(states)==1, and otherwise compute the maximum key present
in the states map and return maxKey+1 (or 0 if the map is empty) so fallback
synthetic indexes never reuse or assume contiguous provider indexes; apply the
same fix to the analogous function used around lines 265-272 (the other default
index helper) referencing the same map parameter name (states) and function name
patterns to locate and update both implementations.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: de167519-151c-4578-8eee-7a6c44e9a38d
📒 Files selected for processing (3)
internal/auditlog/auditlog_test.gointernal/auditlog/stream_observer.gointernal/auditlog/stream_wrapper.go
Greptile SummaryThis PR rewrites the streaming audit-log builder to reconstruct a proper multi-choice, multi-tool-call
Confidence Score: 5/5Safe to merge — the change is well-tested, audit-path only, and carries no impact on request routing or response pass-through. All new code paths are in the audit-log builder, which only affects stored audit rows and never touches the proxied response seen by callers. Eight new unit tests exercise the key scenarios (text-only, tool-call-only, interleaved, parallel, sparse-index, orphan-tool-call, trailing usage, budget truncation). The design trade-offs around shared budget and sparse-index fallback are already documented in prior review threads and are deliberate choices rather than defects. No files require special attention; the three changed files are self-contained within the auditlog package. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[SSE chunk arrives] --> B[parseChatCompletionEvent]
B --> C[Update builder metadata\nid, model, created, provider,\nsystem_fingerprint, usage]
B --> D{choices present?}
D -- yes --> E[For each choice]
D -- no --> Z[Done]
E --> F{index field present?}
F -- yes --> G[use explicit index]
F -- no --> H[defaultChatChoiceIndex\nlen==1: reuse existing\nlen!=1: max+1]
G --> I[builder.chatChoice index]
H --> I
I --> J[Update FinishReason if set]
I --> K[Append delta.content\nappendLimitedStreamText]
I --> L[appendChatToolCalls]
L --> M[For each tool_call delta]
M --> N{index field present?}
N -- yes --> O[use explicit tool index]
N -- no --> P[defaultToolCallIndex\nlen==1: reuse existing\nlen!=1: max+1]
O --> Q[state.toolCall index]
P --> Q
Q --> R[Set id / type if not yet set]
Q --> S{function field?}
S -- no --> T[continue - no hasFunction flag]
S -- yes --> U[Set hasFunction=true\nSet name if not yet set\nAppend arguments text]
V[buildChatCompletionResponse] --> W[buildChatChoices\nsort by index]
W --> X[For each choice state\nbuild message map]
X --> Y[buildStreamChatToolCalls\nskip !hasFunction\nsort by index]
Y --> Z2[Emit final choices array]
Reviews (2): Last reviewed commit: "fix(auditlog): mark exact stream capture..." | Re-trigger Greptile |
|
Addressed the review follow-ups in
I left the shared Validation rerun:
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/auditlog/stream_wrapper.go`:
- Around line 99-111: Add a brief clarifying comment above the content selection
logic in stream_wrapper.go explaining why message["content"] is set to nil when
toolCalls exist but content is empty, and why it uses an empty string ("") for
truly empty messages; reference the variables/state.Content.String(),
state.ToolCalls, buildStreamChatToolCalls(...) and the message["content"]
assignment so maintainers know this mirrors the OpenAI spec distinction between
"no text because tools provided output" (nil) and "explicitly empty message"
("").
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 81eef941-8bb0-41e5-8169-dc1e5d030a8b
📒 Files selected for processing (3)
internal/auditlog/auditlog_test.gointernal/auditlog/stream_observer.gointernal/auditlog/stream_wrapper.go
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/auditlog/stream_observer.go (1)
316-327:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMark exact-budget overflow as truncated.
Line 317 drops later chunks once
contentLenreachesMaxContentCapture, butbuilder.truncatedis only flipped when the current chunk crosses the boundary. If the buffer fills exactly and one more chunk arrives, we silently lose content whileResponseBodyTooBigToHandlestaysfalse.Suggested fix
func appendLimitedStreamText(builder *streamResponseBuilder, dst *strings.Builder, content string) { - if builder == nil || dst == nil || content == "" || builder.truncated || builder.contentLen >= MaxContentCapture { + if builder == nil || dst == nil || content == "" || builder.truncated { return } + if builder.contentLen >= MaxContentCapture { + builder.truncated = true + return + } remaining := MaxContentCapture - builder.contentLen if len(content) > remaining { content = content[:remaining] builder.truncated = true🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/auditlog/stream_observer.go` around lines 316 - 327, appendLimitedStreamText currently only sets builder.truncated when the incoming chunk itself exceeds the remaining budget, which misses the case where builder.contentLen == MaxContentCapture and a later chunk is dropped; update the function (appendLimitedStreamText) to mark builder.truncated = true whenever there is no remaining budget (remaining == 0) or when any incoming chunk is truncated, i.e., compute remaining := MaxContentCapture - builder.contentLen and if remaining <= 0 set builder.truncated = true and return, otherwise truncate the content to remaining and set builder.truncated = true if truncation occurred, then write and update builder.contentLen accordingly so ResponseBodyTooBigToHandle state reflects exact-budget overflows.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/auditlog/stream_wrapper.go`:
- Around line 137-146: The code currently appends a tool call as soon as
state.hasFunction is true even if the function name is empty, producing
incomplete tool_calls with an empty function.name; update the append logic in
the loop that builds toolCalls to require both state.hasFunction and a non-empty
state.Name (i.e., check state.Name != "" or use nonEmptyString(state.Name, "")
to ensure it's present) before appending the map (the block that sets
"id","type" and "function": {"name": state.Name,"arguments":
state.Arguments.String()}); this will skip incomplete
function-only/arguments-only deltas until the function name is known so
reconstructed audit payloads are complete and safe for consumers.
---
Outside diff comments:
In `@internal/auditlog/stream_observer.go`:
- Around line 316-327: appendLimitedStreamText currently only sets
builder.truncated when the incoming chunk itself exceeds the remaining budget,
which misses the case where builder.contentLen == MaxContentCapture and a later
chunk is dropped; update the function (appendLimitedStreamText) to mark
builder.truncated = true whenever there is no remaining budget (remaining == 0)
or when any incoming chunk is truncated, i.e., compute remaining :=
MaxContentCapture - builder.contentLen and if remaining <= 0 set
builder.truncated = true and return, otherwise truncate the content to remaining
and set builder.truncated = true if truncation occurred, then write and update
builder.contentLen accordingly so ResponseBodyTooBigToHandle state reflects
exact-budget overflows.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: dd16e9de-28ca-45cc-a5b5-78210f4dfbbd
📒 Files selected for processing (2)
internal/auditlog/stream_observer.gointernal/auditlog/stream_wrapper.go
| if !state.hasFunction { | ||
| continue | ||
| } | ||
| toolCalls = append(toolCalls, map[string]any{ | ||
| "id": state.ID, | ||
| "type": nonEmptyString(state.Type, "function"), | ||
| "function": map[string]any{ | ||
| "name": state.Name, | ||
| "arguments": state.Arguments.String(), | ||
| }, |
There was a problem hiding this comment.
Skip incomplete tool calls until the function name is known.
Line 137 only checks hasFunction. If a stream ends after an arguments-only delta, this code emits tool_calls[].function.name = "", which makes the reconstructed audit payload incomplete and harder for consumers to process safely.
Suggested fix
for _, index := range indexes {
state := states[index]
- if !state.hasFunction {
+ if !state.hasFunction || strings.TrimSpace(state.Name) == "" {
continue
}
toolCalls = append(toolCalls, map[string]any{
"id": state.ID,
"type": nonEmptyString(state.Type, "function"),As per coding guidelines, "Accept provider responses liberally and return them to the user in a conservative OpenAI-compatible format."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if !state.hasFunction { | |
| continue | |
| } | |
| toolCalls = append(toolCalls, map[string]any{ | |
| "id": state.ID, | |
| "type": nonEmptyString(state.Type, "function"), | |
| "function": map[string]any{ | |
| "name": state.Name, | |
| "arguments": state.Arguments.String(), | |
| }, | |
| if !state.hasFunction || strings.TrimSpace(state.Name) == "" { | |
| continue | |
| } | |
| toolCalls = append(toolCalls, map[string]any{ | |
| "id": state.ID, | |
| "type": nonEmptyString(state.Type, "function"), | |
| "function": map[string]any{ | |
| "name": state.Name, | |
| "arguments": state.Arguments.String(), | |
| }, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/auditlog/stream_wrapper.go` around lines 137 - 146, The code
currently appends a tool call as soon as state.hasFunction is true even if the
function name is empty, producing incomplete tool_calls with an empty
function.name; update the append logic in the loop that builds toolCalls to
require both state.hasFunction and a non-empty state.Name (i.e., check
state.Name != "" or use nonEmptyString(state.Name, "") to ensure it's present)
before appending the map (the block that sets "id","type" and "function":
{"name": state.Name,"arguments": state.Arguments.String()}); this will skip
incomplete function-only/arguments-only deltas until the function name is known
so reconstructed audit payloads are complete and safe for consumers.
|
Addressed the latest review findings in
Validation rerun:
|
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
internal/auditlog/stream_wrapper.go (1)
127-154:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winTool-call serialization gating on
hasFunctiononly.Same gating concern as previously raised:
state.Namecan still be""when emitted if a provider streams a function delta containing onlyarguments(or only an emptyname) and never sendsnameafterward. With all currently known providers sendingfunction.namein the first function-bearing delta, this is acceptable, but it leaves the door open for malformedtool_calls[].function.name = ""in audit payloads for non-standard providers. Worth at least an inline note documenting the assumption.As per coding guidelines, "Accept provider responses liberally and return them to the user in a conservative OpenAI-compatible format."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/auditlog/stream_wrapper.go` around lines 127 - 154, The buildStreamChatToolCalls function currently emits tool calls whenever state.hasFunction is true, which can produce an empty function.name; change the gating so a tool call is only appended if state.hasFunction is true AND state.Name is non-empty (or use nonEmptyString(state.Name, "") to validate), and update the emitted "function"."name" to use that validated non-empty value (or skip emitting the tool call if name is empty); alternatively add an explicit inline comment in buildStreamChatToolCalls documenting the assumption that providers must stream function.name in their first function delta and that empty names are rejected by the gating logic (referencing state.hasFunction, state.Name, buildStreamChatToolCalls).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/auditlog/stream_observer.go`:
- Around line 184-208: Add a short clarifying comment to the
defaultStreamStateIndex function documenting the single-stream merge assumption:
when len(states) == 1 the function returns the existing key which means multiple
index-less entries in the same event will be collapsed into that single
synthetic index; reference this behavior and note callers like the loop in
stream_observer.go that uses builder.chatChoice, appendChatToolCalls, and
appendChatContent may rely on provider-sent indices to avoid collisions, so
future changes should preserve or explicitly handle the "single-stream merge"
edge-case instead of silently altering it.
In `@internal/auditlog/stream_wrapper.go`:
- Around line 73-84: The current code fabricates a synthetic assistant choice
when b.Choices is empty; instead, change the logic in the audit-serialization
path that uses b.Choices so that if len(b.Choices) == 0 you return an empty
slice (or omit the "choices" field) rather than injecting the artificial
{"index":0,"message":{...},"finish_reason":""}; update the branch handling
b.Choices to produce an actual empty []map[string]any (or skip adding the
choices key) so downstream consumers can distinguish "no deltas received" from
an empty assistant output, ensuring you only change the empty-case behavior and
leave non-empty b.Choices handling unchanged.
- Around line 270-309: Add short doc comments: above copyAnyMap(document that
maps.Clone performs a shallow copy, so nested map values remain shared and the
function assumes the caller/producer retains ownership and will not mutate
nested structures), and above jsonNumberToInt and jsonNumberToInt64(note they
currently only handle encoding/json's default float64/int/int64 and that if the
upstream SSE decoder switches to Decoder.UseNumber() you must add a json.Number
case to parse via .Int64()/String()). Optionally proactively add json.Number
handling in jsonNumberToInt/jsonNumberToInt64 if you want to be defensive.
---
Duplicate comments:
In `@internal/auditlog/stream_wrapper.go`:
- Around line 127-154: The buildStreamChatToolCalls function currently emits
tool calls whenever state.hasFunction is true, which can produce an empty
function.name; change the gating so a tool call is only appended if
state.hasFunction is true AND state.Name is non-empty (or use
nonEmptyString(state.Name, "") to validate), and update the emitted
"function"."name" to use that validated non-empty value (or skip emitting the
tool call if name is empty); alternatively add an explicit inline comment in
buildStreamChatToolCalls documenting the assumption that providers must stream
function.name in their first function delta and that empty names are rejected by
the gating logic (referencing state.hasFunction, state.Name,
buildStreamChatToolCalls).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b738b708-77c1-4345-99f6-675235d1380a
📒 Files selected for processing (3)
internal/auditlog/auditlog_test.gointernal/auditlog/stream_observer.gointernal/auditlog/stream_wrapper.go
| for _, choiceAny := range choices { | ||
| choice, ok := choiceAny.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
| index, ok := jsonNumberToInt(choice["index"]) | ||
| if !ok { | ||
| index = defaultChatChoiceIndex(builder.Choices) | ||
| } | ||
| state := builder.chatChoice(index) | ||
| if fr, ok := choice["finish_reason"].(string); ok && fr != "" { | ||
| builder.FinishReason = fr | ||
| state.FinishReason = fr | ||
| } | ||
|
|
||
| if delta, ok := choice["delta"].(map[string]any); ok { | ||
| if role, ok := delta["role"].(string); ok { | ||
| builder.Role = role | ||
| state.Role = role | ||
| } | ||
| if content, ok := delta["content"].(string); ok && content != "" { | ||
| appendStreamContent(builder, content) | ||
| appendChatContent(builder, state, content) | ||
| } | ||
| appendChatToolCalls(builder, state, delta) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Edge case: two index-less entries in the same event collapse into one.
defaultStreamStateIndex returns the existing single key when len(states) == 1. Inside this loop, if a provider event happens to include two choices (or two tool_calls via appendChatToolCalls) without an index, the first iteration creates state at synthetic index N, and the second iteration sees len==1 and returns N — both index-less entries merge into the same state. In practice every observed provider sends index on parallel choices/tool_calls, so the risk is low, but worth a brief comment on the function noting the "single-stream merge" assumption so future maintainers don't reinforce the behavior accidentally.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/auditlog/stream_observer.go` around lines 184 - 208, Add a short
clarifying comment to the defaultStreamStateIndex function documenting the
single-stream merge assumption: when len(states) == 1 the function returns the
existing key which means multiple index-less entries in the same event will be
collapsed into that single synthetic index; reference this behavior and note
callers like the loop in stream_observer.go that uses builder.chatChoice,
appendChatToolCalls, and appendChatContent may rely on provider-sent indices to
avoid collisions, so future changes should preserve or explicitly handle the
"single-stream merge" edge-case instead of silently altering it.
| if len(b.Choices) == 0 { | ||
| return []map[string]any{ | ||
| { | ||
| "index": 0, | ||
| "message": map[string]any{ | ||
| "role": role, | ||
| "content": b.Content.String(), | ||
| "role": "assistant", | ||
| "content": "", | ||
| }, | ||
| "finish_reason": b.FinishReason, | ||
| "finish_reason": "", | ||
| }, | ||
| }, | ||
| } | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Synthetic empty choice when no deltas were ever observed.
If a stream closes without any choices events (e.g. error mid-stream after only metadata/usage), this still emits a fabricated assistant/""/finish_reason: "" entry in the audit row. That can be confusing for downstream consumers, who can't distinguish "model produced empty assistant content" from "no choice deltas ever arrived". Consider returning an empty slice (or omitting choices) in that case, but only if it doesn't break existing audit consumers that assume choices is always populated.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/auditlog/stream_wrapper.go` around lines 73 - 84, The current code
fabricates a synthetic assistant choice when b.Choices is empty; instead, change
the logic in the audit-serialization path that uses b.Choices so that if
len(b.Choices) == 0 you return an empty slice (or omit the "choices" field)
rather than injecting the artificial
{"index":0,"message":{...},"finish_reason":""}; update the branch handling
b.Choices to produce an actual empty []map[string]any (or skip adding the
choices key) so downstream consumers can distinguish "no deltas received" from
an empty assistant output, ensuring you only change the empty-case behavior and
leave non-empty b.Choices handling unchanged.
| func copyAnyMap(m map[string]any) map[string]any { | ||
| if m == nil { | ||
| return nil | ||
| } | ||
| return maps.Clone(m) | ||
| } | ||
|
|
||
| func jsonNumberToInt(value any) (int, bool) { | ||
| switch v := value.(type) { | ||
| case float64: | ||
| return int(v), true | ||
| case int: | ||
| return v, true | ||
| case int64: | ||
| return int(v), true | ||
| default: | ||
| return 0, false | ||
| } | ||
| } | ||
|
|
||
| func jsonNumberToInt64(value any) (int64, bool) { | ||
| switch v := value.(type) { | ||
| case float64: | ||
| return int64(v), true | ||
| case int: | ||
| return int64(v), true | ||
| case int64: | ||
| return v, true | ||
| default: | ||
| return 0, false | ||
| } | ||
| } | ||
|
|
||
| func nonEmptyString(value, fallback string) string { | ||
| value = strings.TrimSpace(value) | ||
| if value != "" { | ||
| return value | ||
| } | ||
| return strings.TrimSpace(fallback) | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Helpers are idiomatic, but copyAnyMap is shallow.
maps.Clone performs a shallow copy, so nested maps inside Usage (e.g. OpenAI's completion_tokens_details, prompt_tokens_details) would share references with the originating JSON event map. Today this is safe because the SSE parser hands ownership of the decoded map to the observer and doesn't mutate it, but if that contract ever changes the builder's Usage could be silently mutated. Consider a brief doc comment to record the shallow-copy assumption.
jsonNumberToInt/jsonNumberToInt64 cover the default encoding/json decoding (float64). If the upstream SSE decoder ever switches to Decoder.UseNumber(), you'll also want a json.Number case here — worth a short comment noting the float64-only assumption.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/auditlog/stream_wrapper.go` around lines 270 - 309, Add short doc
comments: above copyAnyMap(document that maps.Clone performs a shallow copy, so
nested map values remain shared and the function assumes the caller/producer
retains ownership and will not mutate nested structures), and above
jsonNumberToInt and jsonNumberToInt64(note they currently only handle
encoding/json's default float64/int/int64 and that if the upstream SSE decoder
switches to Decoder.UseNumber() you must add a json.Number case to parse via
.Int64()/String()). Optionally proactively add json.Number handling in
jsonNumberToInt/jsonNumberToInt64 if you want to be defensive.
Summary
delta.tool_callsid/type/name and concatenated argumentsVerification
go test ./internal/auditlog ./internal/streaming ./internal/responsecachego test ./...make test-race,go mod tidy, hot-path performance guard,make lintamazon.nova-micro-v1:0: audit row storedtool_calls[0]asget_weatherwith{"city":"Paris"},finish_reason=tool_calls, and usage total tokensSummary by CodeRabbit
Tests
Improvements