In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph,START,END
from langchain_core.tools import tool
from typing import TypedDict,Annotated
from langgraph.prebuilt import ToolNode,tools_condition
from langchain_core.messages import HumanMessage,BaseMessage
from langgraph.graph.message import add_messages
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage,AIMessage,SystemMessage,ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command


In [None]:
from dotenv import load_dotenv
load_dotenv()

In [None]:
import logging
logging.basicConfig(level=logging.DEBUG)


In [None]:
from langgraph.config import get_stream_writer

In [None]:
# web search tool 
from langchain_tavily import TavilySearch
web_search_tool = TavilySearch(max_results=3)

In [None]:
# File management tool
from langchain_community.agent_toolkits import FileManagementToolkit

working_directory = './'

file_management_tools =FileManagementToolkit(
    root_dir=str(working_directory),
    selected_tools=["read_file", "write_file", "list_directory"]
).get_tools()


read_tool, write_tool, list_tool = file_management_tools

In [None]:
from langchain_core.tools import BaseTool,tool
from langgraph.prebuilt.interrupt import HumanInterruptConfig,HumanInterrupt
from langchain_core.runnables import RunnableConfig
from langgraph.types import interrupt

In [None]:
def add_human_in_the_loop(toolhitl,interrupt_config: HumanInterruptConfig = None) -> BaseTool:
    """Wrap a tool to support human-in-the-loop review."""

    if not isinstance(toolhitl, BaseTool):
        toolhitl = tool(toolhitl)

    if interrupt_config is None:
        interrupt_config = {
            "allow_accept":True,
            "allow_edit": True,
            "allow_respond":True
        }

    @tool(toolhitl.name,description=toolhitl.description,args_schema=toolhitl.args_schema)
    def call_tool_with_interrupt(config: RunnableConfig, **tool_input):
        request: HumanInterrupt = {
            'action_request':{
                "action":toolhitl.name,
                "args":tool_input
            },
            "config":interrupt_config,
            "description": "Please review the tool call"
        }

        response = interrupt([request])[0]

        # approve the tool call
        if response["type"] == "accept":
            tool_response = toolhitl.invoke(tool_input, config)
        # update tool call args
        elif response["type"] == "edit":
            tool_input = response["args"]["args"]
            tool_response = toolhitl.invoke(tool_input, config)
        # respond to the LLM with user feedback
        elif response["type"] == "response":
            user_feedback = response["args"]
            tool_response = user_feedback
        else:
            raise ValueError(f"Unsupported interrupt response type: {response['type']}")

        return tool_response
    
    return call_tool_with_interrupt




In [None]:
#arxiv
import arxiv

@tool("arxiv_search")
def arxiv_search(query: str,max_results: int = 5) -> str:
    """
    Searches arXiv for papers matching the query.
    - query: keywords, authors or title
    - max_results: number of papers to return
    """
    try:
        writer = get_stream_writer()
        writer(f"Looking up research papers for topic : {query}")
        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=arxiv.SortCriterion.Relevance
        )
        papers = []
        for result in search.results():
            pdf_url = result.pdf_url if hasattr(result,"pdf_url") else result.entry_id.replace("abs","pdf")
            papers.append(
                f"Title: {result.title}\n"
                f"Authors: {','.join(a.name for a in result.authors)}\n"
                f'Published: {result.published.date()}\n'
                f"Abstract: {result.summary.strip()}\n"
                f"Link: {result.entry_id}\n"
                f"PDF: {pdf_url}\n"
                + "-"*80

            )
        if not papers:
            return f"No results found for '{query}"
        writer(f"Acquired research papers for topic: {query}")
        return "\n".join(papers)
    except Exception as e:
        return f"Error during arXiv search: {e}"
    



In [None]:
#wikipedia
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
wikipedia_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper(load_all_available_meta=True))

In [None]:
#youtube
from langchain_community.tools import YouTubeSearchTool
youtube_tool = YouTubeSearchTool()

In [None]:
from langchain_experimental.utilities import PythonREPL
from langchain_core.tools import Tool,tool
python_repl = PythonREPL()
# You can create the tool to pass to an agent
from pydantic import BaseModel

class PythonREPLInput(BaseModel):
    code: str

repl_tool = Tool(
    name="python_repl",
    description="A Python shell. Input should be Python code as a string.",
    args_schema=PythonREPLInput,
    func=lambda code: python_repl.run(code)  # map `code` -> REPL
)


