<a href="https://colab.research.google.com/github/harjeet88/agentic_ai/blob/main/autogen/gemini/openai_gemini_collab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
#!pip install -q autogen-agentchat[gemini]~=0.2
!pip install -U -q autogen-agentchat autogen-ext[gemini]
!pip install -U -q autogen-ext[openai]

In [3]:
import os
from typing import AsyncGenerator, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types

In [18]:
from google.colab import userdata
GOOGLE_API_KEY=userdata.get('GOOGLE_API_KEY')
OPENAI_API_KEY=userdata.get('OPENAI_API_KEY')
GEMINI_API_KEY=GOOGLE_API_KEY

In [31]:
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

In [36]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination,MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console

In [8]:
class GeminiAssistantAgent(BaseChatAgent):
    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = GEMINI_API_KEY,
        system_message: str
        | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message

        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")

        return final_response

    async def on_messages_stream(
        self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        # Add messages to the model context
        for msg in messages:
            await self._model_context.add_message(msg.to_model_message())

        # Get conversation history
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]
        # Generate response using Gemini
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )

        # Create usage metadata
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )

        # Add response to model context
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))

        # Yield the final response
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Handle the reset event."""
        await self._model_context.reset()

In [17]:
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
await Console(gemini_assistant.run_stream(task="What is the capital of New York?"))

---------- TextMessage (user) ----------
What is the capital of New York?
---------- TextMessage (gemini_assistant) ----------
Albany
TERMINATE



TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, created_at=datetime.datetime(2025, 6, 30, 5, 55, 34, 723882, tzinfo=datetime.timezone.utc), content='What is the capital of New York?', type='TextMessage'), TextMessage(source='gemini_assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=5), metadata={}, created_at=datetime.datetime(2025, 6, 30, 5, 55, 35, 251833, tzinfo=datetime.timezone.utc), content='Albany\nTERMINATE\n', type='TextMessage')], stop_reason=None)

In [29]:
from autogen_ext.models.openai import OpenAIChatCompletionClient


In [32]:
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

In [33]:
# Create the primary agent.
primary_agent = AssistantAgent(
    "primary",
    model_client=model_client,
    system_message="You are a helpful AI assistant.",
)

In [34]:
# Create a critic agent based on our new GeminiAssistantAgent.
gemini_critic_agent = GeminiAssistantAgent(
    "gemini_critic",
    system_message="Provide constructive feedback. Respond with 'APPROVE' to when your feedbacks are addressed.",
)



In [37]:
# Define a termination condition that stops the task if the critic approves or after 10 messages.
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)

In [39]:
# Create a team with the primary and critic agents.
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)

In [42]:
await Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
await model_client.close()

---------- TextMessage (user) ----------
Write a Haiku poem with 4 lines about the fall season.


ERROR:autogen_core:Error processing publish message for primary_6955e1d4-905b-449a-9d55-8012d536745a/6955e1d4-905b-449a-9d55-8012d536745a
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/openai/_base_client.py", line 1519, in request
    response = await self._client.send(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/httpx/_client.py", line 1616, in send
    raise RuntimeError("Cannot send a request, as the client has been closed.")
RuntimeError: Cannot send a request, as the client has been closed.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/autogen_core/_single_threaded_agent_runtime.py", line 605, in _on_message
    return await agent.on_message(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/autogen_core/_base_agent.py", line 119, in on_message
    return await s

RuntimeError: APIConnectionError: Connection error.
Traceback:
Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/openai/_base_client.py", line 1519, in request
    response = await self._client.send(
               ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/httpx/_client.py", line 1616, in send
    raise RuntimeError("Cannot send a request, as the client has been closed.")

RuntimeError: Cannot send a request, as the client has been closed.


The above exception was the direct cause of the following exception:


Traceback (most recent call last):

  File "/usr/local/lib/python3.11/dist-packages/autogen_agentchat/teams/_group_chat/_chat_agent_container.py", line 79, in handle_request
    async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token):

  File "/usr/local/lib/python3.11/dist-packages/autogen_agentchat/agents/_assistant_agent.py", line 852, in on_messages_stream
    async for inference_output in self._call_llm(

  File "/usr/local/lib/python3.11/dist-packages/autogen_agentchat/agents/_assistant_agent.py", line 981, in _call_llm
    model_result = await model_client.create(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/autogen_ext/models/openai/_openai_client.py", line 624, in create
    result: Union[ParsedChatCompletion[BaseModel], ChatCompletion] = await future
                                                                     ^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/openai/resources/chat/completions/completions.py", line 2028, in create
    return await self._post(
           ^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/openai/_base_client.py", line 1784, in post
    return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/lib/python3.11/dist-packages/openai/_base_client.py", line 1551, in request
    raise APIConnectionError(request=request) from err

openai.APIConnectionError: Connection error.
