In [None]:
%pip install langchain_ollama langgraph psycopg2 langchain_core psutil
%pip install -qU duckduckgo-search langchain-community
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
    trim_messages,
)
from langchain_core.messages.utils import count_tokens_approximately
from langchain_core.tools import tool
from langchain_ollama.chat_models import ChatOllama
from bs4 import BeautifulSoup
from bs4.element import Comment
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
import urllib.request
import datetime
import psycopg2
import psutil
from langchain_community.tools import DuckDuckGoSearchRun
from langgraph.graph import MessagesState


def tag_visible(element):
    if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
        return False
    if isinstance(element, Comment):
        return False
    return True

llm = ChatOllama(model="qwen3:1.7b")
tokenlen = 30000
max_length = 12000
# recursion limit
recurse = 50

def smart_truncate(text: str) -> str:
    """Intelligently truncate text at natural boundaries"""
    if len(text) <= max_length:
        return text
    
    # Try to cut at paragraph breaks first
    truncated = text[:max_length]
    
    # Find last paragraph break
    last_para = truncated.rfind('\n\n')
    if last_para > max_length * 0.7:  # If we don't lose too much
        return truncated[:last_para] + "\n\n[Content truncated at paragraph break...]"
    
    # Fallback to sentence breaks
    last_sentence = truncated.rfind('. ')
    if last_sentence > max_length * 0.8:
        return truncated[:last_sentence + 1] + "\n\n[Content truncated...]"
    
    # Hard truncate as last resort
    return truncated + "\n\n[Content truncated...]"

@tool
def scrape(url:str)->str:
    """
    Scrape visible text content from a website URL.
    
    Args:
        url (str): The URL to scrape. Must be a valid HTTP/HTTPS URL.
        
    Returns:
        str: The visible text content wrapped in <text> tags, or error message.
             the text will be truncated if it is too long.
        
    Example:
        >>> scrape("https://example.com")
        "<text>Example Domain This domain is for use in illustrative examples...</text>"
    
    Note:
        - Only scrapes visible text, not HTML markup
        - Has basic error handling for network issues
        - Returns content wrapped in XML-style tags for parsing
    """
    try:
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        
        with urllib.request.urlopen(url, timeout=30) as response:
            html = response.read()
            
        # Extract title and text
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.title
        textfromhtml = soup.get_text()
        
        # Truncate text if too long
        visible_text = smart_truncate(textfromhtml)
            
        return f"""
            <webpage>
                <url>{url}</url>
                <title>{title or "No title found"}</title>
                <content>
                    {visible_text.join().strip()}
                </content>
            </webpage>
            """
        
    except urllib.error.URLError as e:
        return f"URL error occurred: {e}"
    except urllib.error.HTTPError as e:
        return f"HTTP error occurred: {e.code} - {e.reason}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"


@tool
def get_cpu_usage() -> str:
    """
    Get current CPU utilization percentage.
    
    Returns:
        str: CPU usage as a percentage string (e.g., "25.4").
        
    Example:
        >>> get_cpu_usage()
        "15.2"
        
    Note:
        - Returns instantaneous CPU usage (no interval averaging)
        - Single aggregate value across all CPU cores
        - May return "0.0" on very fast calls due to no interval measurement
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=None, percpu=False)
        return str(cpu_percent)
    except psutil.Error as e:
        return f"System monitoring error occurred: {e}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"

@tool
def exec_query(query: str) -> str:
    """
    Execute a SQL query against the PostgreSQL database.
    
    Args:
        query (str): The SQL query to execute. Should be properly formatted SQL.
        
    Returns:
        str: Success message or error description.
        
    Example:
        >>> exec_query("INSERT INTO users (name) VALUES ('Alice')")
        "Query executed successfully"
        
    Warning:
        - Ensure database connection is established first
        - Use commit() to persist changes for INSERT/UPDATE/DELETE operations
        - Be cautious with destructive operations
    """
    global cursor
    
    if cursor is None:
        return "Error: No database connection established. Use connect() first."
        
    try:
        cursor.execute(query)
        return "Query executed successfully"
    except psycopg2.Error as e:
        return f"Database error occurred: {e}"
    except Exception as e:
        return f"An unexpected error occurred: {e}"


@tool
def execmany(query:str, varslist:list):
    """
    Executes many queries against an SQL database, with the variables supplied (wraps cursor.execmany from psycopg)
    
    Args:
        - query (str): should be properly formatted sql
        - varslist (list): a list of variables
        
    Returns: 
        - a success message or an error description
        
    Warning:
        - Ensure database connection is established first
        - Use commit() to persist changes for INSERT/UPDATE/DELETE operations
        - Be cautious with destructive operations
        
    """
    try:
        cursor.executemany(query, varslist)
        return "Queries executed succesfully"
    except Exception as e:
        return "An error occured: "+str(e)

@tool
def fetchall()->list:
    """this runs cursor.fetchall against your sql database"""
    try:
        return cursor.fetchall()
    except Exception as e:
        return "An error occured: "+str(e)

@tool
def fetch1()->str:
    """this runs cursor.fetch1 against your sql database"""
    try:
        return cursor.fetch1()
    except Exception as e:
        return "An error occured: "+str(e)

@tool
def reconnect():
    """disconnect and reconnect to sql database"""
    try:
        close()
        connect()
    except Exception as e:
        return "An error occured: "+str(e)
    
@tool
def close():
    """close sql connection"""
    try:
        cursor.close()
        connection.close()
    except Exception as e:
        return "An error occured: "+str(e)
        
@tool
def connect():
    """open sql connection"""
    global cursor

    try:
        connection = psycopg2.connect("dbname = 'mnemnosyne' user='postgres' password='kitten samsara kaboodle nirvana' host='localhost' port='5432'")
        cursor = connection.cursor()
    except Exception as e:
        return "An error occured: "+str(e)
        
@tool
def commit():
    """commit changes to sql db"""
    try:
        connection.commit()
    except Exception as e:
        return "An error occured: "+str(e)

# Increase the recursion limit in the configuration
config = {
    "recursion_limit": recurse  # Increase the limit as needed
}
    
@tool
def selfprompt(msg:str):
    """generate a custom system message to yourself, may let you change your behavior, use judiciously."""
    messages.append(SystemMessage(role="system", content=msg))
    llm_with_tools.invoke(state["messages"], config=config)

class ToDo:
    def __init__(self, item:str):
        content = item
        status = PENDING
        
    
@tool
def adjust_token_length(msg:str):
    """adjust own token length. use with caution"""
    try:
        tokenlen = int(str)
    except Exception as e:
        return "An error occured: "+str(e)


search = DuckDuckGoSearchRun()

tools = [exec_query, execmany, fetchall, fetch1, search, get_cpu_usage, reconnect, close, commit, connect, selfprompt, adjust_token_length, scrape]
llm_with_tools = llm.bind_tools(tools)

sys_msg = SystemMessage("You are a helpful assistant. Your name is Charlotte. Please be systematic, thorough, and concise.")

def assistant(state: MessagesState):
    return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"], config=config)]}

def trim():
    trim_messages(
    messages,
    # Keep the last <= n_count tokens of the messages.
    strategy="last",
    # Remember to adjust based on your model
    # or else pass a custom token_counter
    token_counter=count_tokens_approximately,
    # Most chat models expect that chat history starts with either:
    # (1) a HumanMessage or
    # (2) a SystemMessage followed by a HumanMessage
    # Remember to adjust based on the desired conversation
    # length
    max_tokens=tokenlen,
    # Most chat models expect that chat history starts with either:
    # (1) a HumanMessage or
    # (2) a SystemMessage followed by a HumanMessage
    start_on=("system"),
    # Most chat models expect that chat history ends with either:
    # (1) a HumanMessage or
    # (2) a ToolMessage
    end_on=("human", "tool"),
    # Usually, we want to keep the SystemMessage
    # if it's present in the original history.
    # The SystemMessage has special instructions for the model.
    include_system=True,
    allow_partial=False,
    )

builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")
graph = builder.compile()

messages = []

result = graph.invoke({"messages": messages}, config=config)

for msg in result["messages"]:
    msg.pretty_print()

trim()

while True:
    try:
        user_input = input("User: ")
        messages.append(HumanMessage(content=user_input, additional_kwargs={}, response_metadata={}))
        result = graph.invoke({"messages": messages}, config=config)
        for msg in result["messages"]:
            msg.pretty_print()
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        
    except EOFError:
        # fallback if input() is not available
        user_input = "Thank you for your service!"
        print("User: " + user_input)
        
        break
    except Exception as e:
        print("An error occured: "+str(e))