diff --git a/llm-access-kiro/src/anthropic/stream.rs b/llm-access-kiro/src/anthropic/stream/context.rs
similarity index 76%
rename from llm-access-kiro/src/anthropic/stream.rs
rename to llm-access-kiro/src/anthropic/stream/context.rs
index 07e11d6..3ee3210 100644
--- a/llm-access-kiro/src/anthropic/stream.rs
+++ b/llm-access-kiro/src/anthropic/stream/context.rs
@@ -1,114 +1,38 @@
-//! SSE event stream adapter for the Anthropic-compatible endpoint.
+//! Per-request streaming drivers.
//!
-//! Converts Kiro upstream binary events into Anthropic-compatible SSE events.
-//! Handles `` block extraction from inline content, tool_use block
-//! interleaving, and a buffered mode for Claude Code that collects all events
-//! before flushing (to rewrite input_tokens from context-usage feedback).
+//! `StreamContext` converts Kiro upstream events into Anthropic-compatible SSE
+//! events: it drives the block state machine, extracts inline thinking,
+//! manages tool_use blocks, synthesizes thinking signatures, and counts
+//! tokens. `BufferedStreamContext` collects everything for the Claude Code
+//! endpoint and rewrites input_tokens from context-usage feedback before
+//! flushing.
use std::collections::HashMap;
-use base64::{engine::general_purpose::STANDARD, Engine as _};
-use llm_access_core::store::DEFAULT_KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS;
use serde_json::json;
-use sha2::{Digest, Sha512};
use uuid::Uuid;
-use super::converter::{get_context_window_size, ResponseModelIdentity};
-use crate::wire::{AssistantMessage, Event, ToolUseEntry};
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum KiroInputTokenSource {
- UpstreamContextUsage,
- LocalRequestEstimateFallback,
-}
-
-/// Kiro reports bridge/system prompt scaffolding inside contextUsage. For
-/// small client requests the request-side estimate is the user-visible source
-/// of truth; contextUsage remains useful for large context-window requests.
-pub const KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS: u64 =
- DEFAULT_KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS;
-
-pub fn anthropic_usage_json(
- input_tokens_total: i32,
- output_tokens: i32,
- cache_read_input_tokens: i32,
-) -> serde_json::Value {
- let input_tokens_total = input_tokens_total.max(0);
- let cache_read_input_tokens = cache_read_input_tokens.max(0).min(input_tokens_total);
- let non_cached_input_tokens_total = input_tokens_total.saturating_sub(cache_read_input_tokens);
- let cache_creation_input_tokens =
- if cache_read_input_tokens == 0 { non_cached_input_tokens_total / 2 } else { 0 };
- let input_tokens = non_cached_input_tokens_total.saturating_sub(cache_creation_input_tokens);
- json!({
- "input_tokens": input_tokens,
- "output_tokens": output_tokens.max(0),
- "cache_creation_input_tokens": cache_creation_input_tokens,
- "cache_read_input_tokens": cache_read_input_tokens,
- })
-}
-
-pub fn resolve_input_tokens(
- request_input_tokens: i32,
- context_input_tokens: Option,
-) -> (i32, KiroInputTokenSource) {
- resolve_input_tokens_with_threshold(
- request_input_tokens,
- context_input_tokens,
+use super::{
+ inline_thinking::{
+ build_inline_thinking_content_blocks, find_real_thinking_end_tag,
+ find_real_thinking_end_tag_at_buffer_end, find_real_thinking_start_tag,
+ strip_inline_thinking_content,
+ },
+ signature::synthetic_thinking_signature,
+ sse_event::SseEvent,
+ state::SseStateManager,
+ usage::{
+ anthropic_usage_json, resolve_input_tokens_with_threshold,
KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS,
- )
-}
-
-pub fn resolve_input_tokens_with_threshold(
- request_input_tokens: i32,
- context_input_tokens: Option,
- context_usage_min_request_tokens: u64,
-) -> (i32, KiroInputTokenSource) {
- let request_input = request_input_tokens.max(0);
- if request_input as u64 <= context_usage_min_request_tokens {
- return (request_input, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
- let context_input = context_input_tokens.unwrap_or_default().max(0);
- if context_input > 0 {
- (context_input, KiroInputTokenSource::UpstreamContextUsage)
- } else {
- (request_input, KiroInputTokenSource::LocalRequestEstimateFallback)
- }
-}
-
-/// A single Server-Sent Event with an event type and JSON data payload.
-#[derive(Debug, Clone)]
-pub struct SseEvent {
- pub event: String,
- pub data: serde_json::Value,
-}
-
-impl SseEvent {
- pub fn new(event: impl Into, data: serde_json::Value) -> Self {
- Self {
- event: event.into(),
- data,
- }
- }
-
- /// Formats this event as a standard SSE text frame (`event: ...\ndata:
- /// ...\n\n`).
- pub fn to_sse_string(&self) -> String {
- format!(
- "event: {}\ndata: {}\n\n",
- self.event,
- serde_json::to_string(&self.data).unwrap_or_default()
- )
- }
-}
-
-// Tracks the lifecycle of a single content block (text, thinking, tool_use).
-#[derive(Debug, Clone)]
-struct BlockState {
- block_type: String,
- started: bool,
- stopped: bool,
-}
+ },
+};
+use crate::{
+ anthropic::converter::{get_context_window_size, ResponseModelIdentity},
+ wire::{AssistantMessage, Event, ToolUseEntry},
+};
+
+/// Placeholder emitted when a thinking block would otherwise be empty.
+const SYNTHETIC_THINKING_PLACEHOLDER: &str = " ";
#[derive(Debug, Clone)]
struct ToolUseAccumulator {
@@ -117,181 +41,6 @@ struct ToolUseAccumulator {
input_buffer: String,
}
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub(super) enum InlineThinkingBlock {
- Thinking(String),
- Text(String),
-}
-
-const THINKING_SIGNATURE_DOMAIN: &[u8] =
- b"staticflow-kiro-anthropic-thinking-signature-anthropic-shape-v6\0";
-const THINKING_SIGNATURE_HEADER_KIND: u64 = 12;
-const THINKING_SIGNATURE_HEADER_MODE: u64 = 2;
-const THINKING_SIGNATURE_HEADER_BODY_LEN: usize = 64;
-const THINKING_SIGNATURE_HEADER_NONCE_LEN: usize = 12;
-const THINKING_SIGNATURE_HEADER_PROOF_LEN: usize = 48;
-const THINKING_SIGNATURE_BODY_MIN_LEN: usize = 619;
-const THINKING_SIGNATURE_BODY_MAX_LEN: usize = 8_192;
-const SYNTHETIC_THINKING_PLACEHOLDER: &str = " ";
-
-fn encode_proto_varint(mut value: u64, out: &mut Vec) {
- loop {
- let mut byte = (value & 0x7f) as u8;
- value >>= 7;
- if value != 0 {
- byte |= 0x80;
- }
- out.push(byte);
- if value == 0 {
- break;
- }
- }
-}
-
-fn encode_proto_key(field_number: u32, wire_type: u8, out: &mut Vec) {
- encode_proto_varint(((field_number as u64) << 3) | u64::from(wire_type), out);
-}
-
-fn proto_varint_len(mut value: usize) -> usize {
- let mut len = 1usize;
- while value >= 0x80 {
- value >>= 7;
- len += 1;
- }
- len
-}
-
-fn proto_bytes_field_encoded_len(field_number: u32, content_len: usize) -> usize {
- proto_varint_len(((field_number as usize) << 3) | 2)
- + proto_varint_len(content_len)
- + content_len
-}
-
-fn encode_proto_varint_field(field_number: u32, value: u64, out: &mut Vec) {
- encode_proto_key(field_number, 0, out);
- encode_proto_varint(value, out);
-}
-
-fn encode_proto_bytes_field(field_number: u32, value: &[u8], out: &mut Vec) {
- encode_proto_key(field_number, 2, out);
- encode_proto_varint(value.len() as u64, out);
- out.extend_from_slice(value);
-}
-
-fn derive_deterministic_signature_bytes(
- model: &str,
- thinking: &str,
- label: &[u8],
- len: usize,
-) -> Vec {
- let mut out = Vec::with_capacity(len);
- let mut counter = 0u32;
- while out.len() < len {
- let mut hasher = Sha512::new();
- hasher.update(THINKING_SIGNATURE_DOMAIN);
- hasher.update(label);
- hasher.update([0]);
- hasher.update(model.as_bytes());
- hasher.update([0]);
- hasher.update(thinking.as_bytes());
- hasher.update(counter.to_le_bytes());
- out.extend_from_slice(&hasher.finalize());
- counter = counter.wrapping_add(1);
- }
- out.truncate(len);
- out
-}
-
-fn signature_body_target_len(thinking: &str) -> usize {
- let thinking_len = thinking.len();
- thinking_len.clamp(THINKING_SIGNATURE_BODY_MIN_LEN, THINKING_SIGNATURE_BODY_MAX_LEN)
-}
-
-// Kiro exposes summarized thinking text but not Anthropic's encrypted thinking
-// signature. Emit a deterministic protobuf envelope that matches the field
-// layout used by recent Claude Code signatures observed locally:
-// outer field-2 payload + outer field-3=1
-// inner fields 1/2/3/4/5
-// header fields 1=12, 3=2, 5=64-byte body, 6=model string, 7=0.
-// This remains synthetic and is not a cryptographically valid signature.
-pub(super) fn synthetic_thinking_signature(model: &str, thinking: &str) -> String {
- let mut header = Vec::new();
- encode_proto_varint_field(1, THINKING_SIGNATURE_HEADER_KIND, &mut header);
- encode_proto_varint_field(3, THINKING_SIGNATURE_HEADER_MODE, &mut header);
- let header_body = derive_deterministic_signature_bytes(
- model,
- thinking,
- b"header-body",
- THINKING_SIGNATURE_HEADER_BODY_LEN,
- );
- encode_proto_bytes_field(5, &header_body, &mut header);
- encode_proto_bytes_field(6, model.as_bytes(), &mut header);
- encode_proto_varint_field(7, 0, &mut header);
-
- let field_2 = derive_deterministic_signature_bytes(
- model,
- thinking,
- b"field-2",
- THINKING_SIGNATURE_HEADER_NONCE_LEN,
- );
- let field_3 = derive_deterministic_signature_bytes(
- model,
- thinking,
- b"field-3",
- THINKING_SIGNATURE_HEADER_NONCE_LEN,
- );
- let field_4 = derive_deterministic_signature_bytes(
- model,
- thinking,
- b"field-4",
- THINKING_SIGNATURE_HEADER_PROOF_LEN,
- );
- let body_len = signature_body_target_len(thinking);
- let field_5 = derive_deterministic_signature_bytes(model, thinking, b"field-5", body_len);
- let fixed_payload_len = proto_bytes_field_encoded_len(1, header.len())
- + proto_bytes_field_encoded_len(2, field_2.len())
- + proto_bytes_field_encoded_len(3, field_3.len())
- + proto_bytes_field_encoded_len(4, field_4.len())
- + proto_bytes_field_encoded_len(5, field_5.len());
-
- let mut payload = Vec::new();
- encode_proto_bytes_field(1, &header, &mut payload);
- encode_proto_bytes_field(2, &field_2, &mut payload);
- encode_proto_bytes_field(3, &field_3, &mut payload);
- encode_proto_bytes_field(4, &field_4, &mut payload);
- encode_proto_bytes_field(5, &field_5, &mut payload);
- debug_assert_eq!(payload.len(), fixed_payload_len);
-
- let mut envelope = Vec::new();
- encode_proto_bytes_field(2, &payload, &mut envelope);
- encode_proto_varint_field(3, 1, &mut envelope);
-
- STANDARD.encode(envelope)
-}
-
-pub fn build_inline_thinking_content_blocks(
- content: &str,
- model: &str,
- thinking_enabled: bool,
-) -> Vec {
- let mut blocks = Vec::new();
- for block in split_inline_thinking_content(content, thinking_enabled) {
- match block {
- InlineThinkingBlock::Thinking(thinking) => blocks.push(json!({
- "type": "thinking",
- "thinking": thinking,
- "signature": synthetic_thinking_signature(model, &thinking),
- })),
- InlineThinkingBlock::Text(text) => {
- if !text.is_empty() {
- blocks.push(json!({"type": "text", "text": text}));
- }
- },
- }
- }
- blocks
-}
-
fn canonicalize_structured_output_json(input: &str) -> String {
let value = if input.is_empty() {
json!({})
@@ -301,184 +50,6 @@ fn canonicalize_structured_output_json(input: &str) -> String {
serde_json::to_string(&value).unwrap_or_else(|_| "{}".to_string())
}
-impl BlockState {
- fn new(block_type: impl Into) -> Self {
- Self {
- block_type: block_type.into(),
- started: false,
- stopped: false,
- }
- }
-}
-
-/// Manages SSE protocol state: tracks which blocks are open, ensures
-/// proper start/delta/stop sequencing, and generates final message events.
-#[derive(Debug)]
-pub struct SseStateManager {
- message_started: bool,
- message_delta_sent: bool,
- active_blocks: HashMap,
- message_ended: bool,
- next_block_index: i32,
- stop_reason: Option,
- has_tool_use: bool,
-}
-
-impl Default for SseStateManager {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl SseStateManager {
- pub fn new() -> Self {
- Self {
- message_started: false,
- message_delta_sent: false,
- active_blocks: HashMap::new(),
- message_ended: false,
- next_block_index: 0,
- stop_reason: None,
- has_tool_use: false,
- }
- }
-
- fn is_block_open_of_type(&self, index: i32, expected_type: &str) -> bool {
- self.active_blocks.get(&index).is_some_and(|block| {
- block.started && !block.stopped && block.block_type == expected_type
- })
- }
-
- pub fn next_block_index(&mut self) -> i32 {
- let index = self.next_block_index;
- self.next_block_index += 1;
- index
- }
-
- pub fn set_has_tool_use(&mut self, has_tool_use: bool) {
- self.has_tool_use = has_tool_use;
- }
-
- pub fn set_stop_reason(&mut self, reason: impl Into) {
- self.stop_reason = Some(reason.into());
- }
-
- fn has_non_thinking_blocks(&self) -> bool {
- self.active_blocks
- .values()
- .any(|block| block.block_type != "thinking")
- }
-
- pub fn get_stop_reason(&self) -> String {
- if let Some(reason) = &self.stop_reason {
- reason.clone()
- } else if self.has_tool_use {
- "tool_use".to_string()
- } else {
- "end_turn".to_string()
- }
- }
-
- pub fn handle_message_start(&mut self, event: serde_json::Value) -> Option {
- if self.message_started {
- return None;
- }
- self.message_started = true;
- Some(SseEvent::new("message_start", event))
- }
-
- pub fn handle_content_block_start(
- &mut self,
- index: i32,
- block_type: &str,
- data: serde_json::Value,
- ) -> Vec {
- let mut events = Vec::new();
- if block_type == "tool_use" {
- self.has_tool_use = true;
- for (block_index, block) in self.active_blocks.iter_mut() {
- if block.block_type == "text" && block.started && !block.stopped {
- events.push(SseEvent::new(
- "content_block_stop",
- json!({"type":"content_block_stop","index":block_index}),
- ));
- block.stopped = true;
- }
- }
- }
- if let Some(block) = self.active_blocks.get_mut(&index) {
- if block.started {
- return events;
- }
- block.started = true;
- } else {
- let mut block = BlockState::new(block_type);
- block.started = true;
- self.active_blocks.insert(index, block);
- }
- events.push(SseEvent::new("content_block_start", data));
- events
- }
-
- pub fn handle_content_block_delta(
- &mut self,
- index: i32,
- data: serde_json::Value,
- ) -> Option {
- let block = self.active_blocks.get(&index)?;
- if !block.started || block.stopped {
- return None;
- }
- Some(SseEvent::new("content_block_delta", data))
- }
-
- pub fn handle_content_block_stop(&mut self, index: i32) -> Option {
- let block = self.active_blocks.get_mut(&index)?;
- if block.stopped {
- return None;
- }
- block.stopped = true;
- Some(SseEvent::new(
- "content_block_stop",
- json!({"type":"content_block_stop","index":index}),
- ))
- }
-
- /// Closes any still-open blocks and emits `message_delta` + `message_stop`.
- pub fn generate_final_events(
- &mut self,
- input_tokens: i32,
- output_tokens: i32,
- ) -> Vec {
- let mut events = Vec::new();
- for (index, block) in self.active_blocks.iter_mut() {
- if block.started && !block.stopped {
- events.push(SseEvent::new(
- "content_block_stop",
- json!({"type":"content_block_stop","index":index}),
- ));
- block.stopped = true;
- }
- }
- if !self.message_delta_sent {
- self.message_delta_sent = true;
- events.push(SseEvent::new(
- "message_delta",
- json!({
- "type":"message_delta",
- "delta":{"stop_reason":self.get_stop_reason(),"stop_sequence":null},
- "usage":{"input_tokens":input_tokens,"output_tokens":output_tokens}
- }),
- ));
- }
- if !self.message_ended {
- self.message_ended = true;
- events.push(SseEvent::new("message_stop", json!({"type":"message_stop"})));
- }
- events
- }
-}
-
/// Per-request streaming context that converts Kiro events into SSE events.
///
/// Handles thinking block extraction from inline `` tags,
@@ -1495,161 +1066,23 @@ fn find_char_boundary(s: &str, target: usize) -> usize {
pos
}
-// Characters that indicate a tag is inside a quoted/escaped context
-// and should not be treated as a real thinking boundary.
-const QUOTE_CHARS: &[u8] = b"`\"'\\";
-
-// Checks whether the byte at `pos` is a quote/escape character.
-fn is_quote_char(buffer: &str, pos: usize) -> bool {
- buffer
- .as_bytes()
- .get(pos)
- .map(|value| QUOTE_CHARS.contains(value))
- .unwrap_or(false)
-}
-
-// Finds `` that is not inside quotes. Skips false positives
-// where the tag is adjacent to quote characters.
-fn find_real_thinking_start_tag(buffer: &str) -> Option {
- find_real_tag(buffer, "", false)
-}
-
-// Finds `` followed by `\n\n` (mid-stream boundary).
-// Returns None if the double-newline hasn't arrived yet (partial buffer).
-fn find_real_thinking_end_tag(buffer: &str) -> Option {
- const TAG: &str = "";
- let mut search_start = 0usize;
- while let Some(pos) = buffer[search_start..].find(TAG) {
- let absolute_pos = search_start + pos;
- let after_pos = absolute_pos + TAG.len();
- if (absolute_pos > 0 && is_quote_char(buffer, absolute_pos - 1))
- || is_quote_char(buffer, after_pos)
- {
- search_start = absolute_pos + 1;
- continue;
- }
- let after_content = &buffer[after_pos..];
- if after_content.len() < 2 {
- return None;
- }
- if after_content.starts_with("\n\n") {
- return Some(absolute_pos);
- }
- search_start = absolute_pos + 1;
- }
- None
-}
-
-// Finds `` at the end of the buffer (for tool_use or final flush),
-// where the double-newline requirement is relaxed to trailing whitespace.
-fn find_real_thinking_end_tag_at_buffer_end(buffer: &str) -> Option {
- const TAG: &str = "";
- let mut search_start = 0usize;
-
- while let Some(pos) = buffer[search_start..].find(TAG) {
- let absolute_pos = search_start + pos;
- let after_pos = absolute_pos + TAG.len();
- if (absolute_pos > 0 && is_quote_char(buffer, absolute_pos - 1))
- || is_quote_char(buffer, after_pos)
- {
- search_start = absolute_pos + 1;
- continue;
- }
- if buffer[after_pos..].trim().is_empty() {
- return Some(absolute_pos);
- }
- search_start = absolute_pos + 1;
- }
-
- None
-}
-
-fn find_real_tag(buffer: &str, tag: &str, require_double_newline_after: bool) -> Option {
- let mut search_start = 0usize;
- while let Some(pos) = buffer[search_start..].find(tag) {
- let absolute_pos = search_start + pos;
- let after_pos = absolute_pos + tag.len();
- if (absolute_pos > 0 && is_quote_char(buffer, absolute_pos - 1))
- || is_quote_char(buffer, after_pos)
- {
- search_start = absolute_pos + 1;
- continue;
- }
- if require_double_newline_after {
- let after_content = &buffer[after_pos..];
- if after_content.len() < 2 {
- return None;
- }
- if !after_content.starts_with("\n\n") {
- search_start = absolute_pos + 1;
- continue;
- }
- }
- return Some(absolute_pos);
- }
- None
-}
-
-pub(super) fn split_inline_thinking_content(
- content: &str,
- thinking_enabled: bool,
-) -> Vec {
- if content.is_empty() {
- return Vec::new();
- }
- if !thinking_enabled {
- return vec![InlineThinkingBlock::Text(content.to_string())];
- }
-
- let Some(start_pos) = find_real_thinking_start_tag(content) else {
- return vec![InlineThinkingBlock::Text(content.to_string())];
- };
-
- let mut blocks = Vec::new();
- let before = &content[..start_pos];
- if !before.trim().is_empty() {
- blocks.push(InlineThinkingBlock::Text(before.to_string()));
- }
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
- let mut remaining = &content[start_pos + "".len()..];
- if remaining.starts_with('\n') {
- remaining = &remaining[1..];
- }
+ use base64::{engine::general_purpose::STANDARD, Engine as _};
+ use serde_json::json;
- let end_pos = if let Some(end_pos) = find_real_thinking_end_tag(remaining) {
- end_pos
- } else if let Some(end_pos) = find_real_thinking_end_tag_at_buffer_end(remaining) {
- end_pos
- } else {
- return vec![InlineThinkingBlock::Text(content.to_string())];
+ use super::{
+ build_inline_thinking_content_blocks, synthetic_thinking_signature, BufferedStreamContext,
+ ResponseModelIdentity, SseEvent, StreamContext,
};
-
- blocks.push(InlineThinkingBlock::Thinking(remaining[..end_pos].to_string()));
-
- let after_tag = &remaining[end_pos + "".len()..];
- let after_thinking = after_tag.strip_prefix("\n\n").unwrap_or(after_tag);
- if !after_thinking.is_empty() {
- blocks.push(InlineThinkingBlock::Text(after_thinking.to_string()));
- }
-
- blocks
-}
-
-fn strip_inline_thinking_content(content: &str) -> String {
- split_inline_thinking_content(content, true)
- .into_iter()
- .filter_map(|block| match block {
- InlineThinkingBlock::Text(text) => Some(text),
- InlineThinkingBlock::Thinking(_) => None,
- })
- .collect::>()
- .join("")
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
use crate::{
+ anthropic::stream::signature::{
+ THINKING_SIGNATURE_BODY_MIN_LEN, THINKING_SIGNATURE_HEADER_BODY_LEN,
+ THINKING_SIGNATURE_HEADER_KIND, THINKING_SIGNATURE_HEADER_MODE,
+ THINKING_SIGNATURE_HEADER_NONCE_LEN, THINKING_SIGNATURE_HEADER_PROOF_LEN,
+ },
parser::{
frame::Frame,
header::{HeaderValue, Headers},
@@ -1657,64 +1090,6 @@ mod tests {
wire::{ContextUsageEvent, Event, MeteringEvent, ToolUseEvent},
};
- #[test]
- fn resolve_input_tokens_prefers_request_estimate_for_small_requests() {
- let (input_tokens, source) = resolve_input_tokens(18, Some(4_118));
-
- assert_eq!(input_tokens, 18);
- assert_eq!(source, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
- #[test]
- fn resolve_input_tokens_prefers_request_estimate_for_inflated_small_context_usage() {
- let (input_tokens, source) = resolve_input_tokens(148, Some(8_008));
-
- assert_eq!(input_tokens, 148);
- assert_eq!(source, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
- #[test]
- fn resolve_input_tokens_prefers_request_estimate_for_small_request_when_context_exceeds_local()
- {
- let (input_tokens, source) = resolve_input_tokens(1_000, Some(6_000));
-
- assert_eq!(input_tokens, 1_000);
- assert_eq!(source, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
- #[test]
- fn resolve_input_tokens_uses_context_usage_above_default_threshold() {
- let (input_tokens, source) = resolve_input_tokens(16_000, Some(20_000));
-
- assert_eq!(input_tokens, 20_000);
- assert_eq!(source, KiroInputTokenSource::UpstreamContextUsage);
- }
-
- #[test]
- fn resolve_input_tokens_respects_configured_threshold() {
- let (input_tokens, source) =
- resolve_input_tokens_with_threshold(16_000, Some(20_000), 50_000);
-
- assert_eq!(input_tokens, 16_000);
- assert_eq!(source, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
- #[test]
- fn resolve_input_tokens_keeps_upstream_context_for_large_requests() {
- let (input_tokens, source) = resolve_input_tokens(60_000, Some(90_000));
-
- assert_eq!(input_tokens, 90_000);
- assert_eq!(source, KiroInputTokenSource::UpstreamContextUsage);
- }
-
- #[test]
- fn resolve_input_tokens_falls_back_to_local_request_without_context_usage() {
- let (input_tokens, source) = resolve_input_tokens(123, None);
-
- assert_eq!(input_tokens, 123);
- assert_eq!(source, KiroInputTokenSource::LocalRequestEstimateFallback);
- }
-
fn collect_delta_text(events: &[SseEvent], delta_type: &str, field: &str) -> String {
events
.iter()
@@ -1847,28 +1222,6 @@ mod tests {
);
}
- #[test]
- fn sse_event_format_is_valid() {
- let event = SseEvent::new("message_start", json!({"type": "message_start"}));
- let sse = event.to_sse_string();
- assert!(sse.starts_with("event: message_start\n"));
- assert!(sse.contains("data: "));
- assert!(sse.ends_with("\n\n"));
- }
-
- #[test]
- fn split_inline_thinking_content_extracts_non_stream_blocks() {
- let blocks = split_inline_thinking_content(
- "\nCount carefully.\n\n\nbeta",
- true,
- );
-
- assert_eq!(blocks, vec![
- InlineThinkingBlock::Thinking("Count carefully.\n".to_string()),
- InlineThinkingBlock::Text("beta".to_string()),
- ]);
- }
-
#[test]
fn build_inline_thinking_content_blocks_attach_signature() {
let blocks = build_inline_thinking_content_blocks(
diff --git a/llm-access-kiro/src/anthropic/stream/inline_thinking.rs b/llm-access-kiro/src/anthropic/stream/inline_thinking.rs
new file mode 100644
index 0000000..d85fa4d
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/inline_thinking.rs
@@ -0,0 +1,208 @@
+//! Inline `` block extraction.
+//!
+//! Splits assistant content containing inline `...` spans
+//! into thinking/text segments (quote-aware so escaped tags are ignored), and
+//! builds Anthropic content blocks with synthetic signatures attached.
+
+use serde_json::json;
+
+use super::signature::synthetic_thinking_signature;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum InlineThinkingBlock { // one segment produced when assistant content is split
+ Thinking(String), // text captured from inside the inline thinking span
+ Text(String), // ordinary text found before or after the thinking span
+}
+
+/// Converts assistant `content` into Anthropic content-block JSON values.
+///
+/// The content is split into thinking/text segments by
+/// `split_inline_thinking_content`. Each thinking segment becomes a
+/// `{"type": "thinking", ...}` block whose `signature` comes from
+/// `synthetic_thinking_signature(model, ..)`; each non-empty text segment
+/// becomes a `{"type": "text", ...}` block. Empty text segments are dropped.
+pub fn build_inline_thinking_content_blocks(
+    content: &str,
+    model: &str,
+    thinking_enabled: bool,
+) -> Vec<serde_json::Value> {
+    let mut blocks = Vec::new();
+    for block in split_inline_thinking_content(content, thinking_enabled) {
+        match block {
+            InlineThinkingBlock::Thinking(thinking) => blocks.push(json!({
+                "type": "thinking",
+                "thinking": thinking,
+                "signature": synthetic_thinking_signature(model, &thinking),
+            })),
+            InlineThinkingBlock::Text(text) => {
+                if !text.is_empty() {
+                    blocks.push(json!({"type": "text", "text": text}));
+                }
+            },
+        }
+    }
+    blocks
+}
+
+// Bytes that mark a tag as sitting in a quoted or escaped context; a tag
+// touching one of them is not treated as a real thinking boundary.
+const QUOTE_CHARS: &[u8] = b"`\"'\\";
+
+// Reports whether `buffer` holds a quote/escape byte at offset `pos`.
+// Offsets past the end of the buffer yield `false`.
+fn is_quote_char(buffer: &str, pos: usize) -> bool {
+    buffer
+        .as_bytes()
+        .get(pos)
+        .is_some_and(|byte| QUOTE_CHARS.contains(byte))
+}
+
+/// Finds the first `<thinking>` opening tag that is not inside quotes.
+///
+/// Occurrences directly preceded or followed by a quote/escape character are
+/// skipped as false positives. Returns the byte offset of the tag, or `None`
+/// when no real tag is present.
+pub fn find_real_thinking_start_tag(buffer: &str) -> Option<usize> {
+    find_real_tag(buffer, "<thinking>", false)
+}
+
+/// Finds a `</thinking>` closing tag that is immediately followed by `\n\n`
+/// (the mid-stream boundary between thinking and regular text).
+///
+/// Occurrences adjacent to a quote/escape character are skipped, as are
+/// occurrences followed by something other than a double newline. Returns the
+/// byte offset of the tag, or `None` when no such tag exists or when the scan
+/// reaches an unquoted tag with fewer than two bytes after it, i.e. the
+/// double newline may not have arrived yet (partial buffer).
+pub fn find_real_thinking_end_tag(buffer: &str) -> Option<usize> {
+    // `find_real_tag` with the double-newline requirement performs exactly
+    // this scan, so reuse it instead of keeping a second copy of the loop.
+    find_real_tag(buffer, "</thinking>", true)
+}
+
+/// Finds a `</thinking>` closing tag at the end of the buffer (for tool_use
+/// or final flush), where the double-newline requirement is relaxed: only
+/// whitespace may follow the tag.
+///
+/// Occurrences adjacent to a quote/escape character are skipped. Returns the
+/// byte offset of the tag, or `None` if no candidate qualifies.
+pub fn find_real_thinking_end_tag_at_buffer_end(buffer: &str) -> Option<usize> {
+    const TAG: &str = "</thinking>";
+    let mut search_start = 0usize;
+
+    while let Some(pos) = buffer[search_start..].find(TAG) {
+        let absolute_pos = search_start + pos;
+        let after_pos = absolute_pos + TAG.len();
+        if (absolute_pos > 0 && is_quote_char(buffer, absolute_pos - 1))
+            || is_quote_char(buffer, after_pos)
+        {
+            // Quoted/escaped mention of the tag: resume just past its first byte.
+            search_start = absolute_pos + 1;
+            continue;
+        }
+        if buffer[after_pos..].trim().is_empty() {
+            return Some(absolute_pos);
+        }
+        search_start = absolute_pos + 1;
+    }
+
+    None
+}
+
+// Scans `buffer` for the first occurrence of `tag` that is not adjacent to a
+// quote/escape byte and returns its byte offset.
+//
+// With `require_double_newline_after` set, a candidate only counts when it is
+// directly followed by "\n\n"; if fewer than two bytes follow an unquoted
+// candidate the scan stops with `None`, because the rest of the boundary may
+// not have been received yet.
+fn find_real_tag(buffer: &str, tag: &str, require_double_newline_after: bool) -> Option<usize> {
+    let mut search_start = 0usize;
+    while let Some(pos) = buffer[search_start..].find(tag) {
+        let absolute_pos = search_start + pos;
+        let after_pos = absolute_pos + tag.len();
+        if (absolute_pos > 0 && is_quote_char(buffer, absolute_pos - 1))
+            || is_quote_char(buffer, after_pos)
+        {
+            // Quoted/escaped mention of the tag: resume just past its first byte.
+            search_start = absolute_pos + 1;
+            continue;
+        }
+        if require_double_newline_after {
+            let after_content = &buffer[after_pos..];
+            if after_content.len() < 2 {
+                return None;
+            }
+            if !after_content.starts_with("\n\n") {
+                search_start = absolute_pos + 1;
+                continue;
+            }
+        }
+        return Some(absolute_pos);
+    }
+    None
+}
+
+// Splits `content` into at most one thinking segment plus the text around it.
+//
+// Returns an empty list for empty content. When thinking is disabled, no real
+// `<thinking>` tag is found, or the tag is never closed, the whole content is
+// returned unchanged as a single text segment.
+fn split_inline_thinking_content(
+    content: &str,
+    thinking_enabled: bool,
+) -> Vec<InlineThinkingBlock> {
+    const START_TAG: &str = "<thinking>";
+    const END_TAG: &str = "</thinking>";
+
+    if content.is_empty() {
+        return Vec::new();
+    }
+    if !thinking_enabled {
+        return vec![InlineThinkingBlock::Text(content.to_string())];
+    }
+
+    let Some(start_pos) = find_real_thinking_start_tag(content) else {
+        return vec![InlineThinkingBlock::Text(content.to_string())];
+    };
+
+    let mut blocks = Vec::new();
+    // Text ahead of the opening tag is kept only when it is more than whitespace.
+    let before = &content[..start_pos];
+    if !before.trim().is_empty() {
+        blocks.push(InlineThinkingBlock::Text(before.to_string()));
+    }
+
+    // Skip the opening tag and a single newline directly after it.
+    let after_start = &content[start_pos + START_TAG.len()..];
+    let remaining = after_start.strip_prefix('\n').unwrap_or(after_start);
+
+    // Prefer the mid-stream boundary (closing tag + "\n\n"); otherwise accept
+    // a closing tag followed only by whitespace.
+    let Some(end_pos) = find_real_thinking_end_tag(remaining)
+        .or_else(|| find_real_thinking_end_tag_at_buffer_end(remaining))
+    else {
+        return vec![InlineThinkingBlock::Text(content.to_string())];
+    };
+
+    blocks.push(InlineThinkingBlock::Thinking(remaining[..end_pos].to_string()));
+
+    // Drop the closing tag and the double newline that separates it from text.
+    let after_tag = &remaining[end_pos + END_TAG.len()..];
+    let after_thinking = after_tag.strip_prefix("\n\n").unwrap_or(after_tag);
+    if !after_thinking.is_empty() {
+        blocks.push(InlineThinkingBlock::Text(after_thinking.to_string()));
+    }
+
+    blocks
+}
+
+/// Returns `content` with any inline `<thinking>` span removed, keeping only
+/// the surrounding text segments concatenated in order.
+///
+/// Splitting always runs with thinking enabled, so a recognized span is
+/// dropped regardless of the request's thinking setting.
+pub fn strip_inline_thinking_content(content: &str) -> String {
+    split_inline_thinking_content(content, true)
+        .into_iter()
+        .filter_map(|block| match block {
+            InlineThinkingBlock::Text(text) => Some(text),
+            InlineThinkingBlock::Thinking(_) => None,
+        })
+        .collect::<String>()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{split_inline_thinking_content, InlineThinkingBlock};
+
+    // A complete (non-streamed) message: the newline after the opening tag
+    // and the double newline after the closing tag are both consumed.
+    #[test]
+    fn split_inline_thinking_content_extracts_non_stream_blocks() {
+        let blocks = split_inline_thinking_content(
+            "<thinking>\nCount carefully.\n</thinking>\n\nbeta",
+            true,
+        );
+
+        assert_eq!(blocks, vec![
+            InlineThinkingBlock::Thinking("Count carefully.\n".to_string()),
+            InlineThinkingBlock::Text("beta".to_string()),
+        ]);
+    }
+}
diff --git a/llm-access-kiro/src/anthropic/stream/mod.rs b/llm-access-kiro/src/anthropic/stream/mod.rs
new file mode 100644
index 0000000..4862f8b
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/mod.rs
@@ -0,0 +1,30 @@
+//! Anthropic-compatible SSE stream adapter.
+//!
+//! Converts Kiro upstream binary events into Anthropic-compatible SSE events,
+//! split into focused submodules:
+//!
+//! ```text
+//! Kiro Event
+//! -> [usage] input-token resolution + usage JSON
+//! -> [sse_event] SSE wire value
+//! -> [signature] synthetic thinking signatures
+//! -> [inline_thinking] tag extraction
+//! -> [state] SSE block state machine
+//! -> [context] StreamContext / BufferedStreamContext drivers
+//! ```
+
+mod context;
+mod inline_thinking;
+mod signature;
+mod sse_event;
+mod state;
+mod usage;
+
+pub use context::{BufferedStreamContext, StreamContext};
+pub use inline_thinking::build_inline_thinking_content_blocks;
+pub use sse_event::SseEvent;
+pub use state::SseStateManager;
+pub use usage::{
+ anthropic_usage_json, resolve_input_tokens, resolve_input_tokens_with_threshold,
+ KiroInputTokenSource, KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS,
+};
diff --git a/llm-access-kiro/src/anthropic/stream/signature.rs b/llm-access-kiro/src/anthropic/stream/signature.rs
new file mode 100644
index 0000000..10f0258
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/signature.rs
@@ -0,0 +1,161 @@
+//! Generation of synthetic Anthropic thinking signatures.
+//!
+//! Kiro exposes summarized thinking text but not Anthropic's encrypted
+//! signature. This module emits a deterministic protobuf envelope matching the
+//! observed Claude Code field layout. It is synthetic, not cryptographically
+//! valid.
+
+use base64::{engine::general_purpose::STANDARD, Engine as _};
+use sha2::{Digest, Sha512};
+
+/// Fixed byte prefix hashed first in every call to
+/// `derive_deterministic_signature_bytes`, ahead of the label, model and
+/// thinking text.
+const THINKING_SIGNATURE_DOMAIN: &[u8] =
+ b"staticflow-kiro-anthropic-thinking-signature-anthropic-shape-v6\0";
+/// Protobuf header field-1 value identifying the signature kind.
+pub const THINKING_SIGNATURE_HEADER_KIND: u64 = 12;
+/// Protobuf header field-3 value identifying the signature mode.
+pub const THINKING_SIGNATURE_HEADER_MODE: u64 = 2;
+/// Byte length of the header field-5 body block.
+pub const THINKING_SIGNATURE_HEADER_BODY_LEN: usize = 64;
+/// Byte length of the inner nonce fields (2 and 3).
+pub const THINKING_SIGNATURE_HEADER_NONCE_LEN: usize = 12;
+/// Byte length of the inner proof field (4).
+pub const THINKING_SIGNATURE_HEADER_PROOF_LEN: usize = 48;
+/// Minimum byte length of the inner signature body field (5).
+pub const THINKING_SIGNATURE_BODY_MIN_LEN: usize = 619;
+/// Maximum byte length of the inner signature body field (5); the target
+/// length derived from the thinking text is clamped to this upper bound.
+const THINKING_SIGNATURE_BODY_MAX_LEN: usize = 8_192;
+
+/// Appends `value` to `out` as a protobuf base-128 varint.
+///
+/// Seven bits are written per byte, least-significant group first. Every byte
+/// except the last has its high bit set as a continuation flag, and zero is
+/// encoded as the single byte `0x00`.
+fn encode_proto_varint(mut value: u64, out: &mut Vec<u8>) {
+    while value >= 0x80 {
+        // Low seven bits plus the continuation flag.
+        out.push((value as u8 & 0x7f) | 0x80);
+        value >>= 7;
+    }
+    // `value` now fits in seven bits, so the cast is lossless.
+    out.push(value as u8);
+}
+
+/// Appends a protobuf field key to `out`.
+///
+/// The key is `(field_number << 3) | wire_type`, written as a varint.
+fn encode_proto_key(field_number: u32, wire_type: u8, out: &mut Vec<u8>) {
+    let key = (u64::from(field_number) << 3) | u64::from(wire_type);
+    encode_proto_varint(key, out);
+}
+
+/// Returns how many bytes the varint encoding of `value` occupies.
+///
+/// Each encoded byte carries seven payload bits, and zero still takes one
+/// byte.
+fn proto_varint_len(value: usize) -> usize {
+    let significant_bits = (usize::BITS - value.leading_zeros()).max(1);
+    // Ceiling division by the seven payload bits carried per byte.
+    ((significant_bits + 6) / 7) as usize
+}
+
+/// Returns the encoded size of a length-delimited (wire type 2) field:
+/// key varint + length varint + `content_len` payload bytes.
+fn proto_bytes_field_encoded_len(field_number: u32, content_len: usize) -> usize {
+    const WIRE_TYPE_LENGTH_DELIMITED: usize = 2;
+
+    let key = ((field_number as usize) << 3) | WIRE_TYPE_LENGTH_DELIMITED;
+    let key_len = proto_varint_len(key);
+    let length_prefix_len = proto_varint_len(content_len);
+    key_len + length_prefix_len + content_len
+}
+
+/// Appends a varint field (wire type 0) to `out`: the field key followed by
+/// `value` as a varint.
+fn encode_proto_varint_field(field_number: u32, value: u64, out: &mut Vec<u8>) {
+    encode_proto_key(field_number, 0, out);
+    encode_proto_varint(value, out);
+}
+
+/// Appends a length-delimited field (wire type 2) to `out`: the field key,
+/// the byte length of `value` as a varint, then the bytes of `value`.
+fn encode_proto_bytes_field(field_number: u32, value: &[u8], out: &mut Vec<u8>) {
+    encode_proto_key(field_number, 2, out);
+    encode_proto_varint(value.len() as u64, out);
+    out.extend_from_slice(value);
+}
+
+/// Derives `len` deterministic bytes from `label`, `model` and `thinking`.
+///
+/// The output is the concatenation of SHA-512 digests, truncated to `len`.
+/// Each digest covers
+/// `THINKING_SIGNATURE_DOMAIN || label || 0x00 || model || 0x00 || thinking || counter`,
+/// where `counter` is a little-endian `u32` that starts at zero and is
+/// incremented for each further digest. Identical inputs always yield
+/// identical bytes; `len == 0` yields an empty vector.
+fn derive_deterministic_signature_bytes(
+    model: &str,
+    thinking: &str,
+    label: &[u8],
+    len: usize,
+) -> Vec<u8> {
+    let mut out = Vec::with_capacity(len);
+    let mut counter = 0u32;
+    while out.len() < len {
+        let mut hasher = Sha512::new();
+        hasher.update(THINKING_SIGNATURE_DOMAIN);
+        hasher.update(label);
+        // Zero bytes separate the variable-length inputs.
+        hasher.update([0]);
+        hasher.update(model.as_bytes());
+        hasher.update([0]);
+        hasher.update(thinking.as_bytes());
+        hasher.update(counter.to_le_bytes());
+        out.extend_from_slice(&hasher.finalize());
+        counter = counter.wrapping_add(1);
+    }
+    // The last digest usually overshoots; cut back to the requested length.
+    out.truncate(len);
+    out
+}
+
+/// Picks the byte length of inner field 5 from the byte length of `thinking`,
+/// bounded below by `THINKING_SIGNATURE_BODY_MIN_LEN` and above by
+/// `THINKING_SIGNATURE_BODY_MAX_LEN`.
+fn signature_body_target_len(thinking: &str) -> usize {
+    thinking
+        .len()
+        .max(THINKING_SIGNATURE_BODY_MIN_LEN)
+        .min(THINKING_SIGNATURE_BODY_MAX_LEN)
+}
+
+/// Builds a base64-encoded, deterministic protobuf envelope for a thinking
+/// block, following the field layout of recent Claude Code signatures observed
+/// locally.
+///
+/// Layout:
+/// - outer: field 2 = inner payload, field 3 = 1;
+/// - inner: fields 1 (header), 2, 3, 4, 5, all length-delimited;
+/// - header: 1 = 12, 3 = 2, 5 = 64-byte body, 6 = model string, 7 = 0.
+///
+/// Kiro exposes summarized thinking text but not Anthropic's encrypted
+/// signature, so the result is synthetic and is not a cryptographically valid
+/// signature. The same `model` and `thinking` always produce the same string.
+pub fn synthetic_thinking_signature(model: &str, thinking: &str) -> String {
+    // Every derived byte string is keyed on the same (model, thinking) pair and
+    // differs only by label and length.
+    let derive = |label: &[u8], len: usize| {
+        derive_deterministic_signature_bytes(model, thinking, label, len)
+    };
+
+    // Header message.
+    let mut header = Vec::new();
+    encode_proto_varint_field(1, THINKING_SIGNATURE_HEADER_KIND, &mut header);
+    encode_proto_varint_field(3, THINKING_SIGNATURE_HEADER_MODE, &mut header);
+    let header_body = derive(b"header-body", THINKING_SIGNATURE_HEADER_BODY_LEN);
+    encode_proto_bytes_field(5, &header_body, &mut header);
+    encode_proto_bytes_field(6, model.as_bytes(), &mut header);
+    encode_proto_varint_field(7, 0, &mut header);
+
+    // Inner payload fields 2..=5.
+    let nonce_a = derive(b"field-2", THINKING_SIGNATURE_HEADER_NONCE_LEN);
+    let nonce_b = derive(b"field-3", THINKING_SIGNATURE_HEADER_NONCE_LEN);
+    let proof = derive(b"field-4", THINKING_SIGNATURE_HEADER_PROOF_LEN);
+    let body = derive(b"field-5", signature_body_target_len(thinking));
+
+    let inner_fields: [(u32, &[u8]); 5] = [
+        (1, header.as_slice()),
+        (2, nonce_a.as_slice()),
+        (3, nonce_b.as_slice()),
+        (4, proof.as_slice()),
+        (5, body.as_slice()),
+    ];
+
+    // Size predicted up front; used only to cross-check the encoder in debug
+    // builds.
+    let expected_payload_len: usize = inner_fields
+        .iter()
+        .map(|(field_number, bytes)| proto_bytes_field_encoded_len(*field_number, bytes.len()))
+        .sum();
+
+    let mut payload = Vec::new();
+    for (field_number, bytes) in inner_fields {
+        encode_proto_bytes_field(field_number, bytes, &mut payload);
+    }
+    debug_assert_eq!(payload.len(), expected_payload_len);
+
+    // Outer envelope.
+    let mut envelope = Vec::new();
+    encode_proto_bytes_field(2, &payload, &mut envelope);
+    encode_proto_varint_field(3, 1, &mut envelope);
+
+    STANDARD.encode(envelope)
+}
diff --git a/llm-access-kiro/src/anthropic/stream/sse_event.rs b/llm-access-kiro/src/anthropic/stream/sse_event.rs
new file mode 100644
index 0000000..2607f47
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/sse_event.rs
@@ -0,0 +1,43 @@
+//! Anthropic-compatible Server-Sent Event value type.
+
+/// A single Server-Sent Event with an event type and JSON data payload.
+#[derive(Debug, Clone)]
+pub struct SseEvent {
+ /// Event name, written after `event: ` by `to_sse_string`.
+ pub event: String,
+ /// JSON payload, serialized after `data: ` by `to_sse_string`.
+ pub data: serde_json::Value,
+}
+
+impl SseEvent {
+    /// Creates an event named `event` carrying the JSON payload `data`.
+    pub fn new(event: impl Into<String>, data: serde_json::Value) -> Self {
+        Self {
+            event: event.into(),
+            data,
+        }
+    }
+
+    /// Formats this event as a standard SSE text frame (`event: ...\ndata:
+    /// ...\n\n`).
+    ///
+    /// If the payload fails to serialize, the `data:` line is left empty
+    /// rather than the call failing.
+    pub fn to_sse_string(&self) -> String {
+        format!(
+            "event: {}\ndata: {}\n\n",
+            self.event,
+            serde_json::to_string(&self.data).unwrap_or_default()
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use super::SseEvent;
+
+    /// The rendered frame opens with the event line, contains a data line and
+    /// ends with the blank-line terminator.
+    #[test]
+    fn sse_event_format_is_valid() {
+        let frame =
+            SseEvent::new("message_start", json!({"type": "message_start"})).to_sse_string();
+
+        assert!(frame.starts_with("event: message_start\n"));
+        assert!(frame.contains("data: "));
+        assert!(frame.ends_with("\n\n"));
+    }
+}
diff --git a/llm-access-kiro/src/anthropic/stream/state.rs b/llm-access-kiro/src/anthropic/stream/state.rs
new file mode 100644
index 0000000..834cf69
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/state.rs
@@ -0,0 +1,198 @@
+//! SSE protocol state machine.
+//!
+//! Tracks open content blocks and enforces start/delta/stop sequencing, then
+//! emits the closing `message_delta` + `message_stop` events.
+
+use std::collections::HashMap;
+
+use serde_json::json;
+
+use super::sse_event::SseEvent;
+
+/// Tracks the lifecycle of a single content block (text, thinking, tool_use).
+#[derive(Debug, Clone)]
+struct BlockState {
+ /// Block kind as passed to `handle_content_block_start`; compared against
+ /// "text", "thinking" and caller-supplied type names.
+ block_type: String,
+ /// Set once a `content_block_start` has been accepted for this block.
+ started: bool,
+ /// Set once a `content_block_stop` has been emitted for this block.
+ stopped: bool,
+}
+
+impl BlockState {
+    /// Creates a block of the given type that is neither started nor stopped.
+    fn new(block_type: impl Into<String>) -> Self {
+        Self {
+            block_type: block_type.into(),
+            started: false,
+            stopped: false,
+        }
+    }
+}
+
+/// Manages SSE protocol state: tracks which blocks are open, ensures
+/// proper start/delta/stop sequencing, and generates final message events.
+#[derive(Debug)]
+pub struct SseStateManager {
+    /// Set once `message_start` has been emitted; later starts are ignored.
+    message_started: bool,
+    /// Set once the closing `message_delta` has been emitted.
+    message_delta_sent: bool,
+    /// Every block seen so far, keyed by content-block index. Entries are
+    /// marked stopped rather than removed.
+    active_blocks: HashMap<i32, BlockState>,
+    /// Set once `message_stop` has been emitted.
+    message_ended: bool,
+    /// Next index handed out by `next_block_index`.
+    next_block_index: i32,
+    /// Explicit stop reason; when `None` it is derived from `has_tool_use`.
+    stop_reason: Option<String>,
+    /// Whether a `tool_use` block was started or the flag was set explicitly.
+    has_tool_use: bool,
+}
+
+impl Default for SseStateManager {
+    /// Equivalent to [`SseStateManager::new`].
+    fn default() -> Self {
+        SseStateManager::new()
+    }
+}
+
+impl SseStateManager {
+    /// Creates a manager with no message started and no blocks tracked.
+    pub fn new() -> Self {
+        Self {
+            message_started: false,
+            message_delta_sent: false,
+            active_blocks: HashMap::new(),
+            message_ended: false,
+            next_block_index: 0,
+            stop_reason: None,
+            has_tool_use: false,
+        }
+    }
+
+    /// Whether the block at `index` is currently open and of `expected_type`.
+    pub fn is_block_open_of_type(&self, index: i32, expected_type: &str) -> bool {
+        self.active_blocks.get(&index).is_some_and(|block| {
+            block.started && !block.stopped && block.block_type == expected_type
+        })
+    }
+
+    /// Returns the next unused content-block index and advances the counter.
+    pub fn next_block_index(&mut self) -> i32 {
+        let index = self.next_block_index;
+        self.next_block_index += 1;
+        index
+    }
+
+    /// Overrides the flag that makes the default stop reason `tool_use`.
+    pub fn set_has_tool_use(&mut self, has_tool_use: bool) {
+        self.has_tool_use = has_tool_use;
+    }
+
+    /// Sets an explicit stop reason, which takes precedence over the derived
+    /// one in `get_stop_reason`.
+    pub fn set_stop_reason(&mut self, reason: impl Into<String>) {
+        self.stop_reason = Some(reason.into());
+    }
+
+    /// Whether any tracked block is something other than `thinking`.
+    ///
+    /// Blocks stay in the map after they are stopped, so this covers every
+    /// block started so far, not only the ones that are still open.
+    pub fn has_non_thinking_blocks(&self) -> bool {
+        self.active_blocks
+            .values()
+            .any(|block| block.block_type != "thinking")
+    }
+
+    /// Returns the explicit stop reason if one was set; otherwise `tool_use`
+    /// when a tool-use block was seen, or `end_turn`.
+    pub fn get_stop_reason(&self) -> String {
+        if let Some(reason) = &self.stop_reason {
+            reason.clone()
+        } else if self.has_tool_use {
+            "tool_use".to_string()
+        } else {
+            "end_turn".to_string()
+        }
+    }
+
+    /// Emits `message_start` with payload `event` the first time it is called
+    /// and returns `None` on every later call.
+    pub fn handle_message_start(&mut self, event: serde_json::Value) -> Option<SseEvent> {
+        if self.message_started {
+            return None;
+        }
+        self.message_started = true;
+        Some(SseEvent::new("message_start", event))
+    }
+
+    /// Opens the block at `index` and returns the events to send.
+    ///
+    /// Starting a `tool_use` block first closes every open `text` block, in
+    /// ascending index order, and records that a tool was used. If the block
+    /// at `index` was already started, no `content_block_start` is emitted and
+    /// only the close events (if any) are returned.
+    pub fn handle_content_block_start(
+        &mut self,
+        index: i32,
+        block_type: &str,
+        data: serde_json::Value,
+    ) -> Vec<SseEvent> {
+        let mut events = Vec::new();
+        if block_type == "tool_use" {
+            self.has_tool_use = true;
+            events = self.close_open_blocks(Some("text"));
+        }
+        if let Some(block) = self.active_blocks.get_mut(&index) {
+            if block.started {
+                return events;
+            }
+            block.started = true;
+        } else {
+            let mut block = BlockState::new(block_type);
+            block.started = true;
+            self.active_blocks.insert(index, block);
+        }
+        events.push(SseEvent::new("content_block_start", data));
+        events
+    }
+
+    /// Emits a `content_block_delta` carrying `data` if the block at `index`
+    /// is open; returns `None` for unknown, unstarted or stopped blocks.
+    pub fn handle_content_block_delta(
+        &mut self,
+        index: i32,
+        data: serde_json::Value,
+    ) -> Option<SseEvent> {
+        let block = self.active_blocks.get(&index)?;
+        if !block.started || block.stopped {
+            return None;
+        }
+        Some(SseEvent::new("content_block_delta", data))
+    }
+
+    /// Marks the block at `index` stopped and emits `content_block_stop`;
+    /// returns `None` if the block is unknown or already stopped.
+    pub fn handle_content_block_stop(&mut self, index: i32) -> Option<SseEvent> {
+        let block = self.active_blocks.get_mut(&index)?;
+        if block.stopped {
+            return None;
+        }
+        block.stopped = true;
+        Some(SseEvent::new(
+            "content_block_stop",
+            json!({"type":"content_block_stop","index":index}),
+        ))
+    }
+
+    /// Closes any still-open blocks and emits `message_delta` + `message_stop`.
+    ///
+    /// Open blocks are closed in ascending index order. `message_delta` and
+    /// `message_stop` are each emitted at most once across repeated calls.
+    pub fn generate_final_events(
+        &mut self,
+        input_tokens: i32,
+        output_tokens: i32,
+    ) -> Vec<SseEvent> {
+        let mut events = self.close_open_blocks(None);
+        if !self.message_delta_sent {
+            self.message_delta_sent = true;
+            events.push(SseEvent::new(
+                "message_delta",
+                json!({
+                    "type":"message_delta",
+                    "delta":{"stop_reason":self.get_stop_reason(),"stop_sequence":null},
+                    "usage":{"input_tokens":input_tokens,"output_tokens":output_tokens}
+                }),
+            ));
+        }
+        if !self.message_ended {
+            self.message_ended = true;
+            events.push(SseEvent::new("message_stop", json!({"type":"message_stop"})));
+        }
+        events
+    }
+
+    /// Marks every open block stopped and returns one `content_block_stop`
+    /// event per block, restricted to `only_type` when it is `Some`.
+    ///
+    /// `HashMap` iteration order is arbitrary, so the indices are sorted first
+    /// to keep the emitted event order stable from run to run.
+    fn close_open_blocks(&mut self, only_type: Option<&str>) -> Vec<SseEvent> {
+        let mut open_indices: Vec<i32> = self
+            .active_blocks
+            .iter()
+            .filter(|(_, block)| {
+                block.started
+                    && !block.stopped
+                    && only_type.map_or(true, |wanted| block.block_type == wanted)
+            })
+            .map(|(&index, _)| index)
+            .collect();
+        open_indices.sort_unstable();
+
+        open_indices
+            .into_iter()
+            .map(|index| {
+                if let Some(block) = self.active_blocks.get_mut(&index) {
+                    block.stopped = true;
+                }
+                SseEvent::new(
+                    "content_block_stop",
+                    json!({"type":"content_block_stop","index":index}),
+                )
+            })
+            .collect()
+    }
+}
diff --git a/llm-access-kiro/src/anthropic/stream/usage.rs b/llm-access-kiro/src/anthropic/stream/usage.rs
new file mode 100644
index 0000000..6fa2652
--- /dev/null
+++ b/llm-access-kiro/src/anthropic/stream/usage.rs
@@ -0,0 +1,131 @@
+//! Input-token accounting for the Anthropic-compatible Kiro endpoint.
+//!
+//! Resolves the user-visible input-token count from the local request estimate
+//! versus Kiro's upstream contextUsage feedback, and shapes the Anthropic
+//! `usage` JSON object.
+
+use llm_access_core::store::DEFAULT_KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS;
+use serde_json::json;
+
+/// Identifies which figure `resolve_input_tokens_with_threshold` reported as
+/// the input-token count.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum KiroInputTokenSource {
+ /// The positive upstream contextUsage value was used.
+ UpstreamContextUsage,
+ /// The local request estimate was used: either the request was at or below
+ /// the threshold, or no positive upstream value was available.
+ LocalRequestEstimateFallback,
+}
+
+/// Kiro reports bridge/system prompt scaffolding inside contextUsage. For
+/// small client requests the request-side estimate is the user-visible source
+/// of truth; contextUsage remains useful for large context-window requests.
+///
+/// `resolve_input_tokens` passes this as the threshold: requests whose local
+/// estimate is at or below it always report the local estimate.
+pub const KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS: u64 =
+ DEFAULT_KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS;
+
+/// Shapes the Anthropic `usage` JSON object from a total input-token count.
+///
+/// Negative arguments are treated as zero and `cache_read_input_tokens` is
+/// capped at the total. The non-cached remainder is reported as
+/// `input_tokens`, except that when nothing was read from cache half of it
+/// (rounded down) is reported as `cache_creation_input_tokens` instead.
+pub fn anthropic_usage_json(
+    input_tokens_total: i32,
+    output_tokens: i32,
+    cache_read_input_tokens: i32,
+) -> serde_json::Value {
+    let total = input_tokens_total.max(0);
+    let cache_read = cache_read_input_tokens.clamp(0, total);
+    // `cache_read` lies within `0..=total`, so this cannot underflow.
+    let uncached = total - cache_read;
+    let cache_creation = match cache_read {
+        0 => uncached / 2,
+        _ => 0,
+    };
+    let input_tokens = uncached - cache_creation;
+
+    json!({
+        "input_tokens": input_tokens,
+        "output_tokens": output_tokens.max(0),
+        "cache_creation_input_tokens": cache_creation,
+        "cache_read_input_tokens": cache_read,
+    })
+}
+
+/// Resolves the reported input-token count using the default threshold
+/// `KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS`.
+///
+/// Returns the chosen count together with the source it came from; see
+/// `resolve_input_tokens_with_threshold` for the selection rules.
+pub fn resolve_input_tokens(
+    request_input_tokens: i32,
+    context_input_tokens: Option<i32>,
+) -> (i32, KiroInputTokenSource) {
+    resolve_input_tokens_with_threshold(
+        request_input_tokens,
+        context_input_tokens,
+        KIRO_CONTEXT_USAGE_MIN_REQUEST_TOKENS,
+    )
+}
+
+/// Chooses between the local request estimate and upstream contextUsage.
+///
+/// - A negative `request_input_tokens` is treated as zero.
+/// - If the request estimate is at or below
+///   `context_usage_min_request_tokens`, the estimate is returned.
+/// - Otherwise a positive `context_input_tokens` is returned as
+///   `UpstreamContextUsage`.
+/// - A missing, zero or negative upstream value falls back to the estimate.
+pub fn resolve_input_tokens_with_threshold(
+    request_input_tokens: i32,
+    context_input_tokens: Option<i32>,
+    context_usage_min_request_tokens: u64,
+) -> (i32, KiroInputTokenSource) {
+    let request_input = request_input_tokens.max(0);
+    // Clamped to be non-negative above, so `unsigned_abs` is a lossless
+    // widening rather than a sign-changing cast.
+    if u64::from(request_input.unsigned_abs()) <= context_usage_min_request_tokens {
+        return (request_input, KiroInputTokenSource::LocalRequestEstimateFallback);
+    }
+
+    match context_input_tokens {
+        Some(context_input) if context_input > 0 => {
+            (context_input, KiroInputTokenSource::UpstreamContextUsage)
+        },
+        _ => (request_input, KiroInputTokenSource::LocalRequestEstimateFallback),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{resolve_input_tokens, resolve_input_tokens_with_threshold, KiroInputTokenSource};
+
+    /// A tiny request keeps its own estimate even when upstream reports more.
+    #[test]
+    fn resolve_input_tokens_prefers_request_estimate_for_small_requests() {
+        assert_eq!(
+            resolve_input_tokens(18, Some(4_118)),
+            (18, KiroInputTokenSource::LocalRequestEstimateFallback)
+        );
+    }
+
+    /// An inflated upstream figure does not override a small request.
+    #[test]
+    fn resolve_input_tokens_prefers_request_estimate_for_inflated_small_context_usage() {
+        assert_eq!(
+            resolve_input_tokens(148, Some(8_008)),
+            (148, KiroInputTokenSource::LocalRequestEstimateFallback)
+        );
+    }
+
+    /// Still below the default threshold, so the local estimate wins.
+    #[test]
+    fn resolve_input_tokens_prefers_request_estimate_for_small_request_when_context_exceeds_local()
+    {
+        assert_eq!(
+            resolve_input_tokens(1_000, Some(6_000)),
+            (1_000, KiroInputTokenSource::LocalRequestEstimateFallback)
+        );
+    }
+
+    /// Above the default threshold the upstream figure is reported.
+    #[test]
+    fn resolve_input_tokens_uses_context_usage_above_default_threshold() {
+        assert_eq!(
+            resolve_input_tokens(16_000, Some(20_000)),
+            (20_000, KiroInputTokenSource::UpstreamContextUsage)
+        );
+    }
+
+    /// A caller-supplied threshold replaces the default one.
+    #[test]
+    fn resolve_input_tokens_respects_configured_threshold() {
+        assert_eq!(
+            resolve_input_tokens_with_threshold(16_000, Some(20_000), 50_000),
+            (16_000, KiroInputTokenSource::LocalRequestEstimateFallback)
+        );
+    }
+
+    /// Large requests report the upstream figure.
+    #[test]
+    fn resolve_input_tokens_keeps_upstream_context_for_large_requests() {
+        assert_eq!(
+            resolve_input_tokens(60_000, Some(90_000)),
+            (90_000, KiroInputTokenSource::UpstreamContextUsage)
+        );
+    }
+
+    /// With no upstream figure the local estimate is reported.
+    #[test]
+    fn resolve_input_tokens_falls_back_to_local_request_without_context_usage() {
+        assert_eq!(
+            resolve_input_tokens(123, None),
+            (123, KiroInputTokenSource::LocalRequestEstimateFallback)
+        );
+    }
+}