diff --git a/src/google/adk/models/anthropic_llm.py b/src/google/adk/models/anthropic_llm.py
index 6c20b1b9a5..a598aa567c 100644
--- a/src/google/adk/models/anthropic_llm.py
+++ b/src/google/adk/models/anthropic_llm.py
@@ -30,6 +30,7 @@ from typing import Union
 
 from anthropic import AnthropicVertex
+from anthropic import AsyncAnthropicVertex
 from anthropic import NOT_GIVEN
 from anthropic import types as anthropic_types
 from google.genai import types
@@ -84,8 +85,20 @@ def part_to_message_block(
     anthropic_types.ImageBlockParam,
     anthropic_types.ToolUseBlockParam,
     anthropic_types.ToolResultBlockParam,
+    dict,  # For thinking blocks
 ]:
-  if part.text:
+  # Handle thinking blocks (must check thought=True BEFORE text)
+  # Thinking is stored as Part(text=..., thought=True, thought_signature=...)
+  if part.text and hasattr(part, 'thought') and part.thought:
+    thinking_block = {"type": "thinking", "thinking": part.text}
+    if hasattr(part, 'thought_signature') and part.thought_signature:
+      # thought_signature is stored as bytes in Part, but API expects base64 string
+      thinking_block["signature"] = base64.b64encode(part.thought_signature).decode('utf-8')
+      logger.debug(f"Including signature with thinking block")
+    else:
+      logger.warning(f"No signature found for thinking block - this may cause API errors")
+    return thinking_block
+  elif part.text:
     return anthropic_types.TextBlockParam(text=part.text, type="text")
   elif part.function_call:
     assert part.function_call.name
@@ -139,14 +152,26 @@ def part_to_message_block(
 def content_to_message_param(
     content: types.Content,
 ) -> anthropic_types.MessageParam:
-  message_block = []
+  thinking_blocks = []
+  other_blocks = []
+
   for part in content.parts or []:
     # Image data is not supported in Claude for model turns.
     if _is_image_part(part):
       logger.warning("Image data is not supported in Claude for model turns.")
       continue
-    message_block.append(part_to_message_block(part))
+    block = part_to_message_block(part)
+
+    # Separate thinking blocks from other blocks
+    # Anthropic requires thinking blocks to come FIRST in assistant messages
+    if isinstance(block, dict) and block.get("type") == "thinking":
+      thinking_blocks.append(block)
+    else:
+      other_blocks.append(block)
+
+  # Thinking blocks MUST come first (Anthropic API requirement)
+  message_block = thinking_blocks + other_blocks
 
   return {
       "role": to_claude_role(content.role),
@@ -166,7 +191,84 @@ def content_block_to_part(
     )
     part.function_call.id = content_block.id
     return part
-  raise NotImplementedError("Not supported yet.")
+
+  # Handle thinking blocks from Anthropic extended thinking feature
+  # Thinking blocks have a 'thinking' attribute containing the reasoning text
+  if hasattr(content_block, "thinking"):
+    thinking_text = content_block.thinking
+    signature = getattr(content_block, 'signature', None)
+    logger.info(f"Received thinking block ({len(thinking_text)} chars, signature={'present' if signature else 'missing'})")
+    # Return as Part with thought=True and preserve signature (standard GenAI format)
+    return types.Part(text=thinking_text, thought=True, thought_signature=signature)
+
+  # Alternative check: some versions may use type attribute
+  if (
+      hasattr(content_block, "type")
+      and getattr(content_block, "type", None) == "thinking"
+  ):
+    thinking_text = str(content_block)
+    signature = getattr(content_block, 'signature', None)
+    logger.info(
+        f"Received thinking block via type check ({len(thinking_text)} chars, signature={'present' if signature else 'missing'})"
+    )
+    # Return as Part with thought=True and preserve signature (standard GenAI format)
+    return types.Part(text=thinking_text, thought=True, thought_signature=signature)
+
+  raise NotImplementedError(
+      f"Not supported yet: {type(content_block).__name__}"
+  )
+
+
+def streaming_event_to_llm_response(
+    event: anthropic_types.MessageStreamEvent,
+) -> Optional[LlmResponse]:
+  """Convert Anthropic streaming events to ADK LlmResponse format.
+
+  Args:
+    event: Anthropic streaming event
+
+  Returns:
+    LlmResponse or None if event should be skipped
+  """
+  # Handle content block deltas
+  if event.type == "content_block_delta":
+    delta = event.delta
+
+    # Text delta
+    if delta.type == "text_delta":
+      return LlmResponse(
+          content=types.Content(
+              role="model",
+              parts=[types.Part.from_text(text=delta.text)],
+          ),
+          partial=True,
+      )
+
+    # Thinking delta
+    elif delta.type == "thinking_delta":
+      return LlmResponse(
+          content=types.Content(
+              role="model",
+              parts=[types.Part(text=delta.thinking, thought=True)],
+          ),
+          partial=True,
+      )
+
+  # Handle message deltas (usage updates)
+  elif event.type == "message_delta":
+    if hasattr(event, "usage"):
+      input_tokens = getattr(event.usage, "input_tokens", 0) or 0
+      output_tokens = getattr(event.usage, "output_tokens", 0) or 0
+      return LlmResponse(
+          usage_metadata=types.GenerateContentResponseUsageMetadata(
+              prompt_token_count=input_tokens,
+              candidates_token_count=output_tokens,
+              total_token_count=input_tokens + output_tokens,
+          ),
+      )
+
+  # Ignore start/stop events
+  return None
 
 
 def message_to_generate_content_response(
@@ -250,10 +352,12 @@ class Claude(BaseLlm):
   Attributes:
     model: The name of the Claude model.
     max_tokens: The maximum number of tokens to generate.
+    extra_headers: Optional extra headers to pass to the Anthropic API.
   """
 
   model: str = "claude-3-5-sonnet-v2@20241022"
   max_tokens: int = 8192
+  extra_headers: Optional[dict[str, str]] = None
 
   @classmethod
   @override
@@ -283,19 +387,124 @@ async def generate_content_async(
         if llm_request.tools_dict
         else NOT_GIVEN
     )
-    # TODO(b/421255973): Enable streaming for anthropic models.
-    message = self._anthropic_client.messages.create(
-        model=llm_request.model,
-        system=llm_request.config.system_instruction,
-        messages=messages,
-        tools=tools,
-        tool_choice=tool_choice,
-        max_tokens=self.max_tokens,
-    )
-    yield message_to_generate_content_response(message)
+
+    # Extract and convert thinking config from ADK to Anthropic format
+    thinking = NOT_GIVEN
+
+    if llm_request.config and llm_request.config.thinking_config:
+      budget = llm_request.config.thinking_config.thinking_budget
+      if budget:
+        if budget == -1:
+          raise ValueError(
+              "Unlimited thinking budget (-1) is not supported with Claude."
+          )
+        elif budget > 0:
+          thinking = {"type": "enabled", "budget_tokens": budget}
+          logger.info(
+              f"Extended thinking enabled (budget: {budget} tokens)"
+          )
+      else:
+        logger.warning(f"Budget not given! budget={budget}")
+    else:
+      logger.warning(f"No thinking_config found in llm_request.config")
+
+    # Use extra headers if provided
+    extra_headers = self.extra_headers or NOT_GIVEN
+
+    if stream:
+      # Use streaming mode
+      logger.info(
+          f"Using streaming mode (stream={stream}, "
+          f"has_thinking={thinking != NOT_GIVEN}, "
+          f"large_max_tokens={self.max_tokens >= 8192})"
+      )
+
+      # Accumulators for text and thinking
+      accumulated_text = ""
+      accumulated_thinking = ""
+
+      async with self._anthropic_client.messages.stream(
+          model=llm_request.model,
+          system=llm_request.config.system_instruction,
+          messages=messages,
+          tools=tools,
+          tool_choice=tool_choice,
+          max_tokens=self.max_tokens,
+          thinking=thinking,
+          extra_headers=extra_headers,
+      ) as anthropic_stream:
+        # Process streaming events
+        async for event in anthropic_stream:
+          # Convert Anthropic event to LlmResponse
+          if llm_response := streaming_event_to_llm_response(event):
+            # Track accumulated content
+            is_thought = False
+            if llm_response.content and llm_response.content.parts:
+              for part in llm_response.content.parts:
+                if part.text:
+                  if hasattr(part, "thought") and part.thought:
+                    accumulated_thinking += part.text
+                    is_thought = True
+                  else:
+                    accumulated_text += part.text
+
+            # If we have accumulated thinking and now getting text,
+            # yield the accumulated thinking first
+            # NOTE: This partial response is for UI display only
+            # The final response with signature will be yielded after the stream ends
+            if accumulated_thinking and accumulated_text and not is_thought:
+              yield LlmResponse(
+                  content=types.Content(
+                      role="model",
+                      parts=[
+                          types.Part(text=accumulated_thinking, thought=True)
+                      ],
+                  ),
+                  partial=True,
+              )
+              accumulated_thinking = ""  # Reset after yielding
+
+            # Yield partial response (but skip individual thought deltas)
+            if not is_thought:
+              yield llm_response
+
+        # Get final message with complete content blocks (includes signatures)
+        final_message = await anthropic_stream.get_final_message()
+
+        # Build final response from complete content blocks to preserve thinking signatures
+        # IMPORTANT: Use final_message.content instead of accumulated strings
+        # because accumulated strings don't have signatures
+        if final_message.content:
+          parts = [content_block_to_part(cb) for cb in final_message.content]
+          input_tokens = final_message.usage.input_tokens
+          output_tokens = final_message.usage.output_tokens
+          yield LlmResponse(
+              content=types.Content(role="model", parts=parts),
+              usage_metadata=types.GenerateContentResponseUsageMetadata(
+                  prompt_token_count=input_tokens,
+                  candidates_token_count=output_tokens,
+                  total_token_count=input_tokens + output_tokens,
+              ),
+          )
+
+    else:
+      # Non-streaming mode
+      logger.info("Using non-streaming mode")
+      message = await self._anthropic_client.messages.create(
+          model=llm_request.model,
+          system=llm_request.config.system_instruction,
+          messages=messages,
+          tools=tools,
+          tool_choice=tool_choice,
+          max_tokens=self.max_tokens,
+          thinking=thinking,
+          extra_headers=extra_headers,
+      )
+      yield message_to_generate_content_response(message)
 
   @cached_property
-  def _anthropic_client(self) -> AnthropicVertex:
+  def _anthropic_client(self) -> AsyncAnthropicVertex:
     if (
         "GOOGLE_CLOUD_PROJECT" not in os.environ
         or "GOOGLE_CLOUD_LOCATION" not in os.environ
@@ -305,7 +514,7 @@ def _anthropic_client(self) -> AnthropicVertex:
           " Anthropic on Vertex."
       )
-    return AnthropicVertex(
+    return AsyncAnthropicVertex(
         project_id=os.environ["GOOGLE_CLOUD_PROJECT"],
         region=os.environ["GOOGLE_CLOUD_LOCATION"],
     )
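To make the conversion contract above concrete, here is a minimal sketch (not part of the patch) of the round trip that part_to_message_block and content_to_message_param are expected to support once this change lands. It assumes google-adk with this patch plus google-genai are installed; the text and signature bytes are illustrative placeholders.

import base64

from google.adk.models.anthropic_llm import content_to_message_param
from google.genai import types

# A model turn carrying a signed thinking part followed by regular text.
content = types.Content(
    role="model",
    parts=[
        types.Part(
            text="Reasoning goes here...",
            thought=True,
            thought_signature=b"opaque-signature-bytes",  # placeholder value
        ),
        types.Part.from_text(text="Final answer."),
    ],
)

message = content_to_message_param(content)
# Thinking blocks are ordered first and carry a base64-encoded signature.
assert message["role"] == "assistant"
assert message["content"][0]["type"] == "thinking"
assert message["content"][0]["signature"] == base64.b64encode(
    b"opaque-signature-bytes"
).decode("utf-8")
assert message["content"][1]["type"] == "text"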
"thinking_delta" + thinking = "Let me think... " + + class ContentBlockDeltaEvent: + type = "content_block_delta" + delta = ThinkingDelta() + + event = ContentBlockDeltaEvent() + llm_response = streaming_event_to_llm_response(event) + + assert llm_response is not None + assert llm_response.partial is True + assert llm_response.content is not None + assert llm_response.content.role == "model" + assert len(llm_response.content.parts) == 1 + assert llm_response.content.parts[0].text == "Let me think... " + assert hasattr(llm_response.content.parts[0], "thought") + assert llm_response.content.parts[0].thought is True + + +def test_streaming_event_message_delta_usage(): + """Test that message_delta events with usage are converted correctly.""" + + class Usage: + input_tokens = 100 + output_tokens = 50 + + class MessageDeltaEvent: + type = "message_delta" + usage = Usage() + + event = MessageDeltaEvent() + llm_response = streaming_event_to_llm_response(event) + + assert llm_response is not None + assert llm_response.usage_metadata is not None + assert llm_response.usage_metadata.prompt_token_count == 100 + assert llm_response.usage_metadata.candidates_token_count == 50 + assert llm_response.usage_metadata.total_token_count == 150 + + +def test_streaming_event_ignored(): + """Test that start/stop events are ignored.""" + + class MessageStartEvent: + type = "message_start" + + event = MessageStartEvent() + llm_response = streaming_event_to_llm_response(event) + + assert llm_response is None diff --git a/tests/unittests/models/test_anthropic_thinking.py b/tests/unittests/models/test_anthropic_thinking.py new file mode 100644 index 0000000000..dbe3415f1b --- /dev/null +++ b/tests/unittests/models/test_anthropic_thinking.py @@ -0,0 +1,473 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Tests for Anthropic Extended Thinking configuration.""" + +import os +from unittest import mock + +from anthropic import NOT_GIVEN +from anthropic import types as anthropic_types +from google.adk.models.anthropic_llm import Claude +from google.adk.models.anthropic_llm import content_block_to_part +from google.adk.models.llm_request import LlmRequest +from google.genai import types +import pytest + + +@pytest.fixture +def claude_llm(): + return Claude(model="claude-opus-4-1@20250805", max_tokens=4096) + + +@pytest.fixture +def base_llm_request(): + return LlmRequest( + model="claude-opus-4-1@20250805", + contents=[ + types.Content( + role="user", parts=[types.Part.from_text(text="What is 2+2?")] + ) + ], + config=types.GenerateContentConfig( + system_instruction="You are a helpful assistant" + ), + ) + + +@pytest.mark.asyncio +async def test_thinking_budget_automatic_raises_error( + claude_llm, base_llm_request +): + """Test that thinking_budget=-1 raises ValueError.""" + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=-1 + ) + + with pytest.raises( + ValueError, + match="Unlimited thinking budget \\(-1\\) is not supported with Claude", + ): + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + pass + + +@pytest.mark.asyncio +async def test_thinking_budget_disabled(claude_llm, base_llm_request): + """Test that thinking_budget=0 disables thinking.""" + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=0 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[anthropic_types.TextBlock(text="4", type="text")], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=2), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify non-streaming mode was used (thinking disabled) + mock_client.messages.create.assert_called_once() + # Thinking should be NOT_GIVEN when disabled + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs.get("thinking") == NOT_GIVEN + + +@pytest.mark.asyncio +async def test_thinking_budget_specific_value(claude_llm, base_llm_request): + """Test that thinking_budget with specific value is used.""" + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=5000 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify thinking parameter was passed with 5000 tokens + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs["thinking"] == {"type": "enabled", "budget_tokens": 5000} + + +@pytest.mark.asyncio +async def test_thinking_budget_minimum_value(claude_llm, 
base_llm_request): + """Test that thinking budget of 1024 tokens (minimum) works.""" + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=1024 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify thinking parameter was passed with 1024 tokens + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs["thinking"] == {"type": "enabled", "budget_tokens": 1024} + + +def test_thinking_block_parsing(): + """Test that thinking blocks are parsed as Part(thought=True).""" + import base64 + + # Create a mock thinking block with 'thinking' attribute + class ThinkingBlock: + + def __init__(self): + self.thinking = "Let me think about this step by step..." + # Signature must be base64 encoded + self.signature = base64.b64encode(b"mock_signature_123").decode("utf-8") + + thinking_block = ThinkingBlock() + part = content_block_to_part(thinking_block) + + assert part.text == "Let me think about this step by step..." + assert part.thought is True + # Signature is stored as bytes (decoded from base64 input) + assert part.thought_signature == b"mock_signature_123" + + +def test_thinking_block_type_check(): + """Test that thinking blocks with type attribute are parsed correctly.""" + import base64 + + # Create a mock thinking block with 'type' attribute + class ThinkingBlockWithType: + + def __init__(self): + self.type = "thinking" + # Signature must be base64 encoded + self.signature = base64.b64encode(b"mock_signature_456").decode("utf-8") + + def __str__(self): + return "Thinking content via type check" + + thinking_block = ThinkingBlockWithType() + part = content_block_to_part(thinking_block) + + assert part.text == "Thinking content via type check" + assert part.thought is True + # Signature is stored as bytes (decoded from base64 input) + assert part.thought_signature == b"mock_signature_456" + + +@pytest.mark.asyncio +async def test_no_thinking_config(claude_llm, base_llm_request): + """Test that requests without thinking_config work normally.""" + # No thinking_config set + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[anthropic_types.TextBlock(text="4", type="text")], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=2), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify non-streaming mode was used without thinking + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs.get("thinking") == NOT_GIVEN + + +@pytest.mark.asyncio +async def test_thinking_with_streaming(claude_llm, base_llm_request): + """Test that thinking works in streaming mode.""" + 
base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=2048 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_stream = mock.AsyncMock() + mock_stream.__aenter__ = mock.AsyncMock(return_value=mock_stream) + mock_stream.__aexit__ = mock.AsyncMock() + mock_stream.__aiter__ = mock.AsyncMock(return_value=iter([])) + mock_stream.get_final_message = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + mock_client.messages.stream = mock.Mock(return_value=mock_stream) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=True + ): + responses.append(response) + + # Verify streaming mode was used with thinking + mock_client.messages.stream.assert_called_once() + call_kwargs = mock_client.messages.stream.call_args.kwargs + assert call_kwargs["thinking"] == {"type": "enabled", "budget_tokens": 2048} + + +@pytest.mark.asyncio +async def test_interleaved_thinking_disabled_by_default( + claude_llm, base_llm_request +): + """Test that beta header is NOT sent when interleaved thinking is disabled.""" + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=2048 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify beta header is NOT_GIVEN (default behavior) + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs.get("extra_headers") == NOT_GIVEN + + +@pytest.mark.asyncio +async def test_interleaved_thinking_streaming(base_llm_request): + """Test that beta header is sent in streaming mode when provided.""" + # Create Claude with beta headers + claude_llm = Claude( + model="claude-opus-4-1@20250805", + max_tokens=4096, + extra_headers={"anthropic-beta": "interleaved-thinking-2025-05-14"}, + ) + + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=2048 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_stream = mock.AsyncMock() + mock_stream.__aenter__ = mock.AsyncMock(return_value=mock_stream) + mock_stream.__aexit__ = mock.AsyncMock() + mock_stream.__aiter__ = mock.AsyncMock(return_value=iter([])) + mock_stream.get_final_message = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + mock_client.messages.stream = mock.Mock(return_value=mock_stream) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=True + ): + responses.append(response) + + # Verify beta header was sent + mock_client.messages.stream.assert_called_once() + call_kwargs = 
mock_client.messages.stream.call_args.kwargs + assert call_kwargs["extra_headers"] == { + "anthropic-beta": "interleaved-thinking-2025-05-14" + } + + +@pytest.mark.asyncio +async def test_interleaved_thinking_non_streaming(base_llm_request): + """Test that beta header is sent in non-streaming mode when provided.""" + # Create Claude with beta headers + claude_llm = Claude( + model="claude-opus-4-1@20250805", + max_tokens=4096, + extra_headers={"anthropic-beta": "interleaved-thinking-2025-05-14"}, + ) + + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=2048 + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=20), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify beta header was sent + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs["extra_headers"] == { + "anthropic-beta": "interleaved-thinking-2025-05-14" + } + + +@pytest.mark.asyncio +async def test_extra_headers_sent_regardless_of_thinking(base_llm_request): + """Test that extra headers are sent even when thinking is disabled.""" + # Create Claude with beta headers + claude_llm = Claude( + model="claude-opus-4-1@20250805", + max_tokens=4096, + extra_headers={"anthropic-beta": "interleaved-thinking-2025-05-14"}, + ) + + # No thinking_config set - thinking disabled + base_llm_request.config.thinking_config = types.ThinkingConfig( + include_thoughts=True, thinking_budget=0 # Disabled + ) + + with mock.patch.object(claude_llm, "_anthropic_client") as mock_client: + mock_client.messages.create = mock.AsyncMock( + return_value=anthropic_types.Message( + id="test", + content=[anthropic_types.TextBlock(text="4", type="text")], + model="claude-opus-4-1@20250805", + role="assistant", + stop_reason="end_turn", + type="message", + usage=anthropic_types.Usage(input_tokens=10, output_tokens=2), + ) + ) + + responses = [] + async for response in claude_llm.generate_content_async( + base_llm_request, stream=False + ): + responses.append(response) + + # Verify beta header is sent even when thinking is disabled + mock_client.messages.create.assert_called_once() + call_kwargs = mock_client.messages.create.call_args.kwargs + assert call_kwargs.get("extra_headers") == { + "anthropic-beta": "interleaved-thinking-2025-05-14" + } + + +@pytest.mark.asyncio +async def test_thinking_blocks_preserved_in_assistant_messages(base_llm_request): + """Test that thinking blocks from previous assistant turn are preserved.""" + from google.adk.models.anthropic_llm import content_to_message_param + + # Create content with thinking block and tool use + content = types.Content( + role="model", + parts=[ + types.Part( + text="Let me calculate this step by step...", thought=True + ), + types.Part.from_function_call( + name="calculator", args={"expression": "2+2"} + ), + ], + ) + + message_param = content_to_message_param(content) + + # Verify message structure + assert message_param["role"] == "assistant" + assert len(message_param["content"]) == 2 + + # Verify thinking block comes FIRST + assert 
message_param["content"][0]["type"] == "thinking" + assert ( + message_param["content"][0]["thinking"] + == "Let me calculate this step by step..." + ) + + # Verify tool use comes after + assert message_param["content"][1]["type"] == "tool_use" + assert message_param["content"][1]["name"] == "calculator"
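Taken together, the tests above exercise the configuration surface added by this change. For reference, a minimal usage sketch (illustrative only, assuming this patch is applied and the GOOGLE_CLOUD_PROJECT / GOOGLE_CLOUD_LOCATION variables are set for Anthropic on Vertex):

from google.adk.models.anthropic_llm import Claude
from google.genai import types

# Model instance with the optional interleaved-thinking beta header used in the tests.
claude = Claude(
    model="claude-opus-4-1@20250805",
    max_tokens=16000,
    extra_headers={"anthropic-beta": "interleaved-thinking-2025-05-14"},
)

# Request-side configuration: a positive budget enables extended thinking;
# thinking_budget=-1 raises ValueError, and 0 leaves thinking disabled.
config = types.GenerateContentConfig(
    system_instruction="You are a helpful assistant",
    thinking_config=types.ThinkingConfig(
        include_thoughts=True, thinking_budget=2048
    ),
)

# An LlmRequest built with this config and passed to
# claude.generate_content_async(request, stream=True) yields partial thought
# parts (thought=True) followed by a final response that carries the thinking
# signatures and usage metadata.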