<a href="https://colab.research.google.com/github/abdullah75f/function_calling_gemini/blob/main/fix_water_conservation_tip.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cell 1: Installation
!pip install requests google-generativeai pyttsx3 gtts beautifulsoup4 -q
print("Dependencies installed.")

In [None]:
# Cell 2: Imports and Setup (REVISED - Added glm import)
import os
import requests
import random
import subprocess # Needed for set_volume
import json # Needed for function call responses

from gtts import gTTS
from IPython.display import Audio, display # Ensure display is imported
import google.generativeai as genai
from bs4 import BeautifulSoup
import google.ai.generativelanguage as glm # <-- ADD THIS IMPORT

# Check the GenAI version (optional but good practice)
try:
    print(f"Using google-generativeai version: {genai.__version__}")
except Exception as e:
    print(f"Could not check google-generativeai version: {e}")
# Check glm version if possible
try:
    # Note: glm might not have a standard __version__ attribute
    pass
except Exception as e:
    pass

print("Libraries imported.")

In [None]:
# Cell 3: API Key and Model Initialization
_in_colab = False
api_key = None
model = None # Initialize model variable

print("Attempting to configure Gemini...")

# Check if running in Colab and import userdata if available
try:
    from google.colab import userdata
    print("Successfully imported google.colab.userdata. Running in Colab.")
    _in_colab = True
except ImportError:
    userdata = None
    print("Warning: 'google.colab.userdata' not found. Assuming not running in Colab.")

# Get the API Key using the appropriate method
try:
    if _in_colab and userdata:
        print("Attempting to retrieve API key from Colab Secrets...")
        # *** Ensure 'GEMINI_API_KEY' matches the name in your Colab Secrets UI ***
        api_key = userdata.get('GEMINI_API_KEY')
        if not api_key:
            raise ValueError("Secret 'GEMINI_API_KEY' not found or access not enabled in Colab Secrets.")
        print("Successfully retrieved API key from Colab Secrets.")

    else: # Fallback for non-Colab environments
        print("Attempting to retrieve API key from environment variables (os.getenv)...")
        # *** Looks for an ENVIRONMENT VARIABLE named 'GEMINI_API_KEY' ***
        api_key = os.getenv('GEMINI_API_KEY')
        if not api_key:
            env_var_source = "environment variables" if not _in_colab else "Colab Secrets (import failed) or environment variables"
            raise ValueError(f"GEMINI_API_KEY not found in {env_var_source}.")
        else:
            print("Successfully retrieved API key from environment variables.")

    # Configure GenAI and Initialize Model (only if key was found)
    if api_key:
        print("Configuring Gemini with the retrieved API key...")
        genai.configure(api_key=api_key)

        # --- Using 1.5 flash model ---
        model_name = 'gemini-1.5-flash-latest'
        print(f"Initializing Gemini model '{model_name}'...")
        model = genai.GenerativeModel(model_name)
        print(f"Gemini configured and model '{model.model_name}' initialized successfully.")
    else:
        print("ERROR: API Key was not retrieved. Cannot configure Gemini or initialize model.")
        model = None # Ensure model is None if configuration failed

except Exception as e:
    print(f"ERROR: An error occurred during API key retrieval or Gemini configuration: {e}")
    model = None # Ensure model is None if any error occurs

# Final check
if model:
    print("Setup complete. The 'model' variable is ready.")
else:
    print("Setup failed. The 'model' variable is None. Check errors above.")

In [None]:
# Cell 4: Global Variables and Function Definitions (Corrected Scraper)

print("Defining global variables and functions...")

# --- Global Variables ---
DEFAULT_WATER_TIPS_URL = "https://thewaterproject.org/water_conservation_tips"
WATER_TIPS = [] # Global list to store scraped tips
TIP_FILENAME = "water_conservation_tip.txt"
AUDIO_FILENAME = "tip_generated_audio.mp3"
LAST_SCRAPED_URL = ""


# --- Function Definitions (Modified for Function Calling & Corrected Scraper) ---

