# **Installing Dependencies**

In [13]:
pip install git+https://github.com/traversaal-ai/AgentPro.git -q

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [None]:
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib gradio openai tzlocal python-dateutil

# **Uploading Credentials**

In [2]:
from google.colab import files
files.upload()  # Upload your credentials.json

Saving credentials.json to credentials.json


{'credentials.json': b'{"web":{"client_id":"445245336189-e387aqaburibr387r956nrrjhvqqlttt.apps.googleusercontent.com","project_id":"daily-planner-app-461915","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost:8000","https://localhost:8000"]}}'}

# **Google Calendar API Authentication**

In [3]:
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import pickle
import os
import re

SCOPES = ['https://www.googleapis.com/auth/calendar']

def extract_auth_code(full_url):
    """Extracts just the authorization code from the full redirect URL"""
    match = re.search(r'code=([^&]+)', full_url)
    if match:
        return match.group(1)
    return full_url  # Fallback if regex fails

def authenticate_google():
    creds = None
    # Load token if exists
    if os.path.exists('token.pkl'):
        with open('token.pkl', 'rb') as token:
            creds = pickle.load(token)

    if not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file(
            'credentials.json',
            SCOPES,
            redirect_uri='https://colab.research.google.com'
        )

        auth_url, _ = flow.authorization_url(prompt='consent')
        print('STEP 1: Click this URL and authorize the application:')
        print(auth_url)
        print('\nAfter approving, you will be redirected to an invalid page.')
        print('STEP 2: Copy the ENTIRE URL from the address bar and paste it below.')

        # Get the full redirect URL
        full_url = input('Paste the full redirect URL here: ')

        # Extract just the code parameter
        auth_code = extract_auth_code(full_url)

        # Exchange code for tokens
        flow.fetch_token(code=auth_code)
        creds = flow.credentials

        # Save credentials for future runs
        with open('token.pkl', 'wb') as token:
            pickle.dump(creds, token)

    return build('calendar', 'v3', credentials=creds)

# Run authentication
print("Starting Google Calendar API authentication...")
try:
    service = authenticate_google()
    print("\n✅ Authentication successful!")
    print("You can now use the 'service' object to interact with the Google Calendar API.")

    # Test the connection
    print("\nTesting connection - listing your calendars:")
    calendars = service.calendarList().list().execute()
    for calendar in calendars.get('items', []):
        print(f"- {calendar['summary']}")

except Exception as e:
    print(f"\n❌ Authentication failed: {str(e)}")
    print("\nTroubleshooting steps:")
    print("1. Ensure 'https://colab.research.google.com' is in your Authorized Redirect URIs")
    print("2. Verify your email is added as a test user in OAuth consent screen")
    print("3. Check that Google Calendar API is enabled")
    print("4. Make sure you're copying the ENTIRE redirect URL after approval")
    print("5. Try deleting token.pkl and starting fresh if issues persist")

Starting Google Calendar API authentication...
STEP 1: Click this URL and authorize the application:
https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=445245336189-e387aqaburibr387r956nrrjhvqqlttt.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fcolab.research.google.com&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcalendar&state=S4zGAcZhUqxtSNUgFzsL46buHiGf9F&prompt=consent&access_type=offline

After approving, you will be redirected to an invalid page.
STEP 2: Copy the ENTIRE URL from the address bar and paste it below.
Paste the full redirect URL here: https://colab.research.google.com/v2/drive%3Fstate=S4zGAcZhUqxtSNUgFzsL46buHiGf9F&code=4/0AUJR-x5nVwSGmlypnu6O2nC8s3MZOlKiJX3ifRUnhdN0j0N6rCd7Z8D7s7B1w_kt2q76jg&scope=https://www.googleapis.com/auth/calendar?state=S4zGAcZhUqxtSNUgFzsL46buHiGf9F&code=4/0AUJR-x5nVwSGmlypnu6O2nC8s3MZOlKiJX3ifRUnhdN0j0N6rCd7Z8D7s7B1w_kt2q76jg&scope=https://www.googleapis.com/auth/calendar

✅ Authentication successful!
You can 

In [4]:
# Retrieve API key securely from Colab user data
from google.colab import userdata
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
ARES_API_KEY = userdata.get('ARES_API_KEY')

# **Custom Tools**

## **Daily Event Summary Tool**

In [5]:
from agentpro.tools import Tool
from typing import Any
import datetime
import dateutil.parser
import re

