<a href="https://colab.research.google.com/github/danfran/ai-agents/blob/main/data-pipeline/smart_crawler.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install langchain langgraph langchain-tavily langgraph-checkpoint-sqlite langchain-anthropic langchain[google-genai] pydantic

Collecting langgraph
  Downloading langgraph-0.6.6-py3-none-any.whl.metadata (6.8 kB)
Collecting langchain-tavily
  Downloading langchain_tavily-0.2.11-py3-none-any.whl.metadata (22 kB)
Collecting langgraph-checkpoint-sqlite
  Downloading langgraph_checkpoint_sqlite-2.0.11-py3-none-any.whl.metadata (2.6 kB)
Collecting langchain-anthropic
  Downloading langchain_anthropic-0.3.19-py3-none-any.whl.metadata (1.9 kB)
Collecting langgraph-checkpoint<3.0.0,>=2.1.0 (from langgraph)
  Downloading langgraph_checkpoint-2.1.1-py3-none-any.whl.metadata (4.2 kB)
Collecting langgraph-prebuilt<0.7.0,>=0.6.0 (from langgraph)
  Downloading langgraph_prebuilt-0.6.4-py3-none-any.whl.metadata (4.5 kB)
Collecting langgraph-sdk<0.3.0,>=0.2.2 (from langgraph)
  Downloading langgraph_sdk-0.2.2-py3-none-any.whl.metadata (1.5 kB)
Collecting aiosqlite>=0.20 (from langgraph-checkpoint-sqlite)
  Downloading aiosqlite-0.21.0-py3-none-any.whl.metadata (4.3 kB)