def scrape_water_tips(url):
    """Scrapes water conservation tips from the given URL using a more specific selector."""
    global WATER_TIPS, LAST_SCRAPED_URL
    print(f"[Scraping] Attempting to scrape tips from: {url}")
    scraped_list = []
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status() # Check for HTTP errors
        soup = BeautifulSoup(response.content, 'html.parser')

        # --- CORRECTED SELECTOR ---
        # Strategy: Find a specific container known to hold the tips, then find the list items within it.
        # Based on inspection of thewaterproject.org (this might need updating if the site changes):
        # Look for a div that likely contains the main content list.
        # Example candidates: a div with specific classes like 'sqs-block-html', or an 'article' tag.
        # Let's try finding a common block type first.
        # Note: Selectors for other URLs (like the Princeton one) might be different!
        # This selector prioritizes the default URL structure.

        potential_containers = soup.find_all('div', class_='sqs-block-html') # Find all potential blocks
        tip_elements = []

        if not potential_containers:
             print(f"[Scraping] Warning: Could not find expected 'div.sqs-block-html' containers on {url}. Falling back to finding all 'li'. This might be inaccurate.")
             # Fallback (less reliable): Find all list items on the page
             tip_elements = soup.find_all('li')
        else:
            print(f"[Scraping] Found {len(potential_containers)} potential 'div.sqs-block-html' containers. Searching for lists within them.")
            found_list_items = False
            for container in potential_containers:
                # Look for an unordered list <ul> directly within the container
                list_tag = container.find('ul')
                if list_tag:
                    items = list_tag.find_all('li', recursive=False) # Find direct children 'li'
                    if items:
                         print(f"[Scraping] Found {len(items)} list items in a 'ul' within a container.")
                         tip_elements.extend(items)
                         found_list_items = True
                         # Optional: break here if you are sure the first list found is the correct one
                         # break

                # If no <ul> found, maybe tips are in <p> tags or direct <li> under the div? (Less likely)
                # You might need more sophisticated logic depending on the exact structure.

            if not found_list_items:
                 print(f"[Scraping] Warning: Found containers, but no 'ul > li' structure within them. Falling back to finding *all* 'li' on page (less accurate).")
                 tip_elements = soup.find_all('li') # Fallback if specific structure not found inside containers

        # --- End CORRECTED SELECTOR ---


        if not tip_elements:
             print(f"[Scraping] Warning: No list items ('li') found using any method on {url}.")
             return False # Indicate failure if no elements found

        # Extract text, ensuring it's not just whitespace or very short strings (like list numbers)
        scraped_list = [tip.get_text(strip=True) for tip in tip_elements if len(tip.get_text(strip=True)) > 5] # Filter short/empty strings

        if not scraped_list:
            print(f"[Scraping] Warning: Found list elements but they contained no usable text after filtering.")
            return False # Indicate failure

        WATER_TIPS = scraped_list # Assign to global variable ONLY if successful
        LAST_SCRAPED_URL = url
        print(f"[Scraping] Successfully scraped and filtered {len(WATER_TIPS)} potential tips from {url}.")
        # print(f"[Scraping] Sample: {WATER_TIPS[:3]}") # Optional: Print sample for verification
        return True # Indicate success

    except requests.exceptions.RequestException as e:
        print(f"[Scraping] Error fetching URL {url}: {e}")
        return False
    except Exception as e:
        import traceback
        print(f"[Scraping] Error during scraping/parsing for {url}: {e}")
        # print(traceback.format_exc()) # Uncomment for detailed parsing errors
        return False

# --- Rest of the functions remain the same as in your previous correct Cell 4 ---

def get_water_conservation_tip(url: str = None): # Allow None, handle default inside
    """
    Retrieves a random water conservation tip. Scrapes if needed.
    Designed for Gemini Function Calling. Returns a dictionary.
    """
    global WATER_TIPS, LAST_SCRAPED_URL, DEFAULT_WATER_TIPS_URL
    print(f"[Function Call] Attempting get_water_conservation_tip(url='{url}')")
    effective_url = url if url else DEFAULT_WATER_TIPS_URL
    print(f"[Function Call] Effective URL: {effective_url}")

    needs_scrape = False
    if not WATER_TIPS or effective_url != LAST_SCRAPED_URL:
        print(f"[Function Call] Tips empty or URL differs. Triggering scrape for '{effective_url}'.")
        needs_scrape = True

    if needs_scrape:
        # ---> ADD LOGGING HERE <---
        print(f"[Function Call] ---> Making HTTP GET request to external endpoint: {effective_url}")
        success = scrape_water_tips(effective_url)
        if not success:
             print("[Function Call Result] Scraping failed.")
             return {"tip": f"Sorry, could not retrieve tips from the source: {effective_url}.", "error": True}

    if WATER_TIPS:
        selected_tip = random.choice(WATER_TIPS)
        # Basic check to filter out obviously non-tip items (like short menu items)
        if len(selected_tip.split()) < 4: # If tip is less than 4 words, try again once
             print(f"[Function Call Result] Tip '{selected_tip}' seems too short. Trying again.")
             if len(WATER_TIPS) > 1: # Avoid infinite loop if only one short item exists
                 remaining_tips = [t for t in WATER_TIPS if t != selected_tip]
                 if remaining_tips:
                     selected_tip = random.choice(remaining_tips)
        print(f"[Function Call Result] Returning random tip: '{selected_tip}'")
        return {"tip": selected_tip, "error": False}
    else:
        print("[Function Call Result] No tips available even after check/scrape.")
        return {"tip": "Sorry, no water conservation tips are currently available.", "error": True}