class DailyEventSummaryTool(Tool):
    # ── 1) Class attributes (Pydantic fields) ───────────────────────────
    name: str = "Daily Event Summary"
    description: str = (
        "Provide a narrative summary of Google Calendar events for any requested day. "
        "You can ask for today, tomorrow, day after tomorrow, last day of this year, "
        "a weekday of the current week (e.g., 'Wednesday'), or an explicit date like 'June 10, 2025'."
    )
    action_type: str = "daily_event_summary"
    input_format: str = (
        "Natural language calendar query. Examples:\n"
        "  • \"What’s on my calendar today?\"\n"
        "  • \"Show me my schedule tomorrow.\"\n"
        "  • \"Plan for day after tomorrow.\"\n"
        "  • \"Events on the last day of this year.\"\n"
        "  • \"What do I have on Wednesday?\"\n"
        "  • \"What do I have on June 10, 2025?\"\n"
    )

    # ── 2) We expect a Google Calendar “service” to be passed in at instantiation ──
    service: Any

    def run(self, input_text: Any) -> str:
        """
        Determine which calendar day the user wants and return a single-sentence
        narrative describing each event's start time, end time, and duration.

        Supported queries:
          - “today”
          - “tomorrow”
          - “day after tomorrow”
          - “last day of this year” (Dec 31, current year)
          - any weekday of the current week (e.g., “Monday”, “Friday”)
          - explicit dates (e.g., “June 10, 2025” or “2025-06-10”)
        """
        text = str(input_text).lower()

        # ────────────────────────────────────────────────────────────────────
        # A) Handle relative-day keywords and weekdays of this week
        # ────────────────────────────────────────────────────────────────────
        today_utc = datetime.datetime.utcnow().date()
        target_date = None

        # 1) “today”
        if "today" in text:
            target_date = today_utc

        # 2) “tomorrow” (but not “day after tomorrow”)
        elif "tomorrow" in text and "day after" not in text:
            target_date = today_utc + datetime.timedelta(days=1)

        # 3) “day after tomorrow”
        elif "day after tomorrow" in text:
            target_date = today_utc + datetime.timedelta(days=2)

        # 4) “last day of this year” or “last day of the year”
        elif "last day of this year" in text or "last day of the year" in text:
            year = today_utc.year
            target_date = datetime.date(year, 12, 31)

        else:
            # 5) Try to match a weekday name in the current week
            weekdays = {
                "monday": 1,
                "tuesday": 2,
                "wednesday": 3,
                "thursday": 4,
                "friday": 5,
                "saturday": 6,
                "sunday": 7
            }
            for name, iso_num in weekdays.items():
                if name in text:
                    # Compute offset from today's ISO weekday to the requested one
                    today_iso = today_utc.isoweekday()  # Monday=1 ... Sunday=7
                    delta_days = iso_num - today_iso
                    target_date = today_utc + datetime.timedelta(days=delta_days)
                    break

            # 6) If still None, try to parse an explicit date
            if target_date is None and re.search(r"\d", text):
                try:
                    parsed_dt = dateutil.parser.parse(text, fuzzy=True)
                    target_date = parsed_dt.date()
                except (ValueError, OverflowError):
                    target_date = None

        # If we still don't have a date, return fallback instructions
        if target_date is None:
            return (
                "Sorry, I couldn’t figure out which day you mean.\n"
                "Please ask for:\n"
                "  • “today”\n"
                "  • “tomorrow”\n"
                "  • “day after tomorrow”\n"
                "  • “last day of this year”\n"
                "  • a weekday this week (e.g., “Wednesday”)\n"
                "  • or specify an explicit date (e.g., “June 10, 2025”)."
            )

        # ────────────────────────────────────────────────────────────────────
        # B) Build UTC‐based timestamps for that entire day: [00:00 → next 00:00)
        # ────────────────────────────────────────────────────────────────────
        start_of_day = datetime.datetime.combine(target_date, datetime.time.min).isoformat() + "Z"
        end_of_day = (
            datetime.datetime.combine(target_date, datetime.time.min)
            + datetime.timedelta(days=1)
        ).isoformat() + "Z"

        # ────────────────────────────────────────────────────────────────────
        # C) Query Google Calendar API for events in that window
        # ────────────────────────────────────────────────────────────────────
        events_res = (
            self.service.events()
                        .list(
                            calendarId="primary",
                            timeMin=start_of_day,
                            timeMax=end_of_day,
                            singleEvents=True,
                            orderBy="startTime"
                        )
                        .execute()
        )
        items = events_res.get("items", [])
        if not items:
            return f"You have no events scheduled for {target_date.strftime('%B %d, %Y')}."

        # ────────────────────────────────────────────────────────────────────
        # D) Build narrative: “On {date}, first meeting is ‘X’, which will start at {h:mm AM/PM}
        #    and end at {h:mm AM/PM} (Duration: {N hours M minutes}), and second meeting is ….”
        # ────────────────────────────────────────────────────────────────────
        ordinals = [
            "first", "second", "third", "fourth", "fifth",
            "sixth", "seventh", "eighth", "ninth", "tenth"
        ]

        narrative_parts = []
        for idx, ev in enumerate(items):
            start_raw = ev["start"].get("dateTime")
            end_raw   = ev["end"].get("dateTime")
            summary = ev.get("summary", "(no title)")

            if start_raw and end_raw:
                # Timed event
                start_dt = datetime.datetime.fromisoformat(start_raw.replace("Z", "+00:00"))
                end_dt   = datetime.datetime.fromisoformat(end_raw.replace("Z", "+00:00"))

                # Format “H:MM AM/PM”
                start_str = start_dt.strftime("%I:%M %p").lstrip("0")
                end_str   = end_dt.strftime("%I:%M %p").lstrip("0")

                # Compute duration
                duration_ts = end_dt - start_dt
                total_seconds = int(duration_ts.total_seconds())
                hours = total_seconds // 3600
                minutes = (total_seconds % 3600) // 60

                if hours and minutes:
                    duration_str = f"{hours} hours {minutes} minutes"
                elif hours:
                    duration_str = f"{hours} hours"
                else:
                    duration_str = f"{minutes} minutes"

                ordinal = ordinals[idx] if idx < len(ordinals) else f"{idx+1}th"
                part = (
                    f"{ordinal} meeting is “{summary},” which will start at {start_str} "
                    f"and end at {end_str} (Duration: {duration_str})"
                )
            else:
                # All-day event
                start_date = ev["start"].get("date")
                ordinal = ordinals[idx] if idx < len(ordinals) else f"{idx+1}th"
                part = f"{ordinal} event is “{summary},” which is an all-day event on {start_date}"

            narrative_parts.append(part)

        # Join all parts with “, and …”
        joined = ", and ".join(narrative_parts)
        date_str = target_date.strftime("%B %d, %Y")

        return f"On {date_str}, {joined}."