Collecting sqlite-vec>=0.1.6 (from langgraph-checkpoint-s

In [6]:
import os
import requests
import re
import urllib.parse
from datetime import datetime
import sqlite3

import pandas as pd # For convenience in database interaction verification
from bs4 import BeautifulSoup # Used directly in the tool, ensure it's imported

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing import TypedDict, List, Dict, Union, Any, Annotated

# --- 0. Environment Setup ---
# GOOGLE_API_KEY has to be present in the process environment.
#   * On Google Colab it is copied from the notebook's `userdata` store.
#   * Anywhere else it must already be exported (for example via a .env file);
#     a missing key only produces a warning here.
try:
    from google.colab import userdata
except ImportError:
    # google.colab is not importable, so this is not a Colab runtime.
    if not os.environ.get('GOOGLE_API_KEY'):
        print("GOOGLE_API_KEY not found in environment variables. Please set it.")
else:
    os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY')


# --- 1. Define LangGraph State ---
class AgentState(TypedDict):
    """
    State dictionary that flows through the LangGraph workflow.

    Attributes:
        messages (list): Conversation history (user request followed by the
            agent's answers). Declared as ``Annotated[list, add_messages]``.
            NOTE(review): presumably LangGraph uses ``add_messages`` as the
            reducer that merges messages returned by a node into this list;
            confirm against the langgraph documentation.
        initial_webpage_url (str): The starting URL provided by the user.
        link_description (str): User's description of the content of the file to find on the page.
        webpage_content (str): The text content of the browsed webpage.
        extracted_links (List[Dict[str, str]]): List of extracted link text and hrefs.
        chosen_download_url (str | None): The URL identified by the agent for download.
        downloaded_filepath (str | None): The local path where the file was saved.
        status (str): Current status of the agent's process. The code visible
            in this file sets "initiated" (initial state), "agent_completed"
            and "failed" (agent node).
        errors (List[str]): List of error messages encountered.

    Note:
        In the code visible in this file only ``messages``, ``status`` and
        ``errors`` are written after the initial state is built; the other
        fields keep the values they were initialised with.
    """
    # Conversation history; the second Annotated argument is metadata for LangGraph.
    messages: Annotated[list, add_messages]
    # Inputs supplied by the user when the run starts.
    initial_webpage_url: str
    link_description: str
    # Intermediate results (initialised empty / None by the example run).
    webpage_content: str
    extracted_links: List[Dict[str, str]]
    chosen_download_url: str | None
    downloaded_filepath: str | None
    # Progress marker and accumulated error messages.
    status: str
    errors: List[str]

# --- 2. Define Custom Tools ---

@tool
def fetch_webpage_content_and_links(url: str) -> str:
    """
    Fetches the content of a webpage and extracts all visible links,
    returning a formatted string for the LLM to process.
    """
    # NOTE(review): langchain's @tool is understood to expose the docstring
    # above to the LLM as the tool description, so its wording is left as it
    # was; implementation notes are kept in comments instead. Confirm before
    # rewording it.
    #
    # Args:
    #     url: Page to fetch; also the base against which relative links are
    #          resolved.
    # Returns:
    #     A text report: up to 4000 characters of page text followed by up to
    #     100 "Link ID / Text / URL" lines. Failures are returned as an
    #     "Error ..." string rather than raised, so the agent can read them.
    try:
        print(f"Fetching content from: {url}")
        response = requests.get(url, timeout=15)
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)

        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract main text content (a simplified approach) and cap it so the
        # LLM is not overwhelmed by very long pages.
        page_text = soup.get_text(separator=' ', strip=True)
        content_summary = page_text[:4000] + ("..." if len(page_text) > 4000 else "")

        links = []
        # The ID is the anchor's position among ALL <a href> tags on the page,
        # so IDs stay stable even though some anchors are filtered out below.
        for i, a_tag in enumerate(soup.find_all('a', href=True)):
            raw_href = a_tag['href'].strip()
            # Empty hrefs and in-page anchors never point at a downloadable file.
            if not raw_href or raw_href.startswith('#'):
                continue

            # Resolve every form of relative reference ("/a", "a/b.csv",
            # "../c.zip", "//host/d") against the page URL. Previously only
            # hrefs starting with "/" were resolved and the rest were dropped.
            try:
                href = urllib.parse.urljoin(url, raw_href)
                scheme = urllib.parse.urlsplit(href).scheme
            except ValueError:
                # Malformed href (e.g. broken IPv6 literal): skip this link
                # instead of failing the whole page.
                continue

            # Drop mailto:, javascript:, tel:, data: and similar targets.
            if scheme not in ('http', 'https'):
                continue

            # Filter out very short or empty link text
            link_text = a_tag.get_text(strip=True)
            if link_text and len(link_text) > 2:
                links.append({"id": i, "text": link_text, "href": href})

        # Convert links list to a string for LLM consumption, capped at 100 entries.
        links_str = "\n".join(
            f"Link ID: {link['id']}, Text: '{link['text']}', URL: '{link['href']}'"
            for link in links[:100]
        )
        if len(links) > 100:
            links_str += f"\n... (and {len(links) - 100} more links)"

        return (f"Webpage content summary (first 4000 chars):\n{content_summary}\n\n"
                f"Extracted links ({len(links)} total, first 100 shown):\n{links_str}")

    except requests.exceptions.RequestException as e:
        return f"Error fetching webpage content from {url}: {e}"
    except Exception as e:
        return f"An unexpected error occurred while parsing webpage: {e}"

def _infer_download_filename(url: str, headers) -> str:
    """
    Choose a local filename for a download when the caller supplied none.

    Args:
        url: The URL being downloaded; its last path segment is the fallback name.
        headers: Mapping of response headers (anything with ``.get``).

    Returns:
        A bare filename with no directory components. The name comes from the
        Content-Disposition header if it carries one, otherwise from the URL
        path, otherwise it is "downloaded_file".
    """
    candidate = None

    disposition = headers.get('Content-Disposition')
    if disposition:
        matches = re.findall(r'filename\*?=([^;]+)', disposition)
        if matches:
            header_name = matches[0].strip().replace('"', '')
            # RFC 5987 style value: filename*=UTF-8''percent-encoded-name
            if header_name.lower().startswith("utf-8''"):
                header_name = urllib.parse.unquote(header_name[7:])
            candidate = header_name

    if not candidate:
        # urlparse().path already excludes the query string and fragment.
        candidate = urllib.parse.urlparse(url).path.split('/')[-1]

    # The name comes from the remote server or the URL, so treat it as
    # untrusted: keep only the final component so "../" or an absolute path
    # cannot make the download land outside the working directory.
    candidate = os.path.basename(candidate.replace('\\', '/'))
    if candidate in ('', '.', '..'):
        candidate = "downloaded_file"
    return candidate


@tool
def download_file(url: str, filename: str | None = None) -> str:
    """
    Downloads a file from a given URL and saves it locally.
    Attempts to infer filename from URL or Content-Disposition header.
    Args:
        url (str): The URL of the file to download. This URL MUST be the exact, full URL
                   identified by the agent from the webpage.
        filename (str | None): Optional. The name to save the file as.
                                If None, attempts to infer from URL or headers.
    Returns:
        str: A message indicating success or failure of the download.
    """
    # NOTE(review): langchain's @tool is understood to expose the docstring
    # above to the LLM as the tool description, so its wording is unchanged.
    try:
        print(f"Attempting to download file from: {url}")
        # The context manager closes the streamed response (and releases the
        # connection) on success and on every error path.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)

            # None and "" both mean "no name given": infer one.
            if not filename:
                filename = _infer_download_filename(url, response.headers)

            # No extension yet: derive one from the Content-Type when it is a
            # type we recognise.
            if '.' not in filename and 'Content-Type' in response.headers:
                content_type = response.headers['Content-Type'].split(';')[0]
                for marker, extension in (
                    ('image/jpeg', '.jpg'),
                    ('image/png', '.png'),
                    ('application/pdf', '.pdf'),
                    ('text/csv', '.csv'),
                    ('text/plain', '.txt'),
                    # Add more types as needed
                ):
                    if marker in content_type:
                        filename += extension
                        break

            if '.' not in filename:  # Final fallback if no extension
                filename += '.bin'

            print(f"Saving file as: {filename}")
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return f"Successfully downloaded file to '{filename}'."
    except requests.exceptions.RequestException as e:
        return f"Error downloading file from {url}: {e}"
    except Exception as e:
        return f"An unexpected error occurred during download: {e}"

@tool
def save_download_metadata(url_discovered: str, filename: str) -> str:
    """
    Stores metadata about a downloaded file (timestamp, URL, filename) into an SQLite database.
    """
    # Appends one row to the `downloads` table of download_logs.db (creating
    # the table on first use) and reports the outcome as a string; database
    # errors are returned as text rather than raised.
    db_path = 'download_logs.db'
    connection = None
    try:
        connection = sqlite3.connect(db_path)
        db = connection.cursor()

        # Idempotent schema setup so the very first call works on a fresh file.
        db.execute('''
            CREATE TABLE IF NOT EXISTS downloads (
                timestamp TEXT,
                url_discovered TEXT,
                filename TEXT
            )
        ''')

        # Local-time ISO-8601 timestamp taken at insert time.
        row = (datetime.now().isoformat(), url_discovered, filename)
        db.execute("INSERT INTO downloads (timestamp, url_discovered, filename) VALUES (?, ?, ?)",
                   row)
        connection.commit()
    except Exception as exc:
        return f"Error saving metadata to DB: {exc}"
    else:
        return f"Metadata saved to '{db_path}': URL='{url_discovered}', File='{filename}'."
    finally:
        # Runs on both the success and the error path.
        if connection is not None:
            connection.close()

# --- 3. Initialize LLM and Agent Executor ---
# Module-level objects shared by the graph node defined further down.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash") # Use a model capable of tool calling

# All tools available to the agent: browse a page, download a file, log the download.
tools = [fetch_webpage_content_and_links, download_file, save_download_metadata]

