In [1]:
import datetime
import os
import json

from datetime import timezone
from datetime import date, timedelta
from dateutil.relativedelta import relativedelta, MO, TU, WE, TH, FR, SA, SU
from tzlocal import get_localzone

from typing import TypedDict, Annotated, List, Optional, Literal
import operator
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

import base64

from langchain_core.tools import tool
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
from langchain_groq import ChatGroq
from langchain_tavily import TavilySearch
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.sqlite import SqliteSaver


from IPython.display import Image, display
from dateutil.parser import parse, ParserError

In [2]:
load_dotenv()

groq_api_key = os.getenv("GROQ_API_KEY")
tavily_api_key = os.getenv("TAVILY_API_KEY") 

SCOPES = ["https://www.googleapis.com/auth/calendar", "https://www.googleapis.com/auth/gmail.readonly"]

if not all([groq_api_key, tavily_api_key]):
    raise ValueError("One or more required API keys (GROQ, TAVILY) are missing from the .env file!")


llm = ChatGroq(model="llama3-70b-8192", api_key=groq_api_key, temperature=0)
print("Groq LLM (Llama3-70b) configured and ready.")

Groq LLM (Llama3-70b) configured and ready.


In [3]:
def _get_google_credentials():
    """Gets valid Google API credentials, refreshing if necessary."""

    creds = None

    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
            creds = flow.run_local_server(port=0)
        with open("token.json", "w") as token:
            token.write(creds.to_json())

    return creds


def _fetch_google_events(start_date: datetime.datetime, end_date: datetime.datetime) -> List[dict]:
    """Internal function to fetch events from Google Calendar."""

    try:
        creds = _get_google_credentials()
        service = build("calendar", "v3", credentials=creds)
        
        events_result = service.events().list(
            calendarId="primary",
            timeMin=start_date.isoformat(),
            timeMax=end_date.isoformat(),
            maxResults=250,
            singleEvents=True,
            orderBy="startTime"
        ).execute()
        
        return events_result.get("items", [])

    except HttpError as error:
        print(f"!!! Google Calendar API Error: {error}")
        return [{"error": f"An API error occurred: {error}"}]

    except Exception as e:
        print(f"!!! An unexpected error occurred: {e}")
        return [{"error": f"An unexpected error occurred: {e}"}]


def _get_email_body(parts):
    """Recursively search for text/plain parts in email body."""

    body = ""
    if parts:
        for part in parts:
            if part['mimeType'] == 'text/plain' and 'data' in part['body']:
                body += base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
            elif 'parts' in part:
                body += _get_email_body(part['parts'])

    return body


def _fetch_gmail_messages(query: str, max_results: int = 10) -> List[dict]:
    """Internal function to fetch and sort emails from Gmail using batch processing."""
    
    try:
        creds = _get_google_credentials()
        service = build("gmail", "v1", credentials=creds)
        message_list = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
        messages = message_list.get('messages', [])
        if not messages: return []

        output_emails = []

        def gmail_callback(request_id, response, exception):
            if exception is None:
                headers = response['payload']['headers']
                output_emails.append({
                    "id": response['id'],
                    "sender": next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown Sender'),
                    "subject": next((h['value'] for h in headers if h['name'] == 'Subject'), 'No Subject'),
                    "date": next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown Date'),
                    "body": _get_email_body(response['payload'].get('parts', []))[:2000],
                    "internalDate": response['internalDate']
                })

        batch_request = service.new_batch_http_request(callback=gmail_callback)
        for msg in messages:
            batch_request.add(service.users().messages().get(userId='me', id=msg['id'], format='full'))
        
        batch_request.execute()
        output_emails.sort(key=lambda e: int(e['internalDate']), reverse=True)
        return output_emails

    except HttpError as error:
        print(f"!!! Gmail API Error: {error}")
        return [{"error": f"An error occurred with the Gmail API: {error}"}]

    except Exception as e:
        print(f"!!! An unexpected error occurred: {e}")
        return [{"error": f"An unexpected error occurred: {e}"}]



In [4]:
@tool
def list_upcoming_events(limit: Optional[int] = 15) -> str:
    """Lists the user's upcoming Google Calendar events from today onwards."""

    print(f"--- Tool: list_upcoming_events called with limit={limit} ---")

    now = datetime.datetime.now(timezone.utc)
    search_end = now + datetime.timedelta(days=90)

    google_events = _fetch_google_events(start_date=now, end_date=search_end)

    if not google_events or all('error' in e for e in google_events):
        return json.dumps([{"message": "No upcoming events found."}])

    valid_events = [e for e in google_events if 'error' not in e]

    return json.dumps(valid_events[:limit], indent=2)