## **Weekly Event Summary Tool**

In [6]:
from agentpro.tools import Tool
from typing import Any, List, Tuple, Optional
import datetime
import dateutil.parser
import re


class WeeklyEventSummaryTool(Tool):
    """
    Tool to provide a narrative summary of Google Calendar events for an entire week.
    Users can ask for “this week,” “next week,” “last week,” or specify a date (e.g., “week of June 10, 2025”).
    The tool will gather events from Monday through Sunday of the chosen week and return a day-by-day narrative.
    """

    # ── 1) Class attributes ─────────────────────────────────────────────────────────────────
    name: str = "Weekly Event Summary"
    description: str = (
        "Provide a narrative summary of Google Calendar events for any requested week. "
        "You can ask for:\n"
        "  • “My events this week.”\n"
        "  • “Show my schedule next week.”\n"
        "  • “Weekly summary for last week.”\n"
        "  • “What do I have the week of June 10, 2025?”\n"
        "  • “Events for week of 2025-06-10.”\n"
        "If no explicit week is mentioned, defaults to this week (Monday→Sunday)."
    )
    action_type: str = "weekly_event_summary"
    input_format: str = (
        "Natural-language calendar query specifying a week. Examples:\n"
        "  • “What’s on my calendar this week?”\n"
        "  • “Show me my schedule next week.”\n"
        "  • “Weekly summary for last week.”\n"
        "  • “Events for the week of June 10, 2025.”\n"
        "  • “Week of 2025-06-10.”"
    )

    # ── 2) We expect a Google Calendar “service” to be passed in at instantiation ───────────
    service: Any

    # ── 3) Main entry point ─────────────────────────────────────────────────────────────────
    def run(self, input_text: Any) -> str:
        """
        Parse a natural-language weekly summary command, identify target week, and return a narrative summary.
        """
        text = str(input_text).strip().lower()

        # 1) Determine which week the user means
        week_start = self._resolve_week_start(text)
        if week_start is None:
            return (
                "Sorry, I couldn't determine which week you meant. "
                "Please ask for 'this week', 'next week', 'last week', or 'week of <date>'."
            )

        # 2) Build a list of dates from Monday through Sunday
        dates = [week_start + datetime.timedelta(days=i) for i in range(7)]
        week_end = dates[-1]

        # 3) For each day in the week, fetch events and build narrative parts
        narrative_parts: List[str] = []
        for day in dates:
            events = self._fetch_events_on_date(day)
            day_str = day.strftime("%A, %B %d, %Y")
            if not events:
                narrative_parts.append(f"On {day_str}, you have no events scheduled.")
            else:
                # Build a single sentence listing each event’s start time, end time, and title
                sentences: List[str] = []
                for idx, ev in enumerate(events):
                    start_raw = ev["start"].get("dateTime")
                    end_raw   = ev["end"].get("dateTime")
                    summary = ev.get("summary", "(no title)")

                    if start_raw and end_raw:
                        start_dt = datetime.datetime.fromisoformat(start_raw.replace("Z", "+00:00"))
                        end_dt   = datetime.datetime.fromisoformat(end_raw.replace("Z", "+00:00"))
                        start_str = start_dt.strftime("%I:%M %p").lstrip("0")
                        end_str   = end_dt.strftime("%I:%M %p").lstrip("0")
                        sentences.append(f"“{summary}” from {start_str} to {end_str}")
                    else:
                        # Should not happen for non-all-day events since we filter them
                        sentences.append(f"“{summary}” (all-day)")

                # Join individual event descriptions with “; ”
                day_events_str = "; ".join(sentences)
                narrative_parts.append(f"On {day_str}, you have: {day_events_str}.")

        # 4) Combine into one multiline narrative
        week_range_str = f"{week_start.strftime('%B %d, %Y')} to {week_end.strftime('%B %d, %Y')}"
        header = f"Weekly summary for {week_range_str}:"
        body = " ".join(narrative_parts)
        return f"{header}\n\n{body}"

    # ────────────────────────────────────────────────────────────────────────────────────────────

    def _resolve_week_start(self, text: str) -> Optional[datetime.date]:
        """
        Determine the Monday (week_start) of the requested week.
        Supports 'this week', 'next week', 'last week', or 'week of <date>'.
        If no keyword found, defaults to this week.
        """
        today = datetime.date.today()
        weekday = today.weekday()  # Monday=0 ... Sunday=6
        monday_this_week = today - datetime.timedelta(days=weekday)

        # 1) Check for 'this week'
        if "this week" in text:
            return monday_this_week

        # 2) 'next week'
        if "next week" in text:
            return monday_this_week + datetime.timedelta(days=7)

        # 3) 'last week'
        if "last week" in text:
            return monday_this_week - datetime.timedelta(days=7)

        # 4) 'week of <date>'
        # Look for a date substring to parse
        # e.g., "week of june 10, 2025" or "week of 2025-06-10"
        match = re.search(
            r"week\s+of\s+(.+)", text
        )
        if match:
            date_part = match.group(1).strip()
            try:
                parsed = dateutil.parser.parse(date_part, fuzzy=True)
                target_date = parsed.date()
                # Find Monday of that week
                wd = target_date.weekday()
                return target_date - datetime.timedelta(days=wd)
            except (ValueError, OverflowError):
                return None

        # 5) If no explicit keyword, default to this week
        return monday_this_week

    def _fetch_events_on_date(self, date_obj: datetime.date) -> List[dict]:
        """
        Fetch all non-all-day events on the provided date (UTC midnight → next midnight).
        Returns a list of event dicts (as returned by Google Calendar API), sorted by start time.
        """
        start_of_day = datetime.datetime.combine(date_obj, datetime.time.min).isoformat() + "Z"
        end_of_day = (datetime.datetime.combine(date_obj, datetime.time.min)
                      + datetime.timedelta(days=1)).isoformat() + "Z"

        events_res = (
            self.service.events()
                        .list(
                            calendarId="primary",
                            timeMin=start_of_day,
                            timeMax=end_of_day,
                            singleEvents=True,
                            orderBy="startTime"
                        )
                        .execute()
        )
        items = events_res.get("items", [])
        # Filter out all-day events (they have 'start.date' instead of 'start.dateTime')
        non_all_day = [ev for ev in items if ev.get("start", {}).get("dateTime")]
        return sorted(
            non_all_day,
            key=lambda ev: datetime.datetime.fromisoformat(ev["start"]["dateTime"].replace("Z", "+00:00"))
        )


