In [None]:
import os

# Credentials are read from environment variables so that real keys never have
# to be saved inside the notebook. When a variable is not set, the original
# placeholder string is used, so the notebook still runs after a manual edit.
google_gemini_api_key = os.environ.get("GOOGLE_GEMINI_API_KEY", "Your key here")
google_search_api_key = os.environ.get("GOOGLE_SEARCH_API_KEY", "Your key here")
google_search_engine_id = os.environ.get("GOOGLE_SEARCH_ENGINE_ID", "Your key here")
google_gmail_client_id = os.environ.get("GOOGLE_GMAIL_CLIENT_ID", "Your key here")

In [None]:
import base64
import os.path
from email.message import EmailMessage

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


# OAuth scope list handed to both the stored-token loader and the interactive
# login flow in create_gmail_draft_with_auth.
SCOPES = ["https://www.googleapis.com/auth/gmail.compose"]


def create_gmail_draft_with_auth(To, From, Subject, Body):
  """
  Handles user authentication and creates a new draft email.

  Credentials are loaded from 'token.json' when that file exists. Expired
  credentials are refreshed; when no usable credentials exist, or the refresh
  is rejected, the interactive OAuth flow is run using 'client_secrets.json'.
  Whatever credentials result are written back to 'token.json'.

  Args:
    To: Email address of the recipient.
    From: Email address of the sender.
    Subject: The subject of the email.
    Body: The plain text content of the email.

  Returns:
    The draft object returned by the Gmail API call, or None when
    'client_secrets.json' is missing, no credentials could be obtained, or
    the API call raised HttpError.
  """
  # Local import: keeps this fix self-contained in the function.
  from google.auth.exceptions import RefreshError

  creds = None

  if os.path.exists("token.json"):
    creds = Credentials.from_authorized_user_file("token.json", SCOPES)

  # If there are no (valid) credentials available, let the user log in.
  if not creds or not creds.valid:
    refreshed = False
    if creds and creds.expired and creds.refresh_token:
      try:
        creds.refresh(Request())
        refreshed = True
      except RefreshError as error:
        # A rejected refresh used to abort the whole call; fall through to
        # the interactive login below instead.
        print(f"Stored credentials could not be refreshed: {error}")

    if not refreshed:
      # You must have a 'client_secrets.json' file in this directory
      if not os.path.exists("client_secrets.json"):
        print("\n--- ERROR ---")
        print("Missing 'client_secrets.json'.")
        print(
            "Please download it from your Google Cloud Console and place it in this directory."
        )
        print("---------------\n")
        return None

      flow = InstalledAppFlow.from_client_secrets_file(
          "client_secrets.json", SCOPES
      )
      creds = flow.run_local_server(port=0)

    # Save the credentials for the next run
    with open("token.json", "w") as token:
      token.write(creds.to_json())

  if creds:
    try:
      # create gmail api client
      service = build("gmail", "v1", credentials=creds)

      message = EmailMessage()
      message.set_content(Body)
      message["To"] = To
      message["From"] = From
      message["Subject"] = Subject

      # The API expects the whole message as a URL-safe base64 string.
      encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()

      create_message = {"message": {"raw": encoded_message}}

      # pylint: disable=E1101
      draft = (
          service.users()
          .drafts()
          .create(userId="me", body=create_message)
          .execute()
      )

      print(f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}')
      return draft

    except HttpError as error:
      print(f"An error occurred: {error}")
      return None

  print("Could not obtain credentials.")
  return None




In [28]:
import requests