@tool
def get_events_for_day(natural_language_date: str) -> str:
    """Finds and lists all Google Calendar events for a specific day (like 'tomorrow', 'this Sunday')."""

    print(f"--- Tool: get_events_for_day called with query: '{natural_language_date}' ---")

    try:
        target_date = parse(natural_language_date).date()
        start_of_day = datetime.datetime.combine(target_date, datetime.time.min).replace(tzinfo=get_localzone())
        end_of_day = datetime.datetime.combine(target_date, datetime.time.max).replace(tzinfo=get_localzone())

        google_events = _fetch_google_events(start_date=start_of_day.astimezone(timezone.utc), end_date=end_of_day.astimezone(timezone.utc))
        valid_events = [e for e in google_events if 'error' not in e]

        if not valid_events:
            return json.dumps([])
        return json.dumps(valid_events, indent=2)

    except Exception as e:
        return json.dumps([{"error": f"An error occurred while parsing the date: {e}"}])

@tool
def tavily_search(query: str) -> str:
    """A search engine tool to find real-time information online."""

    print(f"--- Tool: tavily_search called with query: '{query}' ---")

    try:
        search = TavilySearch(max_results=3, api_key=tavily_api_key)
        results = search.invoke(query)
        return json.dumps(results, indent=2)

    except Exception as e:
        return f"Error occurred during web search: {e}"

@tool
def create_google_event(summary: str, start_time_iso: str, end_time_iso: str, description: Optional[str] = None, location: Optional[str] = None, attendees: Optional[List[str]] = None) -> str:
    """Creates a new event on the user's primary Google Calendar."""

    print(f"--- Tool: create_google_event called with: summary='{summary}' ---")

    try:
        creds = _get_google_credentials()
        service = build("calendar", "v3", credentials=creds)
        local_timezone = str(get_localzone())
        event_body = { 'summary': summary, 'start': {'dateTime': start_time_iso, 'timeZone': local_timezone}, 'end': {'dateTime': end_time_iso, 'timeZone': local_timezone} }

        if description:
            event_body['description'] = description

        if location:
            event_body['location'] = location

        if attendees:
            event_body['attendees'] = [{'email': email} for email in attendees]

        created_event = service.events().insert(calendarId='primary', body=event_body).execute()

        return f"Event '{summary}' was created successfully."

    except Exception as e:
        return f"An error occurred while creating the event: {e}"

@tool
def delete_google_event(event_id: str, summary: str) -> str:
    """Deletes an event from the calendar using its unique ID."""

    print(f"--- Tool: delete_google_event called for ID: {event_id} ---")

    try:
        creds = _get_google_credentials()
        service = build("calendar", "v3", credentials=creds)
        service.events().delete(calendarId='primary', eventId=event_id).execute()

        return f"The event '{summary}' was successfully deleted."

    except Exception as e:
        return f"An error occurred while deleting the event: {e}"

@tool
def get_calendar_summary(time_range: str) -> str:
    """Provides a summary of Google Calendar events for a given time range."""

    print(f"--- Tool: get_calendar_summary called for range: {time_range} ---")

    today = datetime.date.today()

    if time_range == "today":
        start_date, end_date = today, today
    elif time_range == "tomorrow":
        start_date, end_date = today + timedelta(days=1), today + timedelta(days=1)
    elif time_range == "this week":
        start_date = today - timedelta(days=today.weekday()); end_date = start_date + timedelta(days=6)
    elif time_range == "next week":
        start_date = today - timedelta(days=today.weekday()) + timedelta(days=7); end_date = start_date + timedelta(days=6)
    else:
        return json.dumps([{"error": "Unsupported time range."}])

    start_datetime = datetime.datetime.combine(start_date, datetime.time.min).astimezone(timezone.utc)
    end_datetime = datetime.datetime.combine(end_date, datetime.time.max).astimezone(timezone.utc)
    events = _fetch_google_events(start_date=start_datetime, end_date=end_datetime)

    return json.dumps(events, indent=2)

@tool
def update_google_event(event_id: str, summary: Optional[str] = None, start_time_iso: Optional[str] = None, end_time_iso: Optional[str] = None, description: Optional[str] = None, location: Optional[str] = None, attendees: Optional[List[str]] = None) -> str:
    """Updates an existing Google Calendar event using its unique ID."""

    print(f"--- Tool: update_google_event called for ID: {event_id} ---")

    try:
        creds = _get_google_credentials()
        service = build("calendar", "v3", credentials=creds)
        update_body = {}

        if summary: update_body['summary'] = summary
        # ... other fields ...
        updated_event = service.events().patch(calendarId='primary', eventId=event_id, body=update_body).execute()

        return f"Event '{updated_event.get('summary')}' was updated successfully."

    except Exception as e:
        return f"An error occurred while updating the event: {e}"


@tool
def read_and_summarize_emails(sender: Optional[str] = None, subject: Optional[str] = None, keywords: Optional[str] = None, status: Optional[str] = "unread", time_range: Optional[str] = None, max_results: int = 25) -> str:
    """Reads and summarizes emails from Gmail using advanced search queries."""

    print(f"--- Tool: read_and_summarize_emails called ---")

    query_parts = []
    if status == "unread":
        query_parts.append("is:unread")
    elif status == "read":
        query_parts.append("is:read")
    if sender:
        query_parts.append(f"from:{sender}")
    if subject:
        query_parts.append(f"subject:({subject})")
    if keywords:
        query_parts.append(keywords)
    if time_range:
        query_parts.append(f"newer_than:{time_range}")

    query = " ".join(query_parts)
    emails = _fetch_gmail_messages(query=query, max_results=max_results)
    
    if not emails or all('error' in e for e in emails):
        return "No emails found matching the criteria or an error occurred."

    summaries = [f"- From: {e['sender']}, Subject: {e['subject']}" for e in emails]
    return "Here is a summary of the most recent emails:\n\n" + "\n".join(summaries)


In [5]:
calendar_tools = [
    list_upcoming_events,
    get_events_for_day,
    create_google_event,
    delete_google_event,
    get_calendar_summary,
    update_google_event
]

email_tools = [read_and_summarize_emails]

search_tools = [tavily_search]
    
print(f"Defined {len(calendar_tools)} tools for the Calendar Expert.")
print(f"Defined {len(email_tools)} tools for the Email Expert.")
print(f"Defined {len(search_tools)} tools for the Search Expert.")

Defined 6 tools for the Calendar Expert.
Defined 1 tools for the Email Expert.
Defined 1 tools for the Search Expert.


In [6]:
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    route: Literal["calendar", "email", "search", "conversational"]
    error: str


class RouteQuery(BaseModel):
    """Route the user's query to the correct expert."""

    route: Literal["calendar", "email", "search", "conversational"] = Field(
        description="""The category to route the user's request to, based on their intent.
        - 'calendar': For listing, creating, deleting, updating events, or asking about the calendar.
        - 'email': For reading, searching, or summarizing emails.
        - 'search': For general knowledge, weather, news, or anything requiring a web search.
        - 'conversational': For greetings, thank yous, or simple chit-chat."""
    )


def router_node(state: AgentState):
    """
    Analyzes the user's latest message and routes the task to the appropriate expert.
    """

    print("--- Router Node ---")

    structured_llm_router = llm.with_structured_output(RouteQuery)
    
    user_message = state['messages'][-1].content
    
    route_decision = structured_llm_router.invoke(
        f"Classify the following user request: '{user_message}'"
    )
    
    print(f"Router Decision: '{route_decision.route}'")

    return {"route": route_decision.route}


calendar_system_prompt = f"""
You are a calendar management expert. Today's date is: {datetime.date.today().isoformat()}.
Your task is to handle the user's calendar-related requests (creating, listing, deleting, updating events).
- To create an event, the title, start time, AND duration MUST be known. If any are missing, ASK the user.
- Before deleting or updating an event, you MUST use a search tool to find its 'id'.
- You MUST ask the user for CONFIRMATION before deleting an event.
"""

email_system_prompt = """
You are an email assistant. Your task is to find, read, and summarize emails according to the user's request.
Effectively use the filters (sender, subject, keywords) in the `read_and_summarize_emails` tool for accurate results.
"""

search_system_prompt = """
You are a research expert. Your task is to answer the user's questions about general knowledge, news, weather, or facts
by using the `tavily_search` tool. Do NOT answer based on your own knowledge.
"""


In [7]:
llm_with_calendar_tools = llm.bind_tools(calendar_tools)
calendar_tool_node = ToolNode(calendar_tools)

def calendar_agent_node(state: AgentState):
    print("--- Calendar Expert Node ---")

    messages = [SystemMessage(content=calendar_system_prompt)] + state['messages']
    response = llm_with_calendar_tools.invoke(messages)
    return {"messages": [response]}

llm_with_email_tools = llm.bind_tools(email_tools)
email_tool_node = ToolNode(email_tools)

def email_agent_node(state: AgentState):

    print("--- Email Expert Node ---")
    messages = [SystemMessage(content=email_system_prompt)] + state['messages']
    response = llm_with_email_tools.invoke(messages)
    return {"messages": [response]}

llm_with_search_tools = llm.bind_tools(search_tools)
search_tool_node = ToolNode(search_tools)

def search_agent_node(state: AgentState):

    print("--- Search Expert Node ---")
    messages = [SystemMessage(content=search_system_prompt)] + state['messages']
    response = llm_with_search_tools.invoke(messages)
    return {"messages": [response]}

def conversational_node(state: AgentState):

    print("--- Conversational Node ---")
    response = llm.invoke(state['messages'])
    return {"messages": [response]}


In [8]:
def should_call_tools(state: AgentState) -> Literal["tools", END]:

    if not isinstance(state['messages'][-1], AIMessage) or not state['messages'][-1].tool_calls:
        return END
    return "tools"

In [9]:
workflow = StateGraph(AgentState)

workflow.add_node("router", router_node)
workflow.add_node("calendar_agent", calendar_agent_node)
workflow.add_node("calendar_tools", calendar_tool_node)
workflow.add_node("email_agent", email_agent_node)
workflow.add_node("email_tools", email_tool_node)
workflow.add_node("search_agent", search_agent_node)
workflow.add_node("search_tools", search_tool_node)
workflow.add_node("conversational_agent", conversational_node)

workflow.set_entry_point("router")

workflow.add_conditional_edges(
    "router",
    lambda state: state["route"],
    {
        "calendar": "calendar_agent",
        "email": "email_agent",
        "search": "search_agent",
        "conversational": "conversational_agent"
    }
)

workflow.add_edge("conversational_agent", END)


workflow.add_conditional_edges("calendar_agent", should_call_tools, {"tools": "calendar_tools", END: END})
workflow.add_edge("calendar_tools", "calendar_agent")

workflow.add_conditional_edges("email_agent", should_call_tools, {"tools": "email_tools", END: END})
workflow.add_edge("email_tools", "email_agent")

workflow.add_conditional_edges("search_agent", should_call_tools, {"tools": "search_tools", END: END})
workflow.add_edge("search_tools", "search_agent")


<langgraph.graph.state.StateGraph at 0x107f12e40>

In [None]:
db_path = os.path.abspath("conversations.sqlite")

print(f"Using database at: {db_path}")

with SqliteSaver.from_conn_string(db_path) as memory:
    
    app = workflow.compile(checkpointer=memory)
    print("\nRouter-based graph compiled. Ready for interaction.")
    
    config = {"configurable": {"thread_id": "berky_alkn1"}}
    print("\nYour persistent assistant is ready. You can now start chatting.")
    print("Type 'quit' or 'exit' to end the conversation.")
    print("-" * 50)
    
    while True:
        try:
            user_input = input("You: ")
            if user_input.lower() in ["quit", "exit"]:
                print("Assistant: Goodbye!")
                break

            print(f"You: {user_input}")
            
            messages = [HumanMessage(content=user_input)]
            
            response = app.invoke({"messages": messages}, config)
            
            final_response = response["messages"][-1]
            if final_response.content:
                 print(f"Assistant: {final_response.content}")
            
        except Exception as e:
            print(f"\n--- An Error Occurred ---")
            print(f"Error: {e}")
            print("Please try again.")


Using database at: /Users/berkayalkan/Lecture/CMPE411/gitHub/ai-personal-assistant/conversations_routed_en.sqlite

Router-based graph compiled. Ready for interaction.

Your persistent assistant is ready. You can now start chatting.
Type 'quit' or 'exit' to end the conversation.
--------------------------------------------------
You: Hi
--- Router Node ---
Router Decision: 'conversational'
--- Conversational Node ---
Assistant: Hi Berkay! How's it going?
Assistant: Goodbye!
