📝 Walkthrough

This PR introduces Gemini Vision Language Model (VLM) support by adding a new GeminiVLM plugin class that enables multimodal interactions with video frames and text. It includes core implementation, documentation, working examples, and integration tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant VLM as GeminiVLM
    participant VideoForwarder
    participant FrameBuffer
    participant Gemini as Gemini API
    participant EventSystem
    User->>VLM: watch_video_track(track)
    VLM->>VideoForwarder: subscribe to frames
    VideoForwarder->>FrameBuffer: forward frames
    User->>VLM: simple_response(text)
    VLM->>FrameBuffer: _get_frames_bytes()
    FrameBuffer->>FrameBuffer: convert frames to JPEG
    VLM->>VLM: _build_message_parts(text + frames)
    VLM->>Gemini: stream GenerateContent(parts)
    loop stream chunks
        Gemini-->>VLM: ContentChunk
        VLM->>EventSystem: emit LLMResponseChunkEvent
    end
    Gemini-->>VLM: final response
    VLM->>EventSystem: emit VLMInferenceCompletedEvent
    VLM->>EventSystem: emit LLMResponseCompletedEvent
    EventSystem-->>User: response ready
```
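For orientation, the flow in the diagram roughly corresponds to the usage sketch below. The import path, whether `watch_video_track` is awaited, and the `.text` attribute on the returned `LLMResponseEvent` are assumptions inferred from this PR, not confirmed API.

```python
# Sketch only; names marked "assumed" are not verbatim from the PR.
from vision_agents.plugins import gemini  # assumed import path, mirrors the example


async def describe_track(track) -> None:
    vlm = gemini.VLM()

    # Subscribe the VLM to a video track so frames accumulate in its buffer
    # (assumed to be awaitable here).
    await vlm.watch_video_track(track)

    # The text prompt plus buffered JPEG frames are streamed to Gemini; chunk
    # and completion events fire while the response streams back.
    response = await vlm.simple_response("Describe the scene.")
    print(response.text)  # assumed attribute on LLMResponseEvent

    await vlm.stop_watching_video_track()
```

In a real agent, `track` would come from the call edge rather than being passed in directly.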
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 2 passed
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py`:
- Around line 333-339: The method stop_watching_video_track currently removes
the frame handler but doesn't stop the VideoForwarder background task; modify it
to track ownership when the forwarder is created (e.g., add a boolean like
self._owns_video_forwarder set where you instantiate the VideoForwarder) and in
stop_watching_video_track, after await
self._video_forwarder.remove_frame_handler(self._frame_buffer.append) check if
self._owns_video_forwarder is True and if so call await
self._video_forwarder.stop() (or the forwarder's shutdown method), then set
self._video_forwarder = None and self._owns_video_forwarder = False; reference
symbols: stop_watching_video_track, self._video_forwarder, and the
VideoForwarder creation site to add the ownership flag.
- Around line 154-298: In simple_response, stop catching broad Exception: import
asyncio and the specific SDK errors (e.g., google.genai.errors as genai_errors)
and update the try/except so you first re-raise asyncio.CancelledError, then
catch genai_errors.APIError (handle logging and send LLMErrorEvent/VLMErrorEvent
as currently done), optionally catch ValueError for function-calling argument
issues and handle similarly, and remove or avoid a catch-all Exception handler
so task cancellations aren't masked and only expected SDK/value errors are
handled; keep references to simple_response, self.events.send, VLMErrorEvent,
LLMErrorEvent, and logger.exception when implementing these changes.
```python
    async def simple_response(
        self,
        text: str,
        processors: Optional[list[Processor]] = None,
        participant: Optional[Participant] = None,
    ) -> LLMResponseEvent[Any]:
        """
        Create a response from text input with video context.

        Args:
            text: The prompt to respond to.
            processors: List of processors (unused).
            participant: Optional participant object for message attribution.
        """
        user_id = participant.user_id if participant else "user"
        if self._conversation is not None:
            await self._conversation.send_message(
                role="user", user_id=user_id, content=text
            )

        if self.chat is None:
            config = self._build_config(system_instruction=self._instructions)
            self.chat = self.client.chats.create(model=self.model, config=config)

        frames_count = len(self._frame_buffer)
        inference_id = str(uuid.uuid4())

        self.events.send(
            VLMInferenceStartEvent(
                plugin_name=PLUGIN_NAME,
                inference_id=inference_id,
                model=self.model,
                frames_count=frames_count,
            )
        )

        self.events.send(
            LLMRequestStartedEvent(
                plugin_name=PLUGIN_NAME,
                model=self.model,
                streaming=True,
            )
        )

        request_start_time = time.perf_counter()
        first_token_time: Optional[float] = None

        try:
            parts = await self._build_message_parts(text)
            cfg = None
            if (
                self.thinking_level
                or self.media_resolution
                or self._base_config is not None
            ):
                cfg = self._build_config()

            if cfg is None:
                iterator = await self.chat.send_message_stream(message=parts)
            else:
                iterator = await self.chat.send_message_stream(
                    message=parts, config=cfg
                )

            text_parts: list[str] = []
            final_chunk: Optional[GenerateContentResponse] = None
            item_id = str(uuid.uuid4())

            idx = 0
            async for chunk in iterator:
                final_chunk = chunk
                chunk_text = self._extract_text_from_chunk(chunk)
                if chunk_text:
                    if first_token_time is None:
                        first_token_time = time.perf_counter()

                    self.events.send(
                        LLMResponseChunkEvent(
                            plugin_name=PLUGIN_NAME,
                            content_index=idx,
                            item_id=item_id,
                            delta=chunk_text,
                        )
                    )
                    text_parts.append(chunk_text)
                    idx += 1

            total_text = "".join(text_parts)
            latency_ms = (time.perf_counter() - request_start_time) * 1000
            ttft_ms: Optional[float] = None
            if first_token_time is not None:
                ttft_ms = (first_token_time - request_start_time) * 1000

            input_tokens, output_tokens = self._extract_usage_tokens(final_chunk)

            self.events.send(
                VLMInferenceCompletedEvent(
                    plugin_name=PLUGIN_NAME,
                    inference_id=inference_id,
                    model=self.model,
                    text=total_text,
                    latency_ms=latency_ms,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    frames_processed=frames_count,
                )
            )

            self.events.send(
                LLMResponseCompletedEvent(
                    plugin_name=PLUGIN_NAME,
                    original=final_chunk,
                    text=total_text,
                    item_id=item_id,
                    latency_ms=latency_ms,
                    time_to_first_token_ms=ttft_ms,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    total_tokens=(input_tokens or 0) + (output_tokens or 0)
                    if input_tokens or output_tokens
                    else None,
                    model=self.model,
                )
            )

            return LLMResponseEvent(final_chunk, total_text)
        except Exception as exc:
            logger.exception(f'Failed to get a response from the model "{self.model}"')
            self.events.send(
                events.LLMErrorEvent(
                    plugin_name=PLUGIN_NAME,
                    error_message=str(exc),
                    event_data=exc,
                )
            )
            self.events.send(
                VLMErrorEvent(
                    plugin_name=PLUGIN_NAME,
                    inference_id=inference_id,
                    error=exc,
                    context="api_request",
                )
            )
            return LLMResponseEvent(original=None, text="", exception=exc)
```
🧩 Analysis chain
🏁 Script executed:
```shell
# First, find the file and check if it exists
fd -t f "gemini_vlm.py" --exec wc -l {} \;
```

Repository: GetStream/Vision-Agents
Length of output: 175
🌐 Web query:
google-genai Python SDK AsyncClient send_message_stream exceptions documentation
💡 Result:
- Async streaming chat API (docs): `AsyncChat.send_message_stream()` is documented under Chats → Send Message (Asynchronous Streaming). Usage is:

  ```python
  chat = client.aio.chats.create(model="gemini-…")
  async for chunk in await chat.send_message_stream("tell me a story"):
      print(chunk.text)
  ```

  (Note the `await` before `chat.send_message_stream(...)` in the current docs.) [1]

- Exceptions to catch (what the SDK exposes): the SDK defines a base API exception `google.genai.errors.APIError` (and subclasses) and expects you to catch that for API-level failures (4xx/5xx, etc.). [2]

- Minimal pattern:

  ```python
  from google.genai.errors import APIError

  try:
      async for chunk in await chat.send_message_stream("hi"):
          ...
  except APIError as e:
      # inspect e / decide retry vs fail
      ...
  ```

  [1][2]

Sources: [1] Google Gen AI Python SDK docs (Chats / async streaming) (googleapis.github.io), [2] python-genai issue referencing `from google.genai.errors import APIError` (github.com)
Citations:
- 1: https://googleapis.github.io/python-genai/?utm_source=openai
- 2: APIError and subclasses are not unpicklable googleapis/python-genai#1144
🌐 Web query:
google-genai Python SDK exception types and error handling
💡 Result:
Exception types in google-genai (Python)
The SDK’s own exceptions live in google.genai.errors:
- `errors.APIError`: base class for API responses that are not HTTP 200; exposes `code`, `status`, `message`, and `details` (the parsed error JSON), plus the underlying HTTP response object. [2]
- `errors.ClientError`: raised for HTTP 4xx responses. [3]
- `errors.ServerError`: raised for HTTP 5xx responses. [3]

The SDK also defines a few function-calling helper exceptions:

- `UnknownFunctionCallArgumentError`, `UnsupportedFunctionError`, `FunctionInvocationError` (all `ValueError` subclasses). [3]

Basic error handling pattern

```python
from google import genai
from google.genai import errors

client = genai.Client(api_key="...")

try:
    client.models.generate_content(model="...", contents="Hello")
except errors.ClientError as e:  # 4xx (bad request, auth, quota, etc.)
    print(e.code, e.status, e.message)
    # e.details has the raw error JSON
except errors.ServerError as e:  # 5xx (transient backend issues)
    print(e.code, e.status, e.message)
except errors.APIError as e:  # anything else non-200
    print(e.code, e.status, e.message)
```

This matches the SDK's documented guidance to catch `errors.APIError` and inspect `e.code` / `e.message`. [1]

Retry guidance (practical)

- Retry transient failures (commonly `ServerError` / 5xx; and some 4xx like 429 RESOURCE_EXHAUSTED if you hit rate limits) with exponential backoff.
- Don't retry non-transient `ClientError` cases like 400 (invalid argument) or 401/403 (auth/config) until you fix the request/credentials.

"Non-SDK" exceptions you should still expect

Even with the above, your code may also need to handle:

- network/timeout errors from the HTTP transport (the SDK uses `httpx`). [2]
- `ValueError`-family exceptions from function-calling argument conversion / invocation. [3]
Sources: PyPI google-genai docs (error handling example) [1]; SDK exception definitions and 4xx/5xx mapping [2][3].
🏁 Script executed:
```shell
# Read the file to see imports and the actual exception handler
cat -n ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py | head -50
```

Repository: GetStream/Vision-Agents
Length of output: 1837

🏁 Script executed:

```shell
# Check the exception handling lines around 154-298
sed -n '1,30p' ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py
```

Repository: GetStream/Vision-Agents
Length of output: 1010

🏁 Script executed:

```shell
# Look for imports to see what's available
rg "^import|^from" ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py | head -30
```

Repository: GetStream/Vision-Agents
Length of output: 834

🏁 Script executed:

```shell
# Find the exact exception handler mentioned
sed -n '240,270p' ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py
```

Repository: GetStream/Vision-Agents
Length of output: 1273

🏁 Script executed:

```shell
# Read the exact exception handler area (lines 275-298)
sed -n '275,298p' ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py
```

Repository: GetStream/Vision-Agents
Length of output: 879

🏁 Script executed:

```shell
# Search for any imports of google.genai.errors or genai_errors in the file
rg "genai.*error|google.*error" ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py -i
```

Repository: GetStream/Vision-Agents
Length of output: 49

🏁 Script executed:

```shell
# Check the whole simple_response method to see line numbers
sed -n '154,298p' ./plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py | grep -n "except"
```

Repository: GetStream/Vision-Agents
Length of output: 274
Handle cancellation and catch specific SDK exceptions instead of broad Exception catch.
Catching Exception masks asyncio.CancelledError and violates the exception handling guideline. Please:
- Re-raise `asyncio.CancelledError` to allow task cancellation
- Import and catch specific exceptions from `google.genai.errors`
- Optionally catch `ValueError` for function-calling argument errors

Suggested fix

```diff
+        except asyncio.CancelledError:
+            raise
         except Exception as exc:
             logger.exception(f'Failed to get a response from the model "{self.model}"')
```

With the import:

```diff
+from google.genai import errors as genai_errors
```

Then catch `genai_errors.APIError` (which covers `ClientError` and `ServerError` subclasses) instead of broad `Exception`.
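To make the suggestion concrete, here is a minimal, hedged sketch of the narrowed handler. The helper name `stream_with_narrow_error_handling` is hypothetical; in the plugin itself the `except` branches would still emit `LLMErrorEvent`/`VLMErrorEvent` via `self.events.send` as the current code does.

```python
import asyncio
import logging

from google.genai import errors as genai_errors

logger = logging.getLogger(__name__)


async def stream_with_narrow_error_handling(chat, parts) -> str:
    """Illustrative only: stream a reply, re-raising cancellation and
    catching only the SDK's documented error types."""
    try:
        text_parts = []
        async for chunk in await chat.send_message_stream(message=parts):
            if chunk.text:
                text_parts.append(chunk.text)
        return "".join(text_parts)
    except asyncio.CancelledError:
        # Never swallow cancellation; let the task unwind.
        raise
    except (genai_errors.APIError, ValueError) as exc:
        # APIError covers ClientError/ServerError; ValueError covers the SDK's
        # function-calling argument errors. The plugin would emit error events here.
        logger.exception("Gemini request failed: %s", exc)
        return ""
```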
🤖 Prompt for AI Agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py` around lines 154 -
298, In simple_response, stop catching broad Exception: import asyncio and the
specific SDK errors (e.g., google.genai.errors as genai_errors) and update the
try/except so you first re-raise asyncio.CancelledError, then catch
genai_errors.APIError (handle logging and send LLMErrorEvent/VLMErrorEvent as
currently done), optionally catch ValueError for function-calling argument
issues and handle similarly, and remove or avoid a catch-all Exception handler
so task cancellations aren't masked and only expected SDK/value errors are
handled; keep references to simple_response, self.events.send, VLMErrorEvent,
LLMErrorEvent, and logger.exception when implementing these changes.
```python
    async def stop_watching_video_track(self) -> None:
        if self._video_forwarder is not None:
            await self._video_forwarder.remove_frame_handler(self._frame_buffer.append)
            self._video_forwarder = None
            logger.info(
                f"🛑 Stopped video forwarding to {PLUGIN_NAME} (participant left)"
            )
```
Stop owned VideoForwarder to avoid background task leaks.
When this instance creates the forwarder, stop_watching_video_track removes the handler but never stops the forwarder, so its background task can keep running. Track ownership and stop it when owned.
🛠️ Suggested fix (within this method)

```diff
 async def stop_watching_video_track(self) -> None:
     if self._video_forwarder is not None:
         await self._video_forwarder.remove_frame_handler(self._frame_buffer.append)
+        if self._owns_video_forwarder:
+            await self._video_forwarder.stop()
         self._video_forwarder = None
+        self._owns_video_forwarder = False
         logger.info(
             f"🛑 Stopped video forwarding to {PLUGIN_NAME} (participant left)"
         )
```

```diff
 # In __init__ (outside this range)
+self._owns_video_forwarder = False

 # In watch_video_track (outside this range)
 if shared_forwarder:
     self._video_forwarder = shared_forwarder
+    self._owns_video_forwarder = False
 else:
     self._video_forwarder = VideoForwarder(
         cast(VideoStreamTrack, track),
         max_buffer=10,
         fps=self._fps,
         name=f"{PLUGIN_NAME}_forwarder",
     )
     self._video_forwarder.start()
+    self._owns_video_forwarder = True
```

🤖 Prompt for AI Agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py` around lines 333 -
339, The method stop_watching_video_track currently removes the frame handler
but doesn't stop the VideoForwarder background task; modify it to track
ownership when the forwarder is created (e.g., add a boolean like
self._owns_video_forwarder set where you instantiate the VideoForwarder) and in
stop_watching_video_track, after await
self._video_forwarder.remove_frame_handler(self._frame_buffer.append) check if
self._owns_video_forwarder is True and if so call await
self._video_forwarder.stop() (or the forwarder's shutdown method), then set
self._video_forwarder = None and self._owns_video_forwarder = False; reference
symbols: stop_watching_video_track, self._video_forwarder, and the
VideoForwarder creation site to add the ownership flag.
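As a consolidated view of the ownership pattern described above, here is a hedged sketch. The class and method names are hypothetical; only `start()`, `stop()`, and `remove_frame_handler()` are taken from the diff above, and the real plugin would keep this state on the VLM instance itself.

```python
from typing import Any, Optional


class ForwarderOwnership:
    """Sketch of the ownership flag: only stop forwarders this instance created."""

    def __init__(self) -> None:
        self._video_forwarder: Optional[Any] = None
        self._owns_video_forwarder = False

    def use_shared(self, forwarder: Any) -> None:
        # Someone else created this forwarder, so we must never stop it.
        self._video_forwarder = forwarder
        self._owns_video_forwarder = False

    def create_own(self, forwarder: Any) -> None:
        # We created and started it, so we are responsible for stopping it.
        forwarder.start()
        self._video_forwarder = forwarder
        self._owns_video_forwarder = True

    async def teardown(self, frame_handler) -> None:
        if self._video_forwarder is None:
            return
        await self._video_forwarder.remove_frame_handler(frame_handler)
        if self._owns_video_forwarder:
            await self._video_forwarder.stop()
        self._video_forwarder = None
        self._owns_video_forwarder = False
```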
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@plugins/gemini/example/gemini_vlm_agent_example.py`:
- Around line 34-37: Replace the hardcoded string check in on_participant_joined
with a comparison against the current agent's user id (use agent.agent_user.id)
so the handler doesn't react to the agent itself; locate the async def
on_participant_joined(event: CallSessionParticipantJoinedEvent) and change the
condition that compares event.participant.user.id to use agent.agent_user.id
(the same id set when creating the agent in create_agent) and keep the rest of
the logic (sleep + agent.simple_response) unchanged.
🧹 Nitpick comments (3)
plugins/gemini/example/gemini_vlm_agent_example.py (3)
12-12: `load_dotenv()` at module level causes side effects on import.

This runs whenever the module is imported, not just when executed directly. Move it inside the `if __name__ == "__main__":` block so importing this module (e.g., for testing or reuse) doesn't silently mutate the environment.

Proposed fix

```diff
-load_dotenv()
-
-
 async def create_agent(**kwargs) -> Agent:
```

```diff
 if __name__ == "__main__":
+    load_dotenv()
     Runner(AgentLauncher(create_agent=create_agent, join_call=join_call)).cli()
```

15-26: Missing Google-style docstring on `create_agent`.

Both `create_agent` and `join_call` are the two public, exported entry points of this example. They should have docstrings per the project's guideline.

Proposed docstring

```diff
 async def create_agent(**kwargs) -> Agent:
+    """Create a Gemini Vision-based agent.
+
+    Builds an Agent configured with a Gemini VLM, ElevenLabs TTS,
+    Deepgram STT, and a GetStream edge.
+
+    Args:
+        **kwargs: Additional keyword arguments forwarded by AgentLauncher.
+
+    Returns:
+        A configured Agent instance.
+    """
     vlm = gemini.VLM()
```

As per coding guidelines, "Docstrings should follow the Google style guide for docstrings".

29-40: Missing Google-style docstring on `join_call`.

This is the second exported async function and serves as the call-join entry point wired into `AgentLauncher`. A brief docstring would help consumers understand the event subscription and the flow.

As per coding guidelines, "Docstrings should follow the Google style guide for docstrings".
```python
    async def on_participant_joined(event: CallSessionParticipantJoinedEvent):
        if event.participant.user.id != "gemini-vision-agent":
            await asyncio.sleep(2)
            await agent.simple_response("Describe the scene.")
```
Hardcoded agent ID is fragile — use agent.agent_user.id instead.
The string "gemini-vision-agent" duplicates the ID set in create_agent (line 20). If the ID is ever changed in one place but not the other, the agent will react to its own join events. The agent parameter is right there — use it.
Proposed fix
```diff
 @agent.events.subscribe
 async def on_participant_joined(event: CallSessionParticipantJoinedEvent):
-    if event.participant.user.id != "gemini-vision-agent":
+    if event.participant.user.id != agent.agent_user.id:
         await asyncio.sleep(2)
         await agent.simple_response("Describe the scene.")
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-    async def on_participant_joined(event: CallSessionParticipantJoinedEvent):
-        if event.participant.user.id != "gemini-vision-agent":
-            await asyncio.sleep(2)
-            await agent.simple_response("Describe the scene.")
+    async def on_participant_joined(event: CallSessionParticipantJoinedEvent):
+        if event.participant.user.id != agent.agent_user.id:
+            await asyncio.sleep(2)
+            await agent.simple_response("Describe the scene.")
```
🤖 Prompt for AI Agents
In `@plugins/gemini/example/gemini_vlm_agent_example.py` around lines 34 - 37,
Replace the hardcoded string check in on_participant_joined with a comparison
against the current agent's user id (use agent.agent_user.id) so the handler
doesn't react to the agent itself; locate the async def
on_participant_joined(event: CallSessionParticipantJoinedEvent) and change the
condition that compares event.participant.user.id to use agent.agent_user.id
(the same id set when creating the agent in create_agent) and keep the rest of
the logic (sleep + agent.simple_response) unchanged.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/moondream/example/moondream_vlm_example.py (1)
6-11: ⚠️ Potential issue | 🟡 Minor

Duplicate import: `CallSessionParticipantJoinedEvent` is imported twice, and line 11 shadows line 6.

The newly added import on line 6 (`from getstream.models`) is immediately shadowed by the existing import on line 11 (`from vision_agents.plugins.getstream`). The line-6 import is dead code — a bell jar of unreachable intent. Remove it.

Proposed fix

```diff
-from getstream.models import CallSessionParticipantJoinedEvent
-
```
🤖 Fix all issues with AI agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py`:
- Around line 116-152: The _build_config method currently assigns config =
base_config or self._base_config and then mutates it, which leaks state; fix by
creating a copy (e.g., copy.deepcopy) of whichever config is chosen before
setting fields so the original GenerateContentConfig instance is not mutated.
Update _build_config to import copy, set config = copy.deepcopy(base_config) if
base_config is not None, elif self._base_config is not None then config =
copy.deepcopy(self._base_config), else config = GenerateContentConfig(), and
then apply changes to the copied config (references: _build_config,
self._base_config, base_config, GenerateContentConfig).
🧹 Nitpick comments (1)
plugins/gemini/tests/test_gemini_vlm.py (1)
23-34: Fixture type hint should be `AsyncGenerator` rather than `VLM` since it yields.

This is a minor typing nit — pytest handles it fine at runtime, but static type checkers would flag the annotation.

✏️ Proposed fix

```diff
+from collections.abc import AsyncGenerator
+
 @pytest.fixture
-async def vlm() -> VLM:
+async def vlm() -> AsyncGenerator[VLM, None]:
```
```python
    def _build_config(
        self,
        system_instruction: Optional[str] = None,
        base_config: Optional[GenerateContentConfig] = None,
    ) -> GenerateContentConfig:
        """
        Build GenerateContentConfig with Gemini 3 features.

        Args:
            system_instruction: Optional system instruction to include.
            base_config: Optional base config to extend.

        Returns:
            GenerateContentConfig with configured Gemini settings.
        """
        if base_config is not None:
            config = base_config
        elif self._base_config is not None:
            config = self._base_config
        else:
            config = GenerateContentConfig()

        effective_instruction = (
            system_instruction if system_instruction else self._instructions
        )
        if effective_instruction:
            config.system_instruction = effective_instruction

        if self.thinking_level:
            from google.genai.types import ThinkingConfig

            config.thinking_config = ThinkingConfig(thinking_level=self.thinking_level)

        if self.media_resolution:
            config.media_resolution = self.media_resolution

        return config
```
🧩 Analysis chain
🏁 Script executed:
```shell
# Locate the file and verify it exists
find . -name "gemini_vlm.py" -type f
```

Repository: GetStream/Vision-Agents
Length of output: 126

🏁 Script executed:

```shell
# Read the specific lines mentioned (116-152) to confirm the code matches
sed -n '116,152p' plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py
```

Repository: GetStream/Vision-Agents
Length of output: 1286

🏁 Script executed:

```shell
# Check the broader context - __init__ and how _base_config is set
sed -n '1,150p' plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py | head -80
```

Repository: GetStream/Vision-Agents
Length of output: 2660

🏁 Script executed:

```shell
# Search for all usages of _base_config in the file
rg "_base_config" plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py -n -B2 -A2
```

Repository: GetStream/Vision-Agents
Length of output: 820
🌐 Web query:
google-genai Python GenerateContentConfig copy method or model_copy
💡 Result:
In google-genai, google.genai.types.GenerateContentConfig is a Pydantic BaseModel type, so you copy it using Pydantic’s APIs. [1]
Recommended (Pydantic v2): `model_copy`

```python
from google.genai import types

base = types.GenerateContentConfig(temperature=0.2, max_output_tokens=256)
cfg2 = base.model_copy(update={"temperature": 0.7})       # shallow copy + override
cfg3 = base.model_copy(deep=True, update={"seed": 1234})  # deep copy (nested models too)
```

`model_copy(update=..., deep=...)` is the supported method in Pydantic v2. [2]

Legacy: `copy(...)` (deprecated in Pydantic v2)

`BaseModel.copy(...)` still exists but is deprecated in Pydantic v2 in favor of `model_copy`. [2]

If you need include/exclude semantics

Per Pydantic guidance, do:

```python
data = base.model_dump(include=..., exclude=..., round_trip=True)
data |= {"temperature": 0.7}
cfg = type(base).model_validate(data)
```

[2]
Sources: Google Gen AI Python SDK type docs for GenerateContentConfig (BaseModel) [1]; Pydantic docs for model_copy and copy deprecation [2].
[1] (googleapis.github.io)
[2] (docs.pydantic.dev)
Citations:
- 1: https://googleapis.github.io/python-genai/genai.html
- 2: https://docs.pydantic.dev/dev/api/base_model/?utm_source=openai
🏁 Script executed:
```shell
# Search for how GenerateContentConfig is used elsewhere in the codebase
rg "GenerateContentConfig" --type py -A3 -B1 | head -100
```

Repository: GetStream/Vision-Agents
Length of output: 8922

🏁 Script executed:

```shell
# Check if there are any imports of copy module or usage patterns in the file
rg "copy\.|model_copy\|\.copy\(" plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py
```

Repository: GetStream/Vision-Agents
Length of output: 49

🏁 Script executed:

```shell
# Search for other instances where config objects might be copied or cloned
rg "deepcopy|model_copy|copy\(" plugins/gemini/ -t py | head -20
```

Repository: GetStream/Vision-Agents
Length of output: 292
_build_config mutates self._base_config in place — this leaks state across calls.
When base_config is None, the method falls back to self._base_config and directly sets attributes on it (lines 142, 147, 150). Since this is a shared, long-lived object, every call to _build_config mutates it. If a user passes one GenerateContentConfig to multiple GeminiVLM instances, they cross-contaminate.
Deep-copy the config before mutating to isolate changes:
🛠️ Proposed fix — deep-copy before mutating
```diff
+import copy
+
 def _build_config(
     self,
     system_instruction: Optional[str] = None,
     base_config: Optional[GenerateContentConfig] = None,
 ) -> GenerateContentConfig:
     if base_config is not None:
-        config = base_config
+        config = copy.deepcopy(base_config)
     elif self._base_config is not None:
-        config = self._base_config
+        config = copy.deepcopy(self._base_config)
     else:
         config = GenerateContentConfig()
```

🤖 Prompt for AI Agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py` around lines 116 -
152, The _build_config method currently assigns config = base_config or
self._base_config and then mutates it, which leaks state; fix by creating a copy
(e.g., copy.deepcopy) of whichever config is chosen before setting fields so the
original GenerateContentConfig instance is not mutated. Update _build_config to
import copy, set config = copy.deepcopy(base_config) if base_config is not None,
elif self._base_config is not None then config =
copy.deepcopy(self._base_config), else config = GenerateContentConfig(), and
then apply changes to the copied config (references: _build_config,
self._base_config, base_config, GenerateContentConfig).
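Since the web query above established that `GenerateContentConfig` is a Pydantic v2 model, a non-mutating build step could also rely on `model_copy` instead of `copy.deepcopy`. The sketch below is hedged: the standalone function name and simplified parameter types are illustrative, not the committed fix, and the real method reads these values from `self`.

```python
from typing import Optional

from google.genai.types import GenerateContentConfig, ThinkingConfig


def build_config_copy(
    base_config: Optional[GenerateContentConfig],
    system_instruction: Optional[str],
    thinking_level: Optional[str],
    media_resolution: Optional[str],
) -> GenerateContentConfig:
    """Sketch: return a fresh config without mutating the caller's instance."""
    # model_copy(deep=True) clones nested models too, so the user's original
    # GenerateContentConfig is never touched by later attribute assignments.
    config = (
        base_config.model_copy(deep=True)
        if base_config is not None
        else GenerateContentConfig()
    )
    if system_instruction:
        config.system_instruction = system_instruction
    if thinking_level:
        config.thinking_config = ThinkingConfig(thinking_level=thinking_level)
    if media_resolution:
        config.media_resolution = media_resolution
    return config
```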
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py`:
- Around line 162-163: The current chat session (self.chat) can remain in a
broken state if send_message_stream fails, so modify the error handling in the
method that calls send_message_stream (used by simple_response) to reset
self.chat = None in the except/failure path; specifically, after catching errors
from self.client.chats.create(...) / send_message_stream, set self.chat = None
and rethrow or handle the error so the next call to simple_response will
recreate the chat via the block that does "if self.chat is None: self.chat =
self.client.chats.create(model=self.model, config=self._config)".
- Around line 198-215: The code sets first_token_time but never populates
is_first_chunk or time_to_first_token_ms on the LLMResponseChunkEvent; fix by
adding a sent_first_chunk boolean (init False) and when _extract_text_from_chunk
yields the first non-empty chunk compute time_to_first_token_ms =
int((time.perf_counter() - first_token_time)*1000) (ensure first_token_time is
set when first token seen), set is_first_chunk=True and include
time_to_first_token_ms in the LLMResponseChunkEvent payload sent via
self.events.send (use same event creation site where LLMResponseChunkEvent is
constructed), then set sent_first_chunk = True for subsequent chunks so those
fields are not included.
🧹 Nitpick comments (1)
plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py (1)
95-103: Consider storing `_base_config` defensively when constructed from `**kwargs`.

When `kwargs` is provided (line 98), `GenerateContentConfig(**kwargs)` could raise a `ValidationError` if invalid keys are passed. This would bubble up from `__init__` with a potentially confusing traceback. This is acceptable behavior (fail-fast), but worth noting that invalid kwargs won't produce a helpful error message pointing to the `GeminiVLM` constructor.

Also, `self.chat` (line 102) is typed `Optional[Any]` — consider using the SDK's `AsyncChat` type for better IDE support and type safety.
```python
        if self.chat is None:
            self.chat = self.client.chats.create(model=self.model, config=self._config)
```
Chat session is never reset on error — a failed stream may leave the chat in a broken state.
If send_message_stream fails partway (e.g., the API returns an error mid-stream), self.chat retains the partially-corrupted session. The next call to simple_response will reuse it, potentially causing repeated failures. Consider resetting self.chat = None in the except block so a fresh session is created on the next call.
🤖 Prompt for AI Agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py` around lines 162 -
163, The current chat session (self.chat) can remain in a broken state if
send_message_stream fails, so modify the error handling in the method that calls
send_message_stream (used by simple_response) to reset self.chat = None in the
except/failure path; specifically, after catching errors from
self.client.chats.create(...) / send_message_stream, set self.chat = None and
rethrow or handle the error so the next call to simple_response will recreate
the chat via the block that does "if self.chat is None: self.chat =
self.client.chats.create(model=self.model, config=self._config)".
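A minimal sketch of the reset described above; the helper name is hypothetical, and it only illustrates dropping the cached session so the next `simple_response` call recreates it via `client.chats.create(...)`.

```python
async def _stream_reply_with_session_reset(vlm, parts) -> str:
    """Illustrative helper (hypothetical name): drop the cached chat on failure."""
    try:
        text_parts = []
        async for chunk in await vlm.chat.send_message_stream(message=parts):
            if chunk.text:
                text_parts.append(chunk.text)
        return "".join(text_parts)
    except Exception:
        # Reset the possibly-corrupted session; the next simple_response call
        # recreates it via "if self.chat is None: self.client.chats.create(...)".
        vlm.chat = None
        raise
```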
```python
            idx = 0
            async for chunk in iterator:
                final_chunk = chunk
                chunk_text = self._extract_text_from_chunk(chunk)
                if chunk_text:
                    if first_token_time is None:
                        first_token_time = time.perf_counter()

                    self.events.send(
                        LLMResponseChunkEvent(
                            plugin_name=PLUGIN_NAME,
                            content_index=idx,
                            item_id=item_id,
                            delta=chunk_text,
                        )
                    )
                    text_parts.append(chunk_text)
                    idx += 1
```
First-chunk metadata (is_first_chunk, time_to_first_token_ms) never emitted on the LLMResponseChunkEvent.
You compute first_token_time (line 203-204) but never pass is_first_chunk=True or time_to_first_token_ms to LLMResponseChunkEvent. Consumers relying on these fields for TTFT metrics will never see them.
Proposed fix
```diff
                     if first_token_time is None:
                         first_token_time = time.perf_counter()
+                        ttft = (first_token_time - request_start_time) * 1000

                     self.events.send(
                         LLMResponseChunkEvent(
                             plugin_name=PLUGIN_NAME,
                             content_index=idx,
                             item_id=item_id,
                             delta=chunk_text,
+                            is_first_chunk=(first_token_time is not None and idx == 0),
+                            time_to_first_token_ms=ttft if (first_token_time is not None and idx == 0) else None,
                         )
                     )
```

A cleaner approach: track a boolean `sent_first_chunk` and set the fields on the first text-bearing chunk.
🤖 Prompt for AI Agents
In `@plugins/gemini/vision_agents/plugins/gemini/gemini_vlm.py` around lines 198 -
215, The code sets first_token_time but never populates is_first_chunk or
time_to_first_token_ms on the LLMResponseChunkEvent; fix by adding a
sent_first_chunk boolean (init False) and when _extract_text_from_chunk yields
the first non-empty chunk compute time_to_first_token_ms =
int((time.perf_counter() - first_token_time)*1000) (ensure first_token_time is
set when first token seen), set is_first_chunk=True and include
time_to_first_token_ms in the LLMResponseChunkEvent payload sent via
self.events.send (use same event creation site where LLMResponseChunkEvent is
constructed), then set sent_first_chunk = True for subsequent chunks so those
fields are not included.
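Here is a hedged sketch of the `sent_first_chunk` variant suggested above. It is an excerpt of the streaming loop, not standalone code, and the extra `is_first_chunk`/`time_to_first_token_ms` keyword arguments assume `LLMResponseChunkEvent` accepts them; the exact signature should be checked against the events module.

```python
sent_first_chunk = False
idx = 0
async for chunk in iterator:
    final_chunk = chunk
    chunk_text = self._extract_text_from_chunk(chunk)
    if not chunk_text:
        continue
    if first_token_time is None:
        first_token_time = time.perf_counter()
    ttft_ms = (first_token_time - request_start_time) * 1000
    self.events.send(
        LLMResponseChunkEvent(
            plugin_name=PLUGIN_NAME,
            content_index=idx,
            item_id=item_id,
            delta=chunk_text,
            # Only the first text-bearing chunk carries TTFT metadata.
            is_first_chunk=not sent_first_chunk,
            time_to_first_token_ms=ttft_ms if not sent_first_chunk else None,
        )
    )
    sent_first_chunk = True
    text_parts.append(chunk_text)
    idx += 1
```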
Add `GeminiVLM` to the Gemini plugin to enable multimodal (text + video) interactions with Gemini 3 Vision models.

The `GeminiVLM` buffers video frames, converts them to JPEG, and sends them alongside text prompts to Gemini 3 Vision models, leveraging features like `thinking_level` and `media_resolution` for enhanced multimodal processing.

Summary by CodeRabbit
Release Notes
New Features
Documentation
Tests