def google_search(query, api_key, cse_id, num_results=5):
    """
    Query the Google Custom Search JSON API and return simplified results.

    Args:
        query: Search string sent as the 'q' parameter.
        api_key: API key sent as the 'key' parameter.
        cse_id: Search engine id sent as the 'cx' parameter.
        num_results: Maximum number of items to return. Coerced to int because
            values coming from model-generated arguments are not guaranteed
            to be ints; None keeps every item in the response.

    Returns:
        A list of dicts with the keys "title", "content" (the result snippet)
        and "link".

    Raises:
        requests.HTTPError: when the API answers with an error status.
            Previously such responses were silently reported as "no results".
    """
    search_url = "https://www.googleapis.com/customsearch/v1"
    params = {
        'q': query,
        'key': api_key,
        'cx': cse_id,
    }
    limit = None if num_results is None else int(num_results)

    # A timeout keeps a stalled connection from hanging the notebook cell.
    response = requests.get(search_url, params=params, timeout=10)
    response.raise_for_status()
    items = response.json().get('items', [])[:limit]

    return [
        {
            "title": item.get("title"),
            "content": item.get("snippet"),
            "link": item.get("link"),
        }
        for item in items
    ]

In [29]:
import wikipedia
def wiki_search(query, num_results=5):
    """
    Search Wikipedia and return a short summary for each matching article.

    Args:
        query: Search string passed to wikipedia.search.
        num_results: Maximum number of search hits to request.

    Returns:
        A list of dicts with the keys "title", "content" (a two-sentence
        summary) and "link" (the page URL). Hits that resolve to a
        disambiguation page or to no page at all are skipped.
    """
    search_results = wikipedia.search(query, results=num_results)
    out = []
    for title in search_results:
        try:
            # Titles come straight from the search results, so look them up
            # verbatim: auto-suggest can rewrite a valid title into a
            # different one, yielding the wrong page or a PageError.
            page = wikipedia.page(title, auto_suggest=False)
            summary = wikipedia.summary(title, sentences=2, auto_suggest=False)
        except (wikipedia.DisambiguationError, wikipedia.PageError):
            continue
        out.append({"title": title, "content": summary, "link": page.url})
    return out

In [None]:
import requests
import trafilatura
# from bs4 import BeautifulSoup # You can remove this import if you want

def fetch_page_content(url):
    """
    Download a web page and return its main readable text.

    The page is requested with a browser-like User-Agent and a 10 second
    timeout, then passed through trafilatura (comments and tables excluded).

    Returns:
        The first 4000 characters of the extracted text on success, or a dict
        with a single "error" key when the request fails or nothing could be
        extracted.
    """
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    try:
        reply = requests.get(url, headers=browser_headers, timeout=10)
        reply.raise_for_status()

        extracted = trafilatura.extract(
            reply.text,
            include_comments=False,
            include_tables=False,
        )

        if extracted:
            # Cap the payload that is handed back to the caller.
            return extracted[:4000]

        return {"error": "Could not extract main content from the page."}
    except Exception as exc:
        print(f"Error fetching {url}: {exc}")
        # Failures are reported as data rather than raised.
        return {"error": f"Could not fetch page content. Error: {exc}"}

In [36]:
# --- Tool Definitions Cell ---

from google.genai import types

# One entry per callable tool: (name, description, parameter properties, required parameter names).
_TOOL_SPECS = [
    (
        "google_search",
        "Search Google using the Custom Search API",
        {"query": {"type": "string"}, "num_results": {"type": "integer"}},
        ["query"],
    ),
    (
        "fetch_page_content",
        "Fetch and clean readable text from a given webpage URL",
        {"url": {"type": "string"}},
        ["url"],
    ),
    (
        "wiki_search",
        "Search Wikipedia for relevant articles",
        {"query": {"type": "string"}, "num_results": {"type": "integer"}},
        ["query"],
    ),
    (
        "create_gmail_draft_with_auth",
        "Create a Gmail draft email using authenticated user credentials",
        {
            "To": {"type": "string"},
            "From": {"type": "string"},
            "Subject": {"type": "string"},
            "Body": {"type": "string"},
        },
        ["To", "From", "Subject", "Body"],
    ),
]

# Each spec becomes its own Tool holding a single function declaration whose
# parameters are described by an object schema.
tools = [
    types.Tool(
        function_declarations=[
            types.FunctionDeclaration(
                name=tool_name,
                description=tool_description,
                parameters=types.Schema(
                    type="object",
                    properties=tool_properties,
                    required=tool_required,
                ),
            )
        ]
    )
    for tool_name, tool_description, tool_properties, tool_required in _TOOL_SPECS
]