In [None]:
from langgraph.store.memory import InMemoryStore
#from langgraph.store.sqlite import SqliteStore
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from typing_extensions import TypedDict
from typing import Optional

store = InMemoryStore() 

In [None]:
@tool
def get_user_info(config: RunnableConfig) -> str:
    """Look up user info."""
    store = get_store()
    user_id = config['configurable'].get("user_id")
    user_info = store.get(("users",),user_id)
    return str(user_info.value) if user_info else "Unknown user"


In [None]:
from typing import Dict, Any
    

@tool 
def save_user_info(user_info: Dict[str, Any], config: RunnableConfig) -> str:
    """
    Save arbitrary user info as key-value pairs.
    Always pass `user_info` as a JSON object (not a string).
    Example: {"name": "John", "age": 30}
    """
    store = get_store()
    user_id = config['configurable'].get("user_id")
    store.put(("users",), user_id, user_info)
    return "Successfully saved user info "

In [None]:
# tools
tools = [get_user_info,save_user_info,add_human_in_the_loop(repl_tool),arxiv_search,wikipedia_tool,youtube_tool,read_tool, add_human_in_the_loop(write_tool), list_tool,web_search_tool]

In [None]:
for t in tools:
    print(t.description)

In [None]:
#llm
llm = ChatGoogleGenerativeAI(model = 'gemini-2.5-flash')
llm_with_tools = llm.bind_tools(tools)


GRAPH BUILDING

In [None]:
from langmem.short_term import SummarizationNode, RunningSummary
from langchain_core.messages import AnyMessage

In [None]:
class State(TypedDict):
    messages : Annotated[list[BaseMessage],add_messages]



In [None]:
from langchain_core.messages.utils import trim_messages, count_tokens_approximately
from langchain_core.messages import BaseMessage

In [None]:
from datetime import datetime
CURRENT_TIME_IST = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M %Z")


system_prompt = f"""
You are an intelligent reasoning agent that helps users by combining natural conversation 
with external tools when needed.

tools available :  arxiv_search, read_tool, write_tool, list_tool, duck_search, tavily_search, wikipedia_tool,
    youtube_search_tool, youtube_transcript_tool, repl_tool, add_event, list_events, read_webpage,
    generate_pdf, shopping_search, create_ticket, list_tickets, get_ticket_details,  update_ticket, news_search,


Current date/time: {CURRENT_TIME_IST}



### Reasoning Framework
Follow the ReAct reasoning loop:
1. **Thought** — explain what you are thinking or planning.
2. **Action** — choose the correct tool to use.
3. **Action Input** — provide the exact structured input for the tool.
4. **Observation** — read the tool's result and update your reasoning.

Repeat this loop until you can confidently respond to the user.

### Style & Tone
- Be concise but complete.
- Use plain language that non-technical users can understand.
- If user input is ambiguous, ask clarifying questions before acting.
- Never hallucinate tool outputs. If unsure, say so.
"""



In [None]:
def planner_node(state: State):


    planner_prompt = ChatPromptTemplate.from_messages([
        ('system',system_prompt),
        MessagesPlaceholder(variable_name='messages')
    ])


    planner = planner_prompt | llm_with_tools
    result = planner.invoke({'messages': state["messages"]})


    return ({'messages':result}) 
    

In [None]:
graph = StateGraph(State)
checkpointer = InMemorySaver()
tool_node = ToolNode(tools)

graph.add_node('planner_node',planner_node)
graph.add_node('tools',tool_node)

graph.add_edge(START,'planner_node')

graph.add_conditional_edges('planner_node',tools_condition)
graph.add_edge('tools','planner_node')
graph = graph.compile(checkpointer=checkpointer,store=store)

In [None]:
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
initial_state = {
    "messages": [HumanMessage(content="write a 4 line poem on bengal tiger save first 2 line in tr.txt save the last 2 line in er.txt")]
}

config = {"configurable": {"thread_id": "fqo" ,"user_id": "user_00"}}
# Run the agent
chunk =  graph.invoke(
    initial_state,
    config
)

In [None]:
from langgraph.types import Command
user_input = input("Do you accept: ?")

for chunk in graph.stream(
    Command(resume=[{"type": user_input}]),
    config
):
    print(chunk)
    print("\n")