In [None]:
!pip install -q -U google-generativeai python-a2a

In [None]:
import os

# Replace 'YOUR_GEMINI_API_KEY' with your actual API key
os.environ["GOOGLE_API_KEY"] = "GOOGLE_API_KEY"

In [None]:
from python_a2a import A2AServer, Message, TextContent, MessageRole, run_server
import google.generativeai as genai

# Configure the Gemini API client
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

class GeminiChatAgent(A2AServer):
    def _init_(self, **kwargs):
        super()._init_(**kwargs)
        self.model = genai.GenerativeModel(model_name="models/gemini-2.0-flash")

    def handle_message(self, message):
        if message.content.type == "text":
            user_input = message.content.text
            try:
                response = self.model.generate_content(user_input)
                reply = response.text
            except Exception as e:
                reply = f"Error: {str(e)}"
            return Message(
                content=TextContent(text=reply),
                role=MessageRole.AGENT,
                parent_message_id=message.message_id,
                conversation_id=message.conversation_id
            )
        else:
            return Message(
                content=TextContent(text="Unsupported message type."),
                role=MessageRole.AGENT,
                parent_message_id=message.message_id,
                conversation_id=message.conversation_id
            )

In [None]:
import threading

def run_in_background():
    run_server(GeminiChatAgent(), host="0.0.0.0", port=5000)

thread = threading.Thread(target=run_in_background)
thread.start()

In [None]:
from python_a2a import A2AClient, Message, TextContent, MessageRole

# Create a client to communicate with the chatbot server
client = A2AClient("http://localhost:5000/a2a")

# Send a message to the chatbot
message = Message(
    content=TextContent(text="Hello, how are you?"),
    role=MessageRole.USER
)
response = client.send_message(message)

# Print the chatbot's response
print(f"Chatbot: {response.content.text}")

**Using pyngrok**

In [None]:
# Install required packages
!pip install -q python-a2a pyngrok

# ==== Setup and Run A2A Agent + Client in Google Colab ====

from python_a2a import A2AServer, Message, TextContent, MessageRole, A2AClient, run_server
from pyngrok import ngrok
import threading
import time

# Replace with your ngrok authtoken
ngrok.set_auth_token("...")

# Kill any existing tunnels
ngrok.kill()

# Define A2A EchoAgent
class EchoAgent(A2AServer):
    def handle_message(self, message):
        if message.content.type == "text":
            return Message(
                content=TextContent(text=f"Echo: {message.content.text}"),
                role=MessageRole.AGENT,
                parent_message_id=message.message_id,
                conversation_id=message.conversation_id
            )

# Start agent using run_server in a thread
def start_agent():
    agent = EchoAgent()
    run_server(agent, host="0.0.0.0", port=5000)

threading.Thread(target=start_agent).start()

# Wait for the server to start
time.sleep(3)

# Start ngrok tunnel
public_url = ngrok.connect(5000)
print("Public URL for A2A Agent:", public_url.public_url + "/a2a")

# A2A CLIENT: send message to agent
client = A2AClient(public_url.public_url + "/a2a")
message = Message(content=TextContent(text="Hello, agent!"), role=MessageRole.USER)
response = client.send_message(message)
print("Agent says:", response.content.text)


In [None]:
from pyngrok import ngrok

tunnels = ngrok.get_tunnels()
print("Existing tunnels:", tunnels)

for tunnel in tunnels:
    print(f"Disconnecting tunnel: {tunnel.public_url}")
    ngrok.disconnect(tunnel.public_url)

print("All tunnels disconnected.")


In [None]:
!pip install python-a2a pyngrok flask nest_asyncio

import nest_asyncio
from flask import Flask, Blueprint, request, jsonify
from python_a2a import A2AServer, A2AClient, Message, TextContent, MessageRole
from pyngrok import ngrok
import threading
import time

nest_asyncio.apply()

class EchoAgent(A2AServer):
    def handle_message(self, message):
        if message.content.type == "text":
            return Message(
                content=TextContent(text=f"Echo: {message.content.text}"),
                role=MessageRole.AGENT,
                parent_message_id=message.message_id,
                conversation_id=message.conversation_id
            )

def create_agent_blueprint(agent_instance):
    bp = Blueprint("echo_agent", __name__)

    @bp.route("/message", methods=["POST"])
    def message():
        data = request.get_json(force=True)
        message = Message.from_dict(data)
        response = agent_instance.handle_message(message)
        return jsonify(response.to_dict())

    return bp

app = Flask(__name__)
agent = EchoAgent()
blueprint = create_agent_blueprint(agent)
app.register_blueprint(blueprint, url_prefix="/a2a")

# Kill any existing tunnels
for tunnel in ngrok.get_tunnels():
    ngrok.disconnect(tunnel.public_url)

def run_flask():
    app.run(host="0.0.0.0", port=5000)

flask_thread = threading.Thread(target=run_flask)
flask_thread.start()

time.sleep(5)  # wait for Flask

public_url = ngrok.connect(5000)
print("Public URL:", public_url)

a2a_url = f"{public_url}/a2a"

client = A2AClient(a2a_url)
message = Message(
    content=TextContent(text="Hello from Colab!"),
    role=MessageRole.USER
)
response = client.send_message(message)

def create_agent_blueprint(agent_instance):
    bp = Blueprint("echo_agent", __name__)

    @bp.route("/message", methods=["POST"])
    def message():
        data = request.get_json(force=True)
        print("Received message data:", data)  # Debug input

        try:
            message = Message.from_dict(data)
            response = agent_instance.handle_message(message)
            print("Response message:", response.to_dict())  # Debug output
            return jsonify(response.to_dict())
        except Exception as e:
            print("Error in handling message:", e)
            from python_a2a import ErrorContent, MessageRole, Message
            error_content = ErrorContent(error=str(e))
            error_message = Message(
                content=error_content,
                role=MessageRole.AGENT,
                parent_message_id=data.get("message_id"),
                conversation_id=data.get("conversation_id")
            )
            return jsonify(error_message.to_dict()), 500

    return bp