# Define the agent's prompt.
# The system message spells out the browse -> identify -> download -> log -> summarize
# procedure; it is critical for guiding the LLM to pick the correct URL.
# `initial_webpage_url` and `link_description` appear in backticks, not braces, so they
# are plain text in the prompt rather than template variables.
agent_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are a highly intelligent and meticulous web crawling agent.
            Your primary task is to find and download a specific file from a given webpage
            based on a detailed description of the file's content. After downloading, you
            must log the action to a database.

            **Here's your step-by-step reasoning process:**

            1.  **Understand the Request:** Carefully read the user's initial request.
                You will be given an `initial_webpage_url` to visit and a `link_description`
                that describes the *content* of the file you need to find.
            2.  **Browse the Page:** Use the `fetch_webpage_content_and_links` tool
                with the `initial_webpage_url`. Pay close attention to the returned
                `Webpage content summary` and the `Extracted links` (their text and URLs).
            3.  **Identify the Target URL:** Based on the `link_description` provided by the user,
                analyze the `Extracted links` from the webpage. Your goal is to find the
                **exact URL** (`href`) that most accurately matches the description of the file's content.
                Consider both the link's visible text and the overall webpage content summary
                to determine relevance. If a link's text directly contains keywords from the description,
                that's a strong candidate. Look for context clues.
                **Crucially, you must select the *full, absolute URL* of the file.**
            4.  **Download the File:** Once you have confidently identified the correct URL,
                call the `download_file` tool. Pass the identified URL to its `url` argument.
                You can optionally suggest a `filename` for the downloaded file that
                reflects its content (e.g., "HM_Land_Registry_Price_Paid_Data.csv").
            5.  **Log the Download:** After receiving confirmation from the `download_file` tool
                that the download was successful, use the `save_download_metadata` tool.
                Provide the `url_discovered` (the one you just downloaded) and the `filename`
                that the `download_file` tool reported.
            6.  **Summarize and Confirm:** Finally, provide a concise and clear summary
                of your actions and the outcome to the user, confirming whether the file was found, downloaded,
                and logged successfully. If you couldn't find the link or encountered any errors,
                explain what happened.
            """,
        ),
        # Template slots: `chat_history` and `input` are supplied by the caller at invoke
        # time; `agent_scratchpad` is not passed by the code in this file -- presumably the
        # AgentExecutor fills it with its intermediate tool-call steps (confirm in the
        # langchain docs).
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# Create the LangChain tool-calling agent and wrap it in an executor.
# verbose=True is set on the executor (the run is expected to be traced to stdout).
agent = create_tool_calling_agent(llm, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)


# --- 4. Define LangGraph Nodes ---
# Each node is a step in our graph.

def call_agent_executor(state: AgentState) -> AgentState:
    """
    Run the LangChain AgentExecutor once and record its answer in the state.

    The last entry of ``state['messages']`` is sent to the executor as the
    current ``input``; every earlier entry is sent as ``chat_history``.

    Args:
        state: Current workflow state. It is treated as read-only: the
            ``messages`` and ``errors`` lists are copied before being
            extended, so the caller's state is never modified in place.

    Returns:
        A new state dict. On success the agent's final answer is appended to
        ``messages`` as an ``AIMessage`` and ``status`` is "agent_completed".
        If the executor returns no output or raises, a description is
        appended to ``errors`` and ``status`` is "failed"; exceptions are not
        propagated.
    """
    print("\n--- Node: Calling Agent Executor ---")
    # dict.copy() is shallow, so the list values must be copied explicitly;
    # otherwise the appends below would mutate the lists owned by `state`.
    new_state = state.copy()
    new_state['messages'] = list(state['messages'])
    new_state['errors'] = list(state['errors'])
    try:
        # Pass the current 'input' and 'chat_history' explicitly to the agent executor.
        # The agent_executor internally manages 'agent_scratchpad' based on its turns.
        result = agent_executor.invoke({
            "input": new_state['messages'][-1].content,  # Last message is the current user input
            "chat_history": new_state['messages'][:-1],  # Everything before it is history
        })

        if result and "output" in result:
            # result['output'] is the agent's final answer string; intermediate
            # tool calls are not copied into the state.
            new_state['messages'].append(AIMessage(content=result['output']))
            new_state['status'] = "agent_completed"
            print("Agent Executor completed its turn.")
        else:
            new_state['errors'].append("Agent Executor returned no output.")
            new_state['status'] = "failed"

    except Exception as e:
        # Node boundary: report the failure through the state instead of
        # aborting the whole graph run.
        error_msg = f"Error during agent execution: {e}"
        print(error_msg)
        new_state['errors'].append(error_msg)
        new_state['status'] = "failed"

    return new_state


# --- 5. Define LangGraph Workflow ---
# A one-node graph: entry point -> "agent_node" -> END.
workflow = StateGraph(AgentState)

# Register the single node; it runs call_agent_executor on the state.
workflow.add_node("agent_node", call_agent_executor)

# Execution starts at the agent node.
workflow.set_entry_point("agent_node")

# The edge to END is unconditional, so the graph finishes after one pass through
# the agent node regardless of the status it set. A more complex workflow could
# loop or route to a post-processing node here instead.
workflow.add_edge("agent_node", END)


# Compile the workflow into the runnable `app` used by the example below.
app = workflow.compile()

# --- 6. Example Usage ---

if __name__ == "__main__":
    # Demo run against a live, public GOV.UK page (HM Land Registry price paid
    # data). Whether the agent picks the right file depends on what the page
    # contains at run time and on the LLM's reasoning, so treat the URL as
    # illustrative rather than as a guaranteed match.
    initial_page_url = "https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads"

    # Describes the *content* of the wanted file, not the link text.
    link_description = "the file that includes standard and additional price paid data transactions received at HM Land Registry from 1 January 1995 to the most current monthly data."

    # The single human message that kicks off the process.
    user_query = " ".join([
        "I need to find a specific dataset.",
        f"Please go to the webpage: {initial_page_url}.",
        f"On that page, identify and download {link_description}.",
        "After downloading, please log the download information into the database.",
    ])

    initial_state: AgentState = AgentState(
        messages=[HumanMessage(content=user_query)],
        initial_webpage_url=initial_page_url,
        link_description=link_description,
        webpage_content="",
        extracted_links=[],
        chosen_download_url=None,
        downloaded_filepath=None,
        status="initiated",
        errors=[],
    )

    print("\n--- Starting Web ETL Agent Process ---")
    final_state = app.invoke(initial_state)

    print("\n--- Web ETL Agent Process Finished ---")
    print(f"Final Status: {final_state['status']}")
    if final_state['errors']:
        print("Errors encountered:")
        print("\n".join(f"- {error}" for error in final_state['errors']))

    # Show what the logging tool wrote, if the database file exists at all.
    db_path = 'download_logs.db'
    if os.path.exists(db_path):
        print(f"\n--- Verifying Database Contents from '{db_path}' ---")
        conn = None
        try:
            conn = sqlite3.connect(db_path)
            df_logs = pd.read_sql_query("SELECT * FROM downloads", conn)
            print(df_logs)
        except Exception as exc:
            print(f"Error reading database: {exc}")
        finally:
            if conn is not None:
                conn.close()




--- Starting Web ETL Agent Process ---

--- Node: Calling Agent Executor ---


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `fetch_webpage_content_and_links` with `{'url': 'https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads'}`


[0mFetching content from: https://www.gov.uk/government/statistical-data-sets/price-paid-data-downloads
[36;1m[1;3mWebpage content summary (first 4000 chars):
Price Paid Data - GOV.UK Cookies on GOV.UK We use some essential cookies to make this website work. We’d like to set additional cookies to understand how you use GOV.UK, remember your settings and improve government services. We also use cookies set by other sites to help us deliver content from their services. You have accepted additional cookies. You can change your cookie settings at any time. You have rejected additional cookies. You can change your cookie settings at any time. Accept additional cookies Reject additional cookies View cookies Hide