print("Tools list defined.")

Tools list defined.


In [None]:
from google import genai
from google.genai import types
import os
import json
import sys
from datetime import datetime
import pprint

# -----------------------------
# Gemini Client and Tool Setup
# (Assumes your 'tools' list is defined in a cell above)
# -----------------------------

client = genai.Client(api_key=google_gemini_api_key)

# Standing instructions for the model. The tool names mentioned here must match
# the declared function names exactly (`google_search`, `fetch_page_content`).
SYSTEM_PROMPT = """You are a reliable assistant that can use external tools to retrieve accurate,
    up-to-date information from the web. Always start with `google_search` to find
    relevant sources, then use `fetch_page_content` if needed, and summarize the results."""

# Passing the instructions as system_instruction keeps them in force on every
# request, even after the first history message has dropped out of the window
# of recent turns that is sent to the model.
config = types.GenerateContentConfig(tools=tools, system_instruction=SYSTEM_PROMPT)


# The history still opens with the same two setup messages as before.
conversation_history = [
    types.Content(role="user", parts=[
        types.Part(text=SYSTEM_PROMPT)
    ]),
    types.Content(role="model", parts=[
        types.Part(text="Okay, I'm ready to help. What would you like to know?")
    ])
]
print("--- Initializing conversation. History loaded with 2 setup messages. ---")


def print_debug_history(history, max_turns=10):
    """
    Print a readable dump of the most recent conversation turns.

    Every part of every turn is inspected: function calls are printed with
    their arguments, function responses with their data, and all text parts
    of a turn are joined and printed together.

    Args:
        history: Sequence of turn objects exposing `role` and `parts`.
        max_turns: Maximum number of trailing turns to print.
    """
    rule = "=" * 50
    print("\n" + rule)
    print(f"DEBUG: PRINTING LAST {min(len(history), max_turns)} HISTORY TURNS (Max: {max_turns})")
    print(rule)
    start_index = max(0, len(history) - max_turns)
    for i, content in enumerate(history[start_index:], start=start_index):
        role = content.role
        print(f"\n--- Turn {i} ({role}) ---")
        try:
            text_chunks = []
            for part in content.parts or []:
                # These attributes can exist and still be None, so test the
                # value itself instead of relying on hasattr().
                fc = getattr(part, "function_call", None)
                fr = getattr(part, "function_response", None)
                if fc is not None and getattr(fc, "name", None):
                    call_args = dict(getattr(fc, "args", None) or {})
                    print(f"Function Call: {fc.name}")
                    print(f"Args:\n{pprint.pformat(call_args)}")
                elif fr is not None and getattr(fr, "name", None):
                    print(f"Function Response: {fr.name}")
                    response_dict = getattr(fr, "response", {})
                    if isinstance(response_dict, dict):
                        print(f"Data:\n{pprint.pformat(dict(response_dict))}")
                    else:
                        print(f"Data (raw): {response_dict}")
                else:
                    text = getattr(part, "text", None)
                    if text:
                        text_chunks.append(text)
            if text_chunks:
                print("".join(text_chunks))
        except Exception as e:
            print(f"[Debug print error: {e}] - Raw parts: {content.parts}")
    print("\n" + "="*50 + " END OF HISTORY PRINT " + "="*50 + "\n")


# --- Main Interaction Loop ---
MODEL_NAME = "gemini-2.5-flash"
MAX_HISTORY_TURNS = 10  # how many recent turns are sent with each request
MAX_TOOL_ROUNDS = 5     # upper bound on consecutive tool rounds per user prompt

# Arguments that must be present and non-empty before a tool is executed.
REQUIRED_TOOL_ARGS = {
    "google_search": ["query"],
    "fetch_page_content": ["url"],
    "wiki_search": ["query"],
    "create_gmail_draft_with_auth": ["To", "From", "Subject", "Body"],
}


