<a href="https://colab.research.google.com/github/Qarza/Qarza/blob/main/Python_MCP_Server_Skeleton_for_Gemini_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import asyncio
import json
import os
import aiohttp # Required: pip install aiohttp

# --- Configuration ---
# The Gemini API key is read from the GEMINI_API_KEY environment variable.
# For local development, set it with: export GEMINI_API_KEY="YOUR_API_KEY"
# If the variable is unset the key is an empty string, and the Gemini API call
# will likely fail unless the hosting environment supplies credentials itself.
API_KEY = os.environ.get("GEMINI_API_KEY", "")

# Gemini API endpoint, assembled from its parts.
# Change _GEMINI_MODEL (e.g. to "gemini-pro") to target a different model.
# NOTE: the key is embedded as a query parameter, so GEMINI_API_URL is itself
# a secret whenever API_KEY is non-empty.
_GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
_GEMINI_MODEL = "gemini-2.0-flash"
GEMINI_API_URL = (
    _GEMINI_API_BASE + "/" + _GEMINI_MODEL + ":generateContent" + "?key=" + API_KEY
)

# Server Configuration
PORT = 8888       # TCP port the server listens on
HOST = '0.0.0.0'  # Bind to all available network interfaces

# --- Gemini API Interaction ---
def _redact_api_key(text: str) -> str:
    """Return *text* with every occurrence of the configured API key masked."""
    if not API_KEY:
        return text
    return text.replace(API_KEY, "[REDACTED]")


async def get_gemini_response(session: aiohttp.ClientSession, user_prompt: str, chat_history: list = None) -> str:
    """
    Asynchronously sends a prompt to the Gemini API and returns the response.

    The supplied chat_history list is updated in place: the user turn is
    appended before the request, and the model turn is appended when the call
    succeeds. If the call fails for any reason the user turn is removed again,
    so the history only ever contains completed user/model exchanges.

    Args:
        session: An aiohttp.ClientSession object for making HTTP requests.
        user_prompt: The prompt text from the user.
        chat_history: Optional list of previous messages for conversational context.
                      Example: [{"role": "user", "parts": [{"text": "Hello"}]},
                                {"role": "model", "parts": [{"text": "Hi there!"}]}]
                      When omitted (None), a throwaway history is used.

    Returns:
        The text response from Gemini AI, or an error message. Error messages
        never contain the API key.
    """
    # Test against None, not truthiness: an empty list handed in by the caller
    # must be kept so that the caller sees the turns appended below.
    if chat_history is None:
        chat_history = []

    # Add the current user prompt to the history
    user_turn = {"role": "user", "parts": [{"text": user_prompt}]}
    chat_history.append(user_turn)

    payload = {
        "contents": chat_history
        # You can add generationConfig here if needed, e.g.,
        # "generationConfig": {
        #   "temperature": 0.7,
        #   "maxOutputTokens": 1000
        # }
    }

    headers = {
        'Content-Type': 'application/json'
    }

    print(f"Sending to Gemini: {json.dumps(payload, indent=2)}") # Log the request payload

    succeeded = False
    try:
        async with session.post(GEMINI_API_URL, json=payload, headers=headers) as response:
            response_data = await response.json()
            print(f"Received from Gemini: {json.dumps(response_data, indent=2)}") # Log the response

            if response.status != 200:
                error_message = f"Error: Gemini API request failed with status {response.status}."
                print(error_message)
                print("Error details:", response_data)
                return error_message

            # A single guarded lookup covers every missing/odd-shaped level of
            # the response (absent key, empty list, None, non-dict payload).
            try:
                gemini_text_response = response_data["candidates"][0]["content"]["parts"][0]["text"]
            except (KeyError, IndexError, TypeError):
                gemini_text_response = None

            if not isinstance(gemini_text_response, str) or not gemini_text_response:
                error_message = "Error: Unexpected response structure from Gemini API."
                print(error_message)
                print("Full response:", response_data)
                return error_message

            # Add Gemini's response to chat history for context in next turn
            chat_history.append({"role": "model", "parts": [{"text": gemini_text_response}]})
            succeeded = True
            return gemini_text_response
    except aiohttp.ClientConnectorError as e:
        error_message = _redact_api_key(f"Error: Could not connect to Gemini API. {e}")
        print(error_message)
        return error_message
    except json.JSONDecodeError:
        error_message = "Error: Could not decode JSON response from Gemini API."
        print(error_message)
        return error_message
    except Exception as e:
        # aiohttp response errors include the request URL in their text, and
        # that URL carries the API key; mask it before it reaches the caller.
        error_message = _redact_api_key(f"An unexpected error occurred while calling Gemini API: {e}")
        print(error_message)
        return error_message
    finally:
        # Roll back the unanswered user turn on any failure (including
        # cancellation) so it is not re-sent with the next prompt.
        if not succeeded and chat_history and chat_history[-1] is user_turn:
            chat_history.pop()

# --- Client Handling ---
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """
    Handles a single client connection.

    Reads newline-terminated UTF-8 messages from the client, forwards each one
    to get_gemini_response, and writes the returned text back followed by a
    newline. The loop ends when the client disconnects or sends 'exit'
    (case-insensitive). The connection is always closed on the way out.

    Args:
        reader: Stream used to receive lines from the client.
        writer: Stream used to send replies to the client.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that task
            cancellation is not silently swallowed.
    """
    client_address = writer.get_extra_info('peername')
    print(f"Accepted connection from {client_address}")

    # Chat history for this client session, kept in memory per connection.
    # For persistence or shared history, you'd need a database or other storage.
    client_chat_history = []

    # Create a single aiohttp session for the duration of this client connection
    async with aiohttp.ClientSession() as session:
        try:
            while True:
                # 1. Read data from the client
                #    We expect clients to send UTF-8 encoded messages followed by a newline.
                data = await reader.readline()
                if not data:
                    print(f"Client {client_address} disconnected (no data).")
                    break # Connection closed by client

                message = data.decode('utf-8').strip()
                if not message: # Ignore empty messages
                    continue

                print(f"Received from {client_address}: {message}")

                if message.lower() == 'exit':
                    print(f"Client {client_address} requested exit.")
                    writer.write("Goodbye!\n".encode('utf-8'))
                    await writer.drain()
                    break

                # 2. Get response from Gemini AI
                #    client_chat_history is handed over as the conversation context.
                ai_response = await get_gemini_response(session, message, client_chat_history)

                # 3. Send AI response back to the client
                print(f"Sending to {client_address}: {ai_response}")
                writer.write((ai_response + '\n').encode('utf-8'))
                await writer.drain()

        except ConnectionResetError:
            print(f"Client {client_address} connection reset.")
        except asyncio.CancelledError:
            print(f"Client handler for {client_address} cancelled.")
            # Propagate cancellation; the finally clause still closes the writer.
            raise
        except Exception as e:
            print(f"Error handling client {client_address}: {e}")
            try:
                # Try to inform the client about the error
                error_msg = f"Server error: {e}\n"
                writer.write(error_msg.encode('utf-8'))
                await writer.drain()
            except Exception as ex_send:
                print(f"Could not send error to client {client_address}: {ex_send}")
        finally:
            print(f"Closing connection with {client_address}")
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as close_error:
                # The peer may already have reset the connection; closing is
                # best-effort and must not raise out of the handler.
                print(f"Error while closing connection with {client_address}: {close_error}")

# --- Main Server Logic ---
async def main():
    """
    Bind the TCP server and serve clients until interrupted.

    Every accepted connection is dispatched to handle_client. The address of
    the first listening socket is printed once the server is bound.
    """
    tcp_server = await asyncio.start_server(handle_client, host=HOST, port=PORT)

    first_socket = tcp_server.sockets[0]
    bound_address = first_socket.getsockname()
    print(f'Serving on {bound_address}')

    # The context manager closes the server when serve_forever() exits.
    async with tcp_server:
        await tcp_server.serve_forever()

if __name__ == '__main__':
    # Script entry point: print a startup banner, then run the server until
    # it is interrupted or fails to start.
    print("Starting MCP Server...")
    print(f"Gemini API Key Used: {'Yes' if API_KEY else 'No (Using default or environment-provided)'}")
    # GEMINI_API_URL carries the API key in its query string; print only the
    # part before '?' so the secret never reaches stdout or captured logs.
    print(f"Gemini API URL: {GEMINI_API_URL.split('?', 1)[0]}")
    print(f"Listening on {HOST}:{PORT}")
    print("Clients can connect via TCP (e.g., using netcat/telnet). Send 'exit' to close a connection.")
    print("Make sure you have 'aiohttp' installed: pip install aiohttp")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nServer shutting down...")
    except Exception as e:
        print(f"Failed to start server: {e}")