def save_tip(tip: str):
    """
    Saves the provided water conservation tip to a text file.
    Designed for Gemini Function Calling. Returns a dictionary.
    """
    global TIP_FILENAME
    print(f"[Function Call] Attempting save_tip(tip='{tip[:50]}...')")
    if not tip or not isinstance(tip, str) or len(tip.strip()) == 0:
        print("[Function Call Result] Invalid or empty tip provided.")
        return {"status": "Failed: No valid tip provided to save.", "filename": TIP_FILENAME, "error": True}
    try:
        with open(TIP_FILENAME, "w", encoding='utf-8') as file: file.write(tip)
        msg = f"Tip successfully saved to {TIP_FILENAME}"
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "filename": TIP_FILENAME, "error": False}
    except Exception as e:
        msg = f"Error saving tip to file '{TIP_FILENAME}': {e}"
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "filename": TIP_FILENAME, "error": True}

def set_volume(level: int):
    """
    Sets the system volume using pactl (Linux specific).
    Designed for Gemini Function Calling. Returns a dictionary.
    """
    print(f"[Function Call] Attempting set_volume(level={level})")
    if not isinstance(level, int) or not 0 <= level <= 150:
         msg = "Error: Volume level must be an integer between 0 and 150."
         print(f"[Function Call Result] {msg}")
         return {"status": msg, "error": True}
    if os.name != 'posix':
        msg = "Warning: Volume control via 'pactl' might not work on this OS (non-Linux)."
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "error": False}
    try:
        command = ["pactl", "set-sink-volume", "@DEFAULT_SINK@", f"{level}%"]
        print(f"Executing command: {' '.join(command)}")
        result = subprocess.run(command, capture_output=True, text=True, check=False, timeout=5)
        if result.returncode == 0:
            msg = f"Attempted to set volume to {level}% via pactl."
            print(f"[Function Call Result] {msg}")
            return {"status": msg, "level_set": level, "error": False}
        else:
            error_message = result.stderr.strip() or result.stdout.strip() or "Unknown pactl error"
            msg = f"Error setting volume via pactl (Code {result.returncode}): {error_message}"
            print(f"[Function Call Result] {msg}")
            return {"status": msg, "error": True}
    except Exception as e:
        msg = f"An unexpected error occurred setting volume: {e}"
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "error": True}

def tell_tip():
    """
    Reads the water conservation tip from the file aloud using gTTS.
    Designed for Gemini Function Calling. Returns a dictionary.
    """
    global TIP_FILENAME, AUDIO_FILENAME
    print("[Function Call] Attempting tell_tip()")
    try:
        if not os.path.exists(TIP_FILENAME):
            msg = f"Error: Tip file '{TIP_FILENAME}' not found. Cannot read tip."
            print(f"[Function Call Result] {msg}")
            return {"status": msg, "error": True}
        with open(TIP_FILENAME, "r", encoding='utf-8') as file: tip = file.read().strip()
        if not tip:
             msg = f"Error: Tip file '{TIP_FILENAME}' is empty."
             print(f"[Function Call Result] {msg}")
             return {"status": msg, "error": True}
        if os.path.exists(AUDIO_FILENAME):
            try: os.remove(AUDIO_FILENAME)
            except OSError as e: print(f"Warning: Could not remove existing audio file {AUDIO_FILENAME}: {e}")
        print(f"Generating audio for tip: '{tip[:60]}...'")
        tts = gTTS(text=f"Here is the saved water conservation tip: {tip}", lang='en', slow=False)
        tts.save(AUDIO_FILENAME)
        print(f"Audio saved to {AUDIO_FILENAME}. Attempting to play...")
        display(Audio(AUDIO_FILENAME, autoplay=True))
        msg = f"Audio for the tip in '{TIP_FILENAME}' generated and playback started."
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "audio_file": AUDIO_FILENAME, "error": False}
    except Exception as e:
        msg = f"Error during text-to-speech or file reading process: {e}"
        print(f"[Function Call Result] {msg}")
        return {"status": msg, "error": True}