def _recent_history(history, max_turns=MAX_HISTORY_TURNS):
    """Return the tail of `history`, adjusted so that it starts on a user turn."""
    start = max(0, len(history) - max_turns)
    # Prefer the first user turn inside the window, so the request never opens
    # with a tool response or function call whose counterpart was cut off.
    for index in range(start, len(history)):
        if history[index].role == "user":
            return history[index:]
    # No user turn inside the window (a long tool chain): extend backwards to
    # the user turn that opened the current exchange.
    for index in range(start - 1, -1, -1):
        if history[index].role == "user":
            return history[index:]
    return history[start:]


def _content_text(content):
    """Join the text of all parts of `content`, ignoring parts without text."""
    parts = getattr(content, "parts", None) or []
    return "".join(part.text for part in parts if getattr(part, "text", None))


def _run_tool(func_name, args):
    """Validate and execute one tool call; return (response_data, succeeded)."""
    for req_arg in REQUIRED_TOOL_ARGS.get(func_name, []):
        if req_arg not in args or not args[req_arg]:
            print(f"ERROR: Missing required argument '{req_arg}' for tool '{func_name}'.")
            return {"content": {"error": f"Missing required argument: {req_arg}"}}, False

    try:
        if func_name == "google_search":
            results = google_search(
                query=args["query"],
                api_key=google_search_api_key,
                cse_id=google_search_engine_id,
                num_results=int(args.get("num_results", 5)),
            )
            return {"content": results}, True
        if func_name == "fetch_page_content":
            return {"content": fetch_page_content(url=args["url"])}, True
        if func_name == "wiki_search":
            results = wiki_search(query=args["query"], num_results=int(args.get("num_results", 3)))
            return {"content": results}, True
        if func_name == "create_gmail_draft_with_auth":
            draft = create_gmail_draft_with_auth(
                To=args["To"], From=args["From"], Subject=args["Subject"], Body=args["Body"]
            )
            if draft is None:
                # The helper signals failure by returning None; do not report
                # that as a successfully created draft.
                print("ERROR: Gmail draft could not be created.")
                return {"content": {"error": "Gmail draft could not be created."}}, False
            return {"content": f"Draft created successfully: {draft}"}, True

        # Unknown tools are reported back to the model rather than treated as
        # a local failure.
        print(f"Unknown function call: {func_name}")
        return {"content": {"error": f"Unknown tool {func_name}"}}, True
    except Exception as e:
        print(f"ERROR executing tool {func_name}: {e}")
        return {"content": {"error": f"Error during tool execution: {e}"}}, False