## **Modify Event Tool**

In [7]:
from agentpro.tools import Tool
from typing import Any, List, Tuple, Optional
import datetime
import dateutil.parser
import re


class ModifyEventTool(Tool):
    """
    Tool to change the date, time, and/or duration of an existing Google Calendar event.
    Accepts natural‐language commands such as:
      • “Shift the first meeting on Saturday to Monday”
      • “Shift the first meeting on Saturday to 5 AM”
      • “Reschedule all my meetings from Saturday to Sunday.”
      • “Reschedule the second meeting from Monday to Wednesday at 10 AM.”
      • “Shift the first meeting on 7th June to 9th June.”
      • “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.”
      • “Reschedule all of my tomorrow meetings to Sunday.”
      • “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.”
      • “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.”
      • “Reschedule my third appointment tomorrow to Friday at 11 AM.”
    """

    # ── 1) Class attributes ─────────────────────────────────────────────────────────────────
    name: str = "Modify Event"
    description: str = (
        "Change the date, time, and/or duration of an existing Google Calendar event. "
        "Supports natural‐language like 'Shift the first meeting on Saturday to 5 AM.'"
    )
    action_type: str = "modify_event"
    input_format: str = (
        "Natural‐language command describing which event(s) to modify and how.\n"
        "Examples:\n"
        "  • “Shift the first meeting on Saturday to Monday.”\n"
        "  • “Shift the first meeting on Saturday to 5 AM.”\n"
        "  • “Shift the first meeting on 7th June to 9th June.”\n"
        "  • “Reschedule all of my tomorrow meetings to Sunday.”\n"
        "  • “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.”\n"
        "  • “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.”\n"
        "  • “Reschedule the second meeting from Monday to Wednesday at 10 AM.”\n"
        "  • “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.”\n"
        "  • “Reschedule my third appointment tomorrow to Friday at 11 AM.”\n"
    )

    # ── 2) We expect a Google Calendar “service” to be passed in at instantiation ───────────
    service: Any

    # ── 3) Main entry point ─────────────────────────────────────────────────────────────────
    def run(self, input_text: Any) -> str:
        """
        Parse a natural‐language modification command, identify target event(s), and update them.
        Returns a human‐readable confirmation or an error message if something goes wrong.
        """
        text = str(input_text).strip()

        # 1) Split into “source” (which event(s) to modify) vs “target” (new date/time)
        source_part, target_part = self._split_source_target(text)
        if source_part is None or target_part is None:
            return (
                "Sorry, I couldn't identify which part of your command specifies the modification. "
                "Please use a format like “Shift the first meeting on Saturday to 5 AM.”"
            )

        # 2) From source_part, extract ordinal (“first”, “second”, “last”, “all”), title (if any),
        #    and source_date_spec (weekday/explicit date/today/tomorrow).
        ordinal = self._extract_ordinal(source_part)
        title = self._extract_title(source_part)
        source_date_spec = self._extract_date_spec(source_part)

        if source_date_spec is None:
            return (
                "Sorry, I couldn't determine which day or date you meant. "
                "Please specify a weekday (e.g., 'Monday'), 'today', 'tomorrow', or an explicit date (e.g., 'June 10, 2025')."
            )

        # 3) Resolve source_date_spec → a concrete `datetime.date`
        source_date = self._resolve_date(source_date_spec)
        if source_date is None:
            return f"Sorry, I couldn’t parse the date '{source_date_spec}'."

        # 4) Fetch all non‐all‐day events on that source_date
        events = self._fetch_events_on_date(source_date)
        if not events:
            return f"You have no non‐all‐day events on {source_date.strftime('%B %d, %Y')}."

        # 5) Select which events to modify based on ordinal/title
        target_events = self._select_target_events(events, ordinal, title)
        if isinstance(target_events, str):
            # an error message string
            return target_events
        if not target_events:
            return f"No events found matching that specification on {source_date.strftime('%B %d, %Y')}."

        # 6) Parse the “target” spec to determine new_date_spec and new_time_spec
        new_date_spec, new_time_spec = self._parse_target_part(target_part)

        # 7) Resolve new_date_spec → `datetime.date` (if given). If omitted, keep original date.
        new_date: Optional[datetime.date] = None
        if new_date_spec:
            new_date = self._resolve_date(new_date_spec)
            if new_date is None:
                return f"Sorry, I couldn’t parse the target date '{new_date_spec}'."

        # 8) Resolve new_time_spec → either (new_start_time, new_end_time) or (new_start_time, None).
        new_start_time: Optional[datetime.time] = None
        new_end_time: Optional[datetime.time] = None
        if new_time_spec:
            parsed = self._resolve_time_spec(new_time_spec)
            if parsed is None:
                return (
                    f"Sorry, I couldn’t parse the target time '{new_time_spec}'. "
                    f"Please specify like 'at 2 PM', 'to 4 PM', or 'from 3 PM to 5 PM'."
                )
            new_start_time, new_end_time = parsed

        # 9) For each selected event, compute new start/end datetimes (preserving duration logic)
        updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]] = []
        # Each tuple: (event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt)
        for ev in target_events:
            old_start_dt = datetime.datetime.fromisoformat(
                ev["start"]["dateTime"].replace("Z", "+00:00")
            )
            old_end_dt = datetime.datetime.fromisoformat(
                ev["end"]["dateTime"].replace("Z", "+00:00")
            )
            original_duration = old_end_dt - old_start_dt

            # Determine which date to apply: either new_date or old_start_dt.date()
            apply_date = new_date if new_date else old_start_dt.date()

            # Keep the same tzinfo as the original event
            tzinfo = old_start_dt.tzinfo

            if new_start_time:
                # Case A: New time is provided (could be start-only or start+end)
                new_start_dt = datetime.datetime.combine(apply_date, new_start_time, tzinfo=tzinfo)
                if new_end_time:
                    new_end_dt = datetime.datetime.combine(apply_date, new_end_time, tzinfo=tzinfo)
                else:
                    # Single new_start: preserve original duration
                    new_end_dt = new_start_dt + original_duration
            else:
                # Case B: No new time provided → keep original start/end times but shift date if needed
                original_start_time = old_start_dt.time()
                original_end_time = old_end_dt.time()
                new_start_dt = datetime.datetime.combine(apply_date, original_start_time, tzinfo=tzinfo)
                new_end_dt = datetime.datetime.combine(apply_date, original_end_time, tzinfo=tzinfo)

            # 10) Update the event in Google Calendar
            _ = self._update_event(
                event_id=ev["id"],
                new_start_iso=new_start_dt.isoformat(),
                new_end_iso=new_end_dt.isoformat(),
            )
            updates.append((ev.get("summary", "(no title)"), old_start_dt, old_end_dt, new_start_dt, new_end_dt))

        # 11) Return a confirmation message
        return self._format_confirmation(updates)

    # ────────────────────────────────────────────────────────────────────────────────────────────

    def _split_source_target(self, text: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Naively split the input_text into source_part (which events) vs target_part (new date/time).
        We look for the first occurrence of ' to ' that is not part of a 'from X to Y' time range.
        """
        lowered = text.lower()
        # If there's a "from X to Y" time‐range, skip that " to " and find the next " to "
        time_range_match = re.search(
            r"\bfrom\s+\d{1,2}(:\d{2})?\s*(am|pm)?\s+to\s+\d{1,2}(:\d{2})?\s*(am|pm)?",
            lowered
        )
        if time_range_match:
            span_start, span_end = time_range_match.span()
            next_to = lowered.find(" to ", span_end)
            if next_to != -1:
                before = text[:next_to]
                after = text[next_to + len(" to "):]
                return before.strip(), after.strip()

        # Fallback: split on the first ' to '
        parts = re.split(r"\s+to\s+", text, maxsplit=1)
        if len(parts) == 2:
            return parts[0].strip(), parts[1].strip()

        return None, None

    def _extract_ordinal(self, text: str) -> Optional[str]:
        """
        Extract ordinal keyword (first, second, third, ..., last, all).
        Returns the matched keyword (lowercased) or None if none found.
        """
        match = re.search(
            r"\b(first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth|last|all)\b",
            text,
            re.IGNORECASE
        )
        return match.group(1).lower() if match else None

    def _extract_title(self, text: str) -> Optional[str]:
        """
        If the user specified an event title in quotes (single or double),
        return that substring (without quotes). Otherwise, None.
        """
        match = re.search(r"[‘'“\"]([^‘'”\"]+)[’'”\"]", text)
        if match:
            return match.group(1).strip()
        return None

    def _extract_date_spec(self, text: str) -> Optional[str]:
        """
        Return the substring that indicates a source date: 'today', 'tomorrow',
        a weekday name, or an explicit date. Otherwise, None.
        """
        lowered = text.lower()

        if "today" in lowered:
            return "today"
        if "tomorrow" in lowered:
            return "tomorrow"

        # Weekday names
        weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
        for wd in weekdays:
            if re.search(rf"\b{wd}\b", lowered):
                return wd

        # Check if there's an explicit date phrase: must contain a month name or ISO format
        if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \
           or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \
           or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered):
            # We'll hand the full text to dateutil.parse later
            return text

        return None

    def _resolve_date(self, spec: str) -> Optional[datetime.date]:
        """
        Convert specifications like 'today', 'tomorrow', 'Saturday',
        '7th June', 'June 10, 2025' into a datetime.date object.
        """
        spec_lower = spec.strip().lower()
        today = datetime.date.today()

        if spec_lower == "today":
            return today
        if spec_lower == "tomorrow":
            return today + datetime.timedelta(days=1)

        # Weekday resolution: find next occurrence (this week or next)
        weekdays_map = {
            "monday": 0,
            "tuesday": 1,
            "wednesday": 2,
            "thursday": 3,
            "friday": 4,
            "saturday": 5,
            "sunday": 6,
        }
        if spec_lower in weekdays_map:
            target_wd = weekdays_map[spec_lower]
            today_wd = today.weekday()  # Monday=0 ... Sunday=6
            if target_wd >= today_wd:
                delta = target_wd - today_wd
            else:
                delta = 7 - (today_wd - target_wd)
            return today + datetime.timedelta(days=delta)

        # Try explicit date parsing (assume current year if omitted)
        try:
            parsed = dateutil.parser.parse(
                spec,
                fuzzy=True,
                default=datetime.datetime(today.year, 1, 1),
            )
            return parsed.date()
        except (ValueError, OverflowError):
            return None

    def _fetch_events_on_date(self, date_obj: datetime.date) -> List[dict]:
        """
        Fetch all non‐all‐day events on the provided date (UTC midnight to next midnight).
        Returns a list of event dicts (as returned by Google Calendar API).
        """
        start_of_day = datetime.datetime.combine(date_obj, datetime.time.min).isoformat() + "Z"
        end_of_day = (datetime.datetime.combine(date_obj, datetime.time.min)
                      + datetime.timedelta(days=1)).isoformat() + "Z"

        events_res = (
            self.service.events()
                        .list(
                            calendarId="primary",
                            timeMin=start_of_day,
                            timeMax=end_of_day,
                            singleEvents=True,
                            orderBy="startTime"
                        )
                        .execute()
        )
        items = events_res.get("items", [])
        # Filter out all‐day events (those have 'start.date' instead of 'start.dateTime')
        non_all_day = [ev for ev in items if ev.get("start", {}).get("dateTime")]
        return non_all_day

    def _select_target_events(
        self,
        events: List[dict],
        ordinal: Optional[str],
        title: Optional[str]
    ) -> Any:
        """
        Given a list of events on the same date, choose which to modify based on:
          - If title is provided: filter by case‐insensitive substring match.
            • If exactly one match → return [that_event].
            • If multiple:
                - If ordinal == 'first' or 'last' → pick earliest or latest among those matches.
                - Else → return an error string prompting clarification.
            • If no match → return [].
          - If no title but ordinal provided:
            - Sort all events by start time.
            - 'first' → [earliest], 'second' → [second-earliest], 'last' → [latest], 'all' → all.
            - If the specified ordinal index is out of range → return [].
          - If neither title nor ordinal → return an error string asking for clarification.
        """
        # Title-based selection
        if title:
            matches = [
                ev for ev in events
                if title.lower() in (ev.get("summary", "") or "").lower()
            ]
            if not matches:
                return []
            if len(matches) == 1:
                return matches

            # Multiple matches and ordinal present?
            if ordinal in ("first", "last"):
                sorted_by_time = sorted(
                    matches,
                    key=lambda ev: datetime.datetime.fromisoformat(
                        ev["start"]["dateTime"].replace("Z", "+00:00")
                    )
                )
                return [sorted_by_time[0]] if ordinal == "first" else [sorted_by_time[-1]]

            return (
                f"Multiple events match '{title}'. Which one did you mean? "
                f"You can say 'first {title}' or 'last {title}'."
            )

        # No title; rely on ordinal
        if ordinal:
            sorted_all = sorted(
                events,
                key=lambda ev: datetime.datetime.fromisoformat(
                    ev["start"]["dateTime"].replace("Z", "+00:00")
                )
            )
            if ordinal == "all":
                return sorted_all
            if ordinal == "first":
                return [sorted_all[0]] if sorted_all else []
            if ordinal == "last":
                return [sorted_all[-1]] if sorted_all else []
            ord_map = {
                "second": 1,
                "third": 2,
                "fourth": 3,
                "fifth": 4,
                "sixth": 5,
                "seventh": 6,
                "eighth": 7,
                "ninth": 8,
                "tenth": 9
            }
            if ordinal in ord_map:
                idx = ord_map[ordinal]
                return [sorted_all[idx]] if idx < len(sorted_all) else []
            return []

        # Neither title nor ordinal → ambiguous
        return (
            "Please specify which event(s) to modify (e.g., 'first meeting', "
            "'last appointment', 'all meetings', or include the title in quotes)."
        )

    def _parse_target_part(self, text: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Given the target_part (everything after 'to'), determine:
          - new_date_spec (like 'Monday', 'June 12, 2025', 'Friday') OR None if no date.
          - new_time_spec (like '5 AM', '2:30 PM', '3 PM to 4 PM') OR None if no time.

        Strategy:
          1) Look explicitly for date keywords first:
             • 'today' or 'tomorrow'
             • weekday names
             • explicit date phrases containing a month name or ISO format (YYYY-MM-DD)
          2) Only if one of those appears do we set new_date_spec. Otherwise, new_date_spec stays None.
          3) Independently, look for time‐range patterns or single times ("at 5 PM", "5 AM", etc.).
          4) Return (date_spec, time_spec). If neither is found, return (None, None).
        """
        lowered = text.lower()

        # 1) Identify new_date_spec (only if a date keyword is present)
        new_date_spec: Optional[str] = None
        if "today" in lowered:
            new_date_spec = "today"
        elif "tomorrow" in lowered:
            new_date_spec = "tomorrow"
        else:
            # Weekday names
            weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
            for wd in weekdays:
                if re.search(rf"\b{wd}\b", lowered):
                    new_date_spec = wd
                    break

            # If still None, check for explicit date phrase: must contain a month name or ISO format
            if new_date_spec is None:
                if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \
                   or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \
                   or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered):
                    new_date_spec = text

        # 2) Identify new_time_spec (if any)
        # Look for 'from X to Y' or single time patterns like 'at X', 'X AM/PM', etc.
        time_pattern = re.compile(
            r"(?P<from>from\s+)?"
            r"(?P<h1>\d{1,2}(:\d{2})?\s*(am|pm))"
            r"\s*(to\s*(?P<h2>\d{1,2}(:\d{2})?\s*(am|pm)))?",
            re.IGNORECASE
        )
        time_match = time_pattern.search(text)
        new_time_spec: Optional[str] = time_match.group(0) if time_match else None

        return new_date_spec, new_time_spec

    def _resolve_time_spec(self, spec: str) -> Optional[Tuple[datetime.time, Optional[datetime.time]]]:
        """
        Convert strings like '3 PM to 4 PM', 'at 2:30 PM', '2 PM', 'from 3 PM to 5 PM'
        into (new_start_time, new_end_time) where new_end_time may be None (meaning preserve original duration).
        """
        spec = spec.strip().lower()
        # Find all time tokens in the spec
        times = re.findall(r"(\d{1,2}(:\d{2})?\s*(am|pm))", spec, re.IGNORECASE)
        parsed_times: List[datetime.time] = []
        for ttuple in times:
            tstr = ttuple[0]
            try:
                dt = dateutil.parser.parse(tstr)
                parsed_times.append(dt.time())
            except (ValueError, OverflowError):
                continue

        if not parsed_times:
            return None
        if len(parsed_times) == 1:
            # Single new time: assume new start; preserve duration later
            return parsed_times[0], None
        # Two times: first is new_start, second is new_end
        return parsed_times[0], parsed_times[1]

    def _update_event(self, event_id: str, new_start_iso: str, new_end_iso: str) -> dict:
        """
        Call Google Calendar API to patch the event's start and end times.
        Returns the patched event resource.
        """
        updated = (
            self.service.events()
                        .patch(
                            calendarId="primary",
                            eventId=event_id,
                            body={
                                "start": {"dateTime": new_start_iso},
                                "end": {"dateTime": new_end_iso},
                            }
                        )
                        .execute()
        )
        return updated

    def _format_confirmation(
        self,
        updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]]
    ) -> str:
        """
        Given a list of tuples:
            (event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt)
        produce a combined, human‐readable confirmation string.
        """
        lines: List[str] = []
        for summary, old_start, old_end, new_start, new_end in updates:
            old_fmt = (
                f"{old_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–"
                f"{old_end.strftime('%I:%M %p').lstrip('0')}"
            )
            new_fmt = (
                f"{new_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–"
                f"{new_end.strftime('%I:%M %p').lstrip('0')}"
            )
            lines.append(
                f"The meeting \"{summary}\" originally scheduled for {old_fmt} has been rescheduled to {new_fmt}."
            )
        return " ".join(lines)