print("Global variables and functions defined (scraper improved).")

In [None]:
# Cell 5: Define Tools for Gemini (REVISED - tell_tip parameters omitted)
print("\n--- Defining Tools for Gemini ---")

available_functions = {
    "get_water_conservation_tip": get_water_conservation_tip,
    "save_tip": save_tip,
    "set_volume": set_volume,
    "tell_tip": tell_tip,
}

function_declarations = [
    glm.FunctionDeclaration(
        name="get_water_conservation_tip",
        description="Retrieves a random water conservation tip, optionally from a specific URL.",
        parameters=glm.Schema( type=glm.Type.OBJECT, properties={
                'url': glm.Schema( type=glm.Type.STRING, description=f"Optional URL. Defaults to {DEFAULT_WATER_TIPS_URL}.") }
        )
    ),
    glm.FunctionDeclaration(
        name="save_tip",
        description=f"Saves a given text string (tip) to the file '{TIP_FILENAME}'.",
        parameters=glm.Schema( type=glm.Type.OBJECT, properties={
                'tip': glm.Schema( type=glm.Type.STRING, description="The tip text to save.") },
            required=['tip']
        )
    ),
     glm.FunctionDeclaration(
        name="set_volume",
        description="Sets system audio volume (Linux/pactl only).",
        parameters=glm.Schema( type=glm.Type.OBJECT, properties={
                'level': glm.Schema( type=glm.Type.INTEGER, description="Volume percentage (0-100).") },
            required=['level']
        )
    ),
     # --- FIX for tell_tip ---
     glm.FunctionDeclaration(
        name="tell_tip",
        description=f"Reads the tip from '{TIP_FILENAME}' aloud using TTS."
        # No 'parameters' field needed if the function takes no arguments from the model
    ),
]

tools_list = function_declarations
print("Function declarations created (tell_tip parameters omitted).")

In [None]:
# Cell 6: Function Calling Interaction Loop (MODIFIED send_message calls)
import google.ai.generativelanguage as glm # Optional: for type hints

print("\n--- Starting Function Calling Interaction ---")

# Check if the model is available before proceeding
if not model:
    print("ERROR: Gemini model ('model' variable) is not initialized. Cannot run Function Calling example. Check Cell 3.")