while True:
    print("\n" + "-"*70)
    if conversation_history and conversation_history[-1].role != "model":
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"DEBUG ERROR: Expected last turn to be 'model', but got '{conversation_history[-1].role}'.")
        print("Attempting to recover by adding placeholder model response.")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        conversation_history.append(types.Content(role="model", parts=[types.Part(text="[Error recovery placeholder]")]))

    user_prompt = input("USER: ")
    print(user_prompt)
    if user_prompt.strip().lower() in ["exit", "quit"]:
        print("Exiting conversation.")
        break

    print("DEBUG: Appending USER turn.")
    conversation_history.append(
        types.Content(role="user", parts=[types.Part(text=user_prompt)])
    )
    # History length right after the user turn; used to roll back partial
    # function-call / tool turns if this exchange fails midway.
    turn_checkpoint = len(conversation_history)

    # --- Start Interaction ---
    try:
        print("DEBUG: Calling generate_content (Cycle Start)...")
        response = client.models.generate_content(
            model=MODEL_NAME,
            contents=_recent_history(conversation_history),
            config=config,
        )

        if not response.candidates:
            raise ValueError("Model returned no candidates.")

        model_response_content = response.candidates[0].content

        # --- Process function calls, allowing a bounded number of rounds ---
        tool_rounds = 0
        while response.function_calls:
            if tool_rounds >= MAX_TOOL_ROUNDS:
                print("DEBUG WARNING: Model keeps requesting function calls. This might loop, appending placeholder.")
                model_response_content = types.Content(role="model", parts=[types.Part(text="[Unexpected function call loop detected]")])
                break
            tool_rounds += 1

            print("DEBUG: Model response IS a function call.")
            conversation_history.append(model_response_content)  # the function call request

            calls = response.function_calls
            print(f"\n--- Model requested {len(calls)} tool call(s) ---")
            tool_results_parts = []
            all_calls_valid = True

            for call in calls:
                func_name = call.name
                args = dict(call.args or {})
                print(f"Executing: {func_name}({args})")
                response_data, succeeded = _run_tool(func_name, args)
                all_calls_valid = all_calls_valid and succeeded
                tool_results_parts.append(
                    types.Part(function_response=types.FunctionResponse(
                        name=func_name,
                        response=response_data
                    ))
                )

            print("DEBUG: Appending TOOL turn.")
            conversation_history.append(
                types.Content(role="tool", parts=tool_results_parts)
            )

            if not all_calls_valid:
                print("DEBUG: One or more tool calls failed validation or execution. Skipping final model call for this turn.")
                model_response_content = types.Content(role="model", parts=[types.Part(text="I encountered an error trying to use my tools. Please check the arguments or try again.")])
                break

            print(f"\n--- Added {len(tool_results_parts)} tool results. Re-prompting model... ---")
            print("DEBUG: Calling generate_content (after tool)...")
            response = client.models.generate_content(
                model=MODEL_NAME,
                contents=_recent_history(conversation_history),
                config=config,
            )

            if not response.candidates:
                raise ValueError("Model returned no candidates after tool call.")

            model_response_content = response.candidates[0].content

        if model_response_content is None:
            raise ValueError("Model returned an empty response.")

        print("DEBUG: Appending final MODEL turn.")
        conversation_history.append(model_response_content)

    # --- Error Handling for the whole interaction block ---
    except Exception as e:
        print(f"\n--- CRITICAL ERROR during model interaction: {e} ---")
        # Discard any partial function-call / tool turns of this exchange so
        # the history stays a valid user -> model sequence for later requests.
        del conversation_history[turn_checkpoint:]
        conversation_history.append(
            types.Content(role="model", parts=[types.Part(text=f"[Critical error during processing: {e}]")])
        )

    # --- Output Final Response ---
    final_model_content = conversation_history[-1]  # Should always be the last model response

    if final_model_content.role == "model":
        final_text = _content_text(final_model_content)
    else:
        print(f"DEBUG: ERROR - Last history item was '{final_model_content.role}', expected 'model'!")
        final_text = "[Error in history processing - check logs]"

    print("\n--- MODEL RESPONSE ---")
    if final_text:
        print(final_text)
    else:
        ended_on_function_call = any(
            getattr(getattr(part, "function_call", None), "name", None)
            for part in (getattr(final_model_content, "parts", None) or [])
        )
        if ended_on_function_call:
            print("[Error: Loop ended unexpectedly on a function call - check logs]")
        else:
            print("[Model had no text response or an error occurred - check logs]")

--- Initializing conversation. History loaded with 2 setup messages. ---

----------------------------------------------------------------------
DEBUG: Appending USER turn.
DEBUG: Calling generate_content (Cycle Start)...
DEBUG: Model response IS a function call.

--- Model requested 1 tool call(s) ---
Executing: google_search({'query': 'RTX 5070'})
DEBUG: Appending TOOL turn.

--- Added 1 tool results. Re-prompting model... ---
DEBUG: Calling generate_content (after tool)...
DEBUG: Appending final MODEL turn (after tool).

--- MODEL RESPONSE ---
The NVIDIA GeForce RTX 5070 is a high-end graphics card that is part of NVIDIA's 50-series, based on the Blackwell architecture. It is built on a 5 nm process and utilizes the GB205 graphics processor. The card is expected to be launched around March or April 2025 and features 12GB of GDDR7 VRAM. There is also a mention of an RTX 5070 Ti.

----------------------------------------------------------------------
DEBUG: Appending USER turn.
DEBUG: