In [None]:
!pip install langchain openai


In [None]:
!pip install langchain-experimental


In [None]:
!pip install langchain openai langchain-experimental


In [None]:
!pip install -U langchain langgraph google-api-python-client langchain[google-genai]

In [None]:
import os

def _set_env(key):
    value = os.environ.get(key)
    if value is None:
        value = input(f"Please enter your {key}: ")
        os.environ[key] = value

# Set keys (each call only prompts when the variable is missing from os.environ)
_set_env("GOOGLE_API_KEY")   # From Google Cloud Console
_set_env("GOOGLE_CSE_ID")    # From Programmable Search Engine (CSE)

In [None]:
from langchain.tools import Tool
from googleapiclient.discovery import build

def google_search(query: str, max_results: int = 3):
    """Search the web through the Google Custom Search JSON API.

    Args:
        query: Search terms.
        max_results: Number of hits to request. Clamped to 1..10 because the
            API only accepts ``num`` values in that range.

    Returns:
        One ``"title: link"`` line per hit, or ``"No results found."`` when the
        response contains no items (an empty string gives a caller nothing to
        work with).

    Reads ``GOOGLE_API_KEY`` and ``GOOGLE_CSE_ID`` from the environment.
    """
    num = max(1, min(int(max_results), 10))
    service = build("customsearch", "v1", developerKey=os.environ["GOOGLE_API_KEY"])
    res = service.cse().list(q=query, cx=os.environ["GOOGLE_CSE_ID"], num=num).execute()
    results = res.get("items", [])
    if not results:
        return "No results found."
    # .get() keeps a single malformed item from aborting the whole search.
    return "\n".join(
        f"{item.get('title', 'Untitled')}: {item.get('link', '')}" for item in results
    )

# Wrap as a LangChain tool
google_tool = Tool(
    name="GoogleSearch",
    func=google_search,
    description="Use this tool to search the web using Google Search"
)

# Tool list that later cells pass to `llm.bind_tools(tools)`.
tools = [google_tool]

In [None]:
!pip install langchain openai google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client


In [None]:
import os
import openai
from langchain.agents import Tool
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.agent_types import AgentType
from langchain.chains.llm_math.base import LLMMathChain


In [None]:
import langchain.tools
dir(langchain.tools)


In [None]:
import os
import openai
from langchain.agents import create_openai_functions_agent
from langchain.agents import AgentExecutor
from langchain.agents import Tool
from google.auth.transport.requests import Request
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build


In [None]:
from google.oauth2 import service_account
from googleapiclient.discovery import build
import datetime

# Path to your service account key file
# NOTE(review): get_upcoming_events() in a later cell reads
# '/content/inbound-credentials.json' instead; confirm which file is intended.
SERVICE_ACCOUNT_FILE = '/content/credentials.json'

# Scopes required for Google Calendar
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

# Authenticate
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=SCOPES)

# Build the service
service = build('calendar', 'v3', credentials=credentials)

# Get upcoming events
# datetime.utcnow() is deprecated; build a timezone-aware UTC timestamp and keep
# the same trailing-'Z' format.
now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z')
events_result = service.events().list(
    calendarId='primary', timeMin=now,
    maxResults=5, singleEvents=True,
    orderBy='startTime').execute()
events = events_result.get('items', [])

for event in events:
    # All-day events carry 'date' instead of 'dateTime'.
    start = event['start'].get('dateTime', event['start'].get('date'))
    # Events without a title have no 'summary' key; fall back instead of raising KeyError.
    print(start, event.get('summary', 'No title'))


Wrap the calendar code in a function

In [None]:
def get_upcoming_events(
    max_results=5,
    calendar_id='primary',
    service_account_file='/content/inbound-credentials.json',
):
    """Return upcoming Google Calendar events as text, one event per line.

    Args:
        max_results: Maximum number of events to request.
        calendar_id: Calendar to query.
        service_account_file: Path to the service-account JSON key file.

    Returns:
        ``"<start>: <summary>"`` lines joined with newlines, or
        ``"No upcoming events found."`` when the calendar returns no items.
        Events without a title are reported as ``"No title"``.
    """
    from google.oauth2 import service_account
    from googleapiclient.discovery import build
    import datetime

    SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

    credentials = service_account.Credentials.from_service_account_file(
        service_account_file, scopes=SCOPES)

    service = build('calendar', 'v3', credentials=credentials)

    # datetime.utcnow() is deprecated; build a timezone-aware UTC timestamp and
    # keep the same trailing-'Z' format.
    now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z')
    events_result = service.events().list(
        calendarId=calendar_id, timeMin=now,
        maxResults=max_results, singleEvents=True,
        orderBy='startTime').execute()
    events = events_result.get('items', [])

    if not events:
        return "No upcoming events found."

    lines = []
    for event in events:
        # All-day events carry 'date' instead of 'dateTime'.
        start = event['start'].get('dateTime', event['start'].get('date'))
        summary = event.get('summary', 'No title')
        lines.append(f"{start}: {summary}")
    return "\n".join(lines).strip()


Turn the calendar function into a LangChain tool

In [None]:
from langchain.agents import Tool

# Expose get_upcoming_events() as a LangChain tool.
# NOTE(review): the name contains a space; some function-calling APIs restrict
# tool names to identifier-like strings — confirm before binding this tool to a model.
calendar_tool = Tool(
    name="Google Calendar",
    # The tool's input is ignored: get_upcoming_events is called with no arguments.
    func=lambda _: get_upcoming_events(),
    description="Use this to check upcoming calendar events."
)


Add the tool to the agent


In [None]:
!pip install -U langchain langchain-community google-auth google-api-python-client

import os
from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool
from langchain_google_genai import ChatGoogleGenerativeAI

# Use the GOOGLE_API_KEY already present in the environment and prompt only when
# it is missing. Assigning a placeholder string here would overwrite a real key
# set by an earlier cell and make every later Google call fail.
if not os.environ.get("GOOGLE_API_KEY"):
    import getpass
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Please enter your GOOGLE_API_KEY: ")

# 1. Initialize the Gemini model
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)

# 2. Define a dummy calendar tool for testing (replace with real implementation)
def dummy_calendar_tool(query: str) -> str:
    """Return a canned calendar answer; ``query`` is ignored."""
    return "You have a meeting today at 3 PM."

# NOTE: this rebinds `calendar_tool`, shadowing any tool of the same name
# created in an earlier cell.
calendar_tool = Tool(
    name="GoogleCalendarTool",
    func=dummy_calendar_tool,
    description="Useful for checking calendar events."
)

# 3. Initialize the agent
agent = initialize_agent(
    tools=[calendar_tool],
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 4. Run agent with a user prompt
# `agent.run(...)` is deprecated; `invoke` takes the prompt under "input" and
# returns a dict whose "output" entry holds the final answer.
result = agent.invoke({"input": "Do I have any meetings today?"})
response = result["output"]
print("\nFinal Response:\n", response)


In [None]:
from langchain.chat_models import init_chat_model

# Make sure the key is available before creating the model. Writing
# os.environ.get(...) back into os.environ does nothing when the key exists and
# raises TypeError when it does not (environment values must be str), so prompt
# for it instead.
if not os.environ.get("GOOGLE_API_KEY"):
    import getpass
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Please enter your GOOGLE_API_KEY: ")

# Initialize Gemini model
llm = init_chat_model("google_genai:gemini-2.0-flash")
# Model handle that can request calls to the tools defined earlier.
llm_with_tools = llm.bind_tools(tools)

In [None]:
from langchain.chat_models import init_chat_model

# Only ask for the key when it is not already set. Assigning a placeholder
# string unconditionally would overwrite a real key provided by an earlier cell.
if not os.environ.get("GOOGLE_API_KEY"):
    import getpass
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Please enter your GOOGLE_API_KEY: ")

# Initialize Gemini model
llm = init_chat_model("google_genai:gemini-2.0-flash")
# Model handle that can request calls to the tools defined earlier.
llm_with_tools = llm.bind_tools(tools)

In [None]:
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    """State passed between the graph's nodes."""
    # Conversation messages. The `add_messages` annotation presumably makes node
    # outputs get merged into the list instead of replacing it — confirm in the
    # langgraph documentation.
    messages: Annotated[list, add_messages]

# Builder that collects the nodes and edges of the State-based graph.
graph_builder = StateGraph(State)


def chatbot(state: State):
    """Run the tool-bound model on the conversation and return its reply as a state update."""
    reply = llm_with_tools.invoke(state["messages"])
    return {"messages": [reply]}


graph_builder.add_node("chatbot", chatbot)

In [None]:
import json
from langchain_core.messages import ToolMessage

class BasicToolNode:
    """Graph node that executes the tool calls requested by the latest message."""

    def __init__(self, tools: list) -> None:
        # Index the tools by name so each tool call can be dispatched directly.
        registry = {}
        for candidate in tools:
            registry[candidate.name] = candidate
        self.tools_by_name = registry

    def __call__(self, inputs: dict):
        """Run every tool call on the last message and return the results as ToolMessages.

        Raises:
            ValueError: if ``inputs`` carries no messages.
        """
        history = inputs.get("messages", [])
        if not history:
            raise ValueError("No message found in input")
        latest = history[-1]

        outputs = []
        for call in latest.tool_calls:
            selected_tool = self.tools_by_name[call["name"]]
            result = selected_tool.invoke(call["args"])
            # The tool result is JSON-encoded before being handed back to the model.
            reply = ToolMessage(
                content=json.dumps(result),
                name=call["name"],
                tool_call_id=call["id"],
            )
            outputs.append(reply)
        return {"messages": outputs}

# Node named "tools" that executes tool calls against the Google search tool.
tool_node = BasicToolNode(tools=[google_tool])
graph_builder.add_node("tools", tool_node)

In [None]:
def route_tools(state: State):
    """Choose the step that follows the chatbot node.

    Args:
        state: Either a list of messages or a mapping with a "messages" list.

    Returns:
        ``"tools"`` when the last message carries tool calls, otherwise ``END``.

    Raises:
        ValueError: if the state contains no messages.
    """
    messages = state if isinstance(state, list) else state.get("messages", [])
    if not messages:
        # f-string so the offending state actually appears in the message.
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    ai_message = messages[-1]
    if getattr(ai_message, "tool_calls", None):
        return "tools"
    return END

# After "chatbot" runs, route_tools picks the next step: the "tools" node or END.
graph_builder.add_conditional_edges("chatbot", route_tools, {"tools": "tools", END: END})
# Tool results go back to the chatbot node.
graph_builder.add_edge("tools", "chatbot")
# Execution starts at the chatbot node.
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()

In [None]:
import os

def _set_env(key):
    value = os.environ.get(key)
    if value is None:
        value = input(f"Please enter your {key}: ")
        os.environ[key] = value

# Set keys (each call only prompts when the variable is missing from os.environ)
_set_env("GOOGLE_API_KEY")   # From Google Cloud Console
_set_env("GOOGLE_CSE_ID")    # From Programmable Search Engine (CSE)

In [None]:
# Replace complex graph definition with minimal one for fast reply
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableLambda
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

class State(TypedDict):
    """State passed between the graph's nodes."""
    # Conversation messages. The `add_messages` annotation presumably makes node
    # outputs get merged into the list instead of replacing it — confirm in the
    # langgraph documentation.
    messages: Annotated[list, add_messages]

def simple_llm_node(state):
    """Echo the latest message back as the assistant's reply.

    Args:
        state: Mapping with a non-empty "messages" list. Entries may be
            dict-style messages (``{"role": ..., "content": ...}``) or message
            objects exposing ``.content``; the ``add_messages`` reducer on
            ``State`` hands nodes message objects even when dicts were passed
            in, which made the dict-only indexing raise TypeError.

    Returns:
        ``{"messages": [...]}`` holding the existing messages followed by the
        new assistant message.
    """
    last_message = state["messages"][-1]
    if isinstance(last_message, dict):
        user_input = last_message["content"]
    else:
        user_input = last_message.content
    response = {"role": "assistant", "content": f"Echo: {user_input}"}
    return {"messages": list(state["messages"]) + [response]}

# Minimal single-node graph: "echo" is both the entry point and the finish point.
# StateGraph receives the State type itself, not an instance.
echo_builder = StateGraph(State)
echo_builder.add_node("echo", RunnableLambda(simple_llm_node))
echo_builder.set_entry_point("echo")
echo_builder.set_finish_point("echo")
# The compiled graph is bound to `graph`, the name the chat loops read.
graph = echo_builder.compile()

In [None]:
import asyncio

async def stream_graph_updates_async(user_input: str):
    """Send ``user_input`` to the graph and print the reply as it streams.

    Args:
        user_input: Text of the user's message.

    Output is written with ``print`` so the function does not rely on ``sys``
    having been imported by another cell.
    """
    print("Assistant: ", end='', flush=True)
    payload = {"messages": [{"role": "user", "content": user_input}]}
    async for event in graph.astream(payload):
        # In the default "updates" stream mode an event maps each node name to
        # that node's state update; a flat {"messages": [...]} event is also accepted.
        updates = [event] if "messages" in event else list(event.values())
        for update in updates:
            if not isinstance(update, dict):
                continue
            messages = update.get("messages") or []
            if not isinstance(messages, (list, tuple)):
                messages = [messages]
            if not messages:
                continue
            last = messages[-1]
            # Messages may be plain dicts or message objects exposing `.content`.
            if isinstance(last, dict):
                content = last.get("content", "")
            else:
                content = getattr(last, "content", "")
            if content:
                print(content, end="", flush=True)
    print()

async def main_async():
    """Chat loop: read a line, stream the reply, repeat until an exit word is typed."""
    exit_words = ("exit", "quit", "q")
    # The loop ends only when the (case-insensitive) input is an exit word.
    while (text := input("User: ")).lower() not in exit_words:
        await stream_graph_updates_async(text)
    print("Goodbye!")

# asyncio.run() cannot be called while an event loop is already running in this
# thread (e.g. inside a notebook kernel), so choose the strategy accordingly.
# asyncio.get_running_loop() replaces asyncio.get_event_loop(), whose use
# without a running loop is deprecated.
try:
    asyncio.get_running_loop()
except RuntimeError:
    # No loop is running: asyncio.run creates a fresh loop and closes it afterwards.
    asyncio.run(main_async())
else:
    # A loop is already running: schedule the chat on it. Keep a reference to
    # the task, since the event loop only holds weak references to tasks.
    chat_task = asyncio.ensure_future(main_async())

In [None]:
import sys

def stream_graph_updates(user_input: str):
    """Send ``user_input`` to the graph and print the reply as it streams.

    Args:
        user_input: Text of the user's message.

    Any exception raised while streaming is reported on stdout instead of being
    propagated, so the surrounding chat loop keeps control.
    """
    print("Assistant: ", end='', flush=True)
    try:
        payload = {"messages": [{"role": "user", "content": user_input}]}
        for event in graph.stream(payload):
            # In the default "updates" stream mode an event maps each node name
            # to that node's state update; a flat {"messages": [...]} event is
            # also accepted.
            updates = [event] if "messages" in event else list(event.values())
            for update in updates:
                if not isinstance(update, dict):
                    continue
                messages = update.get("messages") or []
                if not isinstance(messages, (list, tuple)):
                    messages = [messages]
                if not messages:
                    continue
                last = messages[-1]
                # Messages may be plain dicts or message objects exposing `.content`.
                if isinstance(last, dict):
                    content = last.get("content", "")
                else:
                    content = getattr(last, "content", "")
                if content:
                    print(content, end="", flush=True)
        print()  # Newline after response finishes
    except Exception as e:
        print(f"\n[Error while streaming response: {e}]")

def main():
    """Blocking chat loop: forward each input line to the graph until the user quits.

    Typing quit/exit/q (any case) or pressing Ctrl-C ends the loop with a
    goodbye; any other exception is reported and also ends the loop.
    """
    exit_words = ("quit", "exit", "q")
    # Every handler ends the loop, so one try around the whole loop is enough.
    try:
        while True:
            text = input("User: ")
            if text.lower() in exit_words:
                print("Goodbye!")
                return
            stream_graph_updates(text)
    except KeyboardInterrupt:
        print("\nGoodbye!")
    except Exception as err:
        print(f"\nUnexpected error: {err}")

# Start the interactive chat loop; it keeps reading input until the user quits.
main()