else:
    # Start a chat session
    chat = model.start_chat(history=[]) # You can manage history here if needed

    # --- Define the user's request ---
    # Try different prompts to test the function calling logic:
    # user_prompt = "Get me a water saving tip and tell it to me."
    # user_prompt = "Find a tip about saving water in the garden, save it, then tell me what you saved."
    user_prompt = "Please set volume to 70%, then get a water conservation tip from 'https://environment.princeton.edu/news/10-simple-ways-conserve-water', save the tip you find, and finally read the saved tip aloud."
    # user_prompt = "Fetch a random water conservation tip for me, save it to the standard file, and then announce the tip using text-to-speech."

    print(f"\nUser Prompt: '{user_prompt}'")

    # Send the initial prompt with tools enabled
    try:
        print("\n>> Sending initial prompt to Gemini with tools...")
        # *** Use tools_list instead of tools ***
        response = chat.send_message(user_prompt, tools=tools_list)

        # --- Loop to handle function calls ---
        while True:
            # Check if the response contains a function call request
            try:
                function_call = response.candidates[0].content.parts[0].function_call
                if not function_call: # Exit loop if no function call is present
                    break
            except (AttributeError, IndexError): # Handle cases where response structure is unexpected
                 print("<< No function call found in the response or response structure invalid.")
                 break

            # ** Step 1: Process the FunctionCall request **
            # Use glm type for better structure if available, otherwise fallback
            try:
                api_request = glm.FunctionCall(name=function_call.name, args=dict(function_call.args))
            except NameError: # Fallback if glm not imported or fails
                api_request = type('obj', (object,), {'name':function_call.name, 'args': dict(function_call.args)})()

            print(f"\n<< Gemini requested function call: {api_request.name}({api_request.args})")

            # ** Step 2: Look up and execute your Python function **
            function_name = api_request.name
            if function_name in available_functions:
                func_to_call = available_functions[function_name]
                args = api_request.args

                # Call the function safely
                try:
                    print(f"   Executing function: {function_name}...")
                    function_response_data = func_to_call(**args) # Unpack args
                    print(f"   Function returned: {function_response_data}")

                    # ** Step 3: Send the function's result back to the model **
                    print(f">> Sending function response back to Gemini...")
                    # Construct response part (handle potential type errors)
                    try:
                         function_response_part = glm.Part(function_response=glm.FunctionResponse(name=function_name, response=function_response_data))
                    except NameError: # Fallback if glm not imported or fails
                         function_response_part = genai.Part.from_function_response(name=function_name, response=function_response_data)

                    # *** Use tools_list instead of tools ***
                    response = chat.send_message(part=function_response_part, tools=tools_list)

                except Exception as e:
                    print(f"   ERROR executing function {function_name}: {e}")
                    print(f">> Sending error response back to Gemini...")
                    try:
                        error_part = glm.Part(function_response=glm.FunctionResponse(name=function_name, response={ "error": True, "status": f"Python Error executing {function_name}: {str(e)}" }))
                    except NameError:
                        error_part = genai.Part.from_function_response(name=function_name, response={ "error": True, "status": f"Python Error executing {function_name}: {str(e)}" })

                    # *** Use tools_list instead of tools ***
                    response = chat.send_message(part=error_part, tools=tools_list)
                    print("   Exiting loop due to function execution error.")
                    break # Exit loop on error

            else:
                print(f"   ERROR: Gemini requested unknown function: {function_name}")
                print("   Exiting loop due to unknown function request.")
                break # Exit loop

        # ** Step 4: Model returns the final text response **
        print("\n<< Gemini Final Response:")
        try:
           # Check if there's text content before trying to print
           if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
               final_text = "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text'))
               if final_text:
                   print(final_text)
               else:
                   print("[No text content in the final response parts]")
           else:
                print("[Final response structure invalid or empty]")
        except Exception as e:
             print(f"Warning: Could not extract text from final response. Error: {e}")
             # print("Full final candidate:", response.candidates[0]) # Uncomment for debugging

    except Exception as e:
        import traceback
        print(f"\nERROR: An unexpected error occurred during the chat interaction: {e}")
        print(traceback.format_exc()) # Print full traceback for debugging


# --- End of Interaction ---
print("\n--- Script Finished ---")

In [None]:
# Cell 7: Original Usage Examples (Manual Function Calls - Adapted)

print("\n--- Starting Original Usage Examples (Manual Calls) ---")

# Check if the model is available for Example 1's text generation part
if not model:
    print("WARNING: Gemini model ('model') not initialized. Skipping Example 1 (LLM Tip Generation).")
else:
    # --- Example 1: Generate a unique tip using the LLM (Text Generation Only) ---
    print("\n--- Example 1: Generating a unique tip via LLM Text Gen ---")
    prompt_llm = "generate a highly practical water conservation tip for bathroom use"
    try:
        print(f"Sending simple text generation prompt to Gemini: '{prompt_llm}'")
        # NOTE: This call does NOT use function calling tools
        response_text_gen = model.generate_content(prompt_llm)

        # Safely access generated text
        generated_tip_text = None
        if response_text_gen.candidates and response_text_gen.candidates[0].content and response_text_gen.candidates[0].content.parts:
             generated_tip_text = "".join(part.text for part in response_text_gen.candidates[0].content.parts if hasattr(part, 'text'))

        if generated_tip_text:
            print("\n--- LLM Generated Tip (Text Only) ---")
            print(generated_tip_text)
            print("---------------------------------------\n")

            # --- Example 1b: Manually save and speak the generated tip ---
            print("--- Manually saving and speaking the LLM tip ---")
            save_result = save_tip(generated_tip_text) # Call modified save_tip
            print(f"Save status: {save_result['status']}")

            if not save_result['error']: # Check error flag from save_tip's return dict
                speak_result = tell_tip() # Call modified tell_tip
                print(f"Speak status: {speak_result['status']}")
            else:
                 print("Skipping speaking because saving failed.")
        else:
            print("Could not generate a tip with the LLM via simple text generation.")
            try:
                 print(f"LLM Response Feedback: {response_text_gen.prompt_feedback}")
            except Exception: pass # Ignore if feedback isn't available

    except Exception as e:
        print(f"ERROR during Example 1 (LLM Text Gen): {e}")

# --- End of Examples ---
print("\n--- Original Usage Examples Finished ---")