## **Current Date-Time Tool**

In [8]:
from agentpro.tools import Tool
from typing import Any
import datetime
import pytz

class CurrentDateTimeTool(Tool):
    # 1) Class attributes (Pydantic fields)
    name: str = "Current DateTime"
    description: str = "Get the current day, time, year, and month"
    action_type: str = "current_date_time_pst"
    input_format: str = "Any input; this tool returns the current date/time."

    def run(self, input_text: Any) -> str:
        """
        Ignores the input_text and returns:
          • Current weekday name (e.g., Monday)
          • Current time (HH:MM:SS) in Pakistan Standard Time
          • Current year (YYYY)
          • Current month name (e.g., June)
        """

        now_pkt = datetime.datetime.now()

        weekday = now_pkt.strftime("%A")              # e.g., "Friday"
        time_str = now_pkt.strftime("%I:%M %p")       # e.g., "07:45 PM"
        year_str = now_pkt.strftime("%Y")             # e.g., "2025"
        month = now_pkt.strftime("%B")                # e.g., "June"
        date_str = now_pkt.strftime("%d %B %Y")       # e.g., "05 June 2025"

        return (
            f"Day of week: {weekday}\n"
            f"Current time: {time_str}\n"
            f"Date: {date_str}\n"
            f"Year: {year_str}\n"
            f"Month: {month}"
        )

# **Initializing Tools and Passing to AgentPro**

In [9]:
from agentpro import create_model, ReactAgent
from agentpro.tools import UserInputTool
from agentpro.tools import AresInternetTool
import os


model = create_model(
    provider="openai",
    model_name="gpt-3.5-turbo",
    api_key=OPENAI_API_KEY,
    temperature=0.3
)

daily_planner_tool = DailyEventSummaryTool(service=service)
weekly_planner_tool = WeeklyEventSummaryTool(service=service)
modify_event_tool = ModifyEventTool(service=service)
current_dt_tool = CurrentDateTimeTool()
ares_tool = AresInternetTool(ARES_API_KEY)


tools_list = [daily_planner_tool, weekly_planner_tool, current_dt_tool, modify_event_tool, ares_tool]

# 4) Create the ReactAgent
agent = ReactAgent(model=model, tools=tools_list)

# Quick sanity-check in Colab (outside Gradio):
print("=== SANITY CHECK: agent.run(...) ===")
test_out = agent.run("How many meetings do I have on Saturday? their details?")
print("→", test_out.final_answer)

=== SANITY CHECK: agent.run(...) ===
✅  [Debug] Sending System Prompt (with history) to LLM:
You are an AI assistant that follows the ReAct (Reasoning + Acting) pattern.
        
Your goal is to help users by breaking down complex tasks into a series of thought-out steps and actions.

You have access to these tools: daily_event_summary, weekly_event_summary, current_date_time_pst, modify_event, ares_internet_search

Tool: Daily Event Summary
Description: Provide a narrative summary of Google Calendar events for any requested day. You can ask for today, tomorrow, day after tomorrow, last day of this year, a weekday of the current week (e.g., 'Wednesday'), or an explicit date like 'June 10, 2025'.
Action Type: daily_event_summary
Input Format: Natural language calendar query. Examples:
  • "What’s on my calendar today?"
  • "Show me my schedule tomorrow."
  • "Plan for day after tomorrow."
  • "Events on the last day of this year."
  • "What do I have on Wednesday?"
  • "What do I have o

# **Gradio App**

In [10]:
import gradio as gr

def chat_with_agent(user_input):
    try:
        response = agent.run(user_input)
        return response.final_answer
    except Exception as e:
        # Return the traceback or exception message so you can see what went wrong
        return "❌ Exception in agent.run():\n" + repr(e)

# Launch Gradio interface
gr.Interface(
    fn=chat_with_agent,
    inputs=gr.Textbox(lines=2, placeholder="E.g., What's on my calendar today?"),
    outputs="text",
    title="🗓️ Daily Planner Chatbot"
).launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://8a19ec0c06fab53dd7.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


