# stage 0: starting the timeboxing session
The scheduling agent has detected the user intents to start a timeboxing session.
* /Timebox command
* intent classification during conversation.
* or handoff from another agent(reminder to plan)

question: carry over constraints/preferences or plans from how the user expressed their intent?

# context
we ran the /timebox command, the agent started a thread named "Timeboxing tomorrow (fri, 19-01)" (ie relative date, day name, dd-mm).

## stage 1: collecting constraints
tha agent posts which constraints it has fetched from the durable storage using a tool which serializes them to slack cards, allowing the user to review and edit them.
It asks the user which apply and what else it should take into account.
This list of constaints should always contain a timeboxing session.


# notebook harness

Goal: make it easy to run timeboxing building blocks *without Slack*.

This notebook mirrors patterns in `tests/unit/`:
- instantiate `TimeboxingFlowAgent` via `__new__` for ‚Äúno LLM / no MCP‚Äù tests
- stub the specific methods we want to isolate
- run GraphFlow turns by feeding a `TextMessage` into `build_timeboxing_graphflow(...).run_stream(...)`

When you want to go live:
- LLM calls require `OPENAI_API_KEY` or `OPENROUTER_API_KEY` (via `fateforger.core.config.settings`).
- Calendar MCP calls require `MCP_CALENDAR_SERVER_URL`.


In [None]:
# --- bootstrap imports (repo-local) ---
from __future__ import annotations

import os
import sys
from pathlib import Path


print("sys.executable:", sys.executable)
print("python:", sys.version)
print("REPO_ROOT:", Path.cwd())
print("OPENAI_API_KEY set?", bool(os.getenv("OPENAI_API_KEY")))
print("OPENROUTER_API_KEY set?", bool(os.getenv("OPENROUTER_API_KEY")))
print("MCP_CALENDAR_SERVER_URL:", os.getenv("MCP_CALENDAR_SERVER_URL", "<unset>"))


In [None]:
# --- core timeboxing imports ---
import asyncio
import types
from datetime import date

from autogen_agentchat.messages import TextMessage

from fateforger.agents.timeboxing.agent import Session, TimeboxingFlowAgent
from fateforger.agents.timeboxing.flow_graph import build_timeboxing_graphflow
from fateforger.agents.timeboxing.patching import TimeboxPatcher
from fateforger.agents.timeboxing.prompt_rendering import render_skeleton_draft_system_prompt
from fateforger.agents.timeboxing.stage_gating import StageDecision, StageGateOutput, TimeboxingStage
from fateforger.agents.timeboxing.timebox import Timebox

print("Imported OK")


## A) GraphFlow: run a turn without LLM/MCP

This reproduces `tests/unit/test_timeboxing_graphflow_state_machine.py` patterns.
We stub the LLM-dependent methods so we can validate routing/transition/presentation.


### Reference tests

- GraphFlow routing: `tests/unit/test_timeboxing_graphflow_state_machine.py`
- Stage-gate payload formatting: `tests/unit/test_timeboxing_stage_gate_json_context.py`
- Skeleton context injection: `tests/unit/test_timeboxing_skeleton_context_injection.py`
- Skeleton fallback: `tests/unit/test_timeboxing_skeleton_fallback.py`
- Constraint extraction background: `tests/unit/test_timeboxing_constraint_extraction_background.py`


In [None]:
def make_stub_orchestrator() -> TimeboxingFlowAgent:
    agent = TimeboxingFlowAgent.__new__(TimeboxingFlowAgent)
    agent._timebox_patcher = TimeboxPatcher()

    async def _noop_calendar(_self, _session: Session, *, timeout_s: float = 0.0) -> None:
        return None

    def _noop_queue_extract(**_kwargs):
        return None

    async def _fake_decide(_self, _session: Session, *, user_message: str) -> StageDecision:
        # For this stub: treat any message containing 'go' as proceed.
        if "go" in (user_message or "").lower():
            return StageDecision(action="proceed")
        return StageDecision(action="provide_info")

    async def _fake_stage_gate(
        _self,
        *,
        stage: TimeboxingStage,
        user_message: str,
        context: dict,
    ) -> StageGateOutput:
        return StageGateOutput(
            stage_id=stage,
            ready=(stage != TimeboxingStage.COLLECT_CONSTRAINTS),
            summary=[f"stub stage: {stage.value}", f"user_message={user_message!r}"] if user_message else [f"stub stage: {stage.value}"],
            missing=[] if stage != TimeboxingStage.COLLECT_CONSTRAINTS else ["timezone", "work_window"],
            question=None if stage != TimeboxingStage.COLLECT_CONSTRAINTS else "What work window are you planning for?",
            facts={},
        )

    agent._ensure_calendar_immovables = types.MethodType(_noop_calendar, agent)
    agent._queue_constraint_extraction = _noop_queue_extract  # type: ignore[assignment]
    agent._decide_next_action = types.MethodType(_fake_decide, agent)
    agent._run_stage_gate = types.MethodType(_fake_stage_gate, agent)
    agent._collect_background_notes = lambda _session: None  # type: ignore[assignment]

    return agent


async def run_graph_turn_stub(user_text: str, *, stage: TimeboxingStage = TimeboxingStage.COLLECT_CONSTRAINTS) -> str:
    agent = make_stub_orchestrator()
    session = Session(thread_ts="local", channel_id="local", user_id="u1", committed=True)
    session.stage = stage
    flow = build_timeboxing_graphflow(orchestrator=agent, session=session)
    presenter: TextMessage | None = None
    async for item in flow.run_stream(task=TextMessage(content=user_text, source="user")):
        if isinstance(item, TextMessage) and item.source == "PresenterNode":
            presenter = item
    return presenter.content if presenter else "<no presenter output>"


print(await run_graph_turn_stub("hi"))
print("\n---\n")
print(await run_graph_turn_stub("go", stage=TimeboxingStage.COLLECT_CONSTRAINTS))


## B) Stage-gate prompt IO formatting (TOON injection)

This checks the string payload sent to stage-gating LLMs.
It should include JSON for scalar facts and TOON tables for list-shaped data.


In [None]:
agent = TimeboxingFlowAgent.__new__(TimeboxingFlowAgent)

payload = agent._format_stage_gate_input(
    stage=TimeboxingStage.COLLECT_CONSTRAINTS,
    context={
        "stage_id": "CollectConstraints",
        "user_message": "I want to plan tomorrow in Amsterdam",
        "facts": {"date": "2026-01-27", "timezone": "Europe/Amsterdam"},
        "immovables": [{"title": "Meeting", "start": "15:00", "end": "16:00"}],
        "durable_constraints": [],
    },
)

print(payload)


## C) SkeletonContext + skeleton system prompt rendering (no LLM)

This exercises the coordinator‚Äôs context builder and the Jinja template renderer.
It should inject constraints/tasks/immovables via TOON.


In [None]:
async def build_context_and_render_prompt() -> str:
    agent = TimeboxingFlowAgent.__new__(TimeboxingFlowAgent)
    agent._constraint_store = None

    async def _noop_calendar(_self, _session: Session, *, timeout_s: float = 0.0) -> None:
        return None

    agent._ensure_calendar_immovables = types.MethodType(_noop_calendar, agent)

    session = Session(thread_ts="t1", channel_id="c1", user_id="u1")
    session.planned_date = "2026-01-27"
    session.tz_name = "Europe/Amsterdam"
    session.frame_facts = {
        "date": session.planned_date,
        "timezone": session.tz_name,
        "work_window": {"start": "09:00", "end": "17:30"},
        "immovables": [{"title": "Meeting", "start": "15:00", "end": "16:00"}],
    }
    session.input_facts = {
        "daily_one_thing": {"title": "Write spec", "block_count": 1},
        "block_plan": {"deep_blocks": 2, "shallow_blocks": 1, "block_minutes": 90},
        "tasks": [
            {"title": "Email cleanup", "block_count": 1, "importance": "low"},
            {"title": "Design review", "block_count": 1, "importance": "high"},
        ],
    }

    ctx = await agent._build_skeleton_context(session)
    return render_skeleton_draft_system_prompt(context=ctx)


print(await build_context_and_render_prompt())


## D) Timebox validation: overlaps + anchoring rules

`Timebox.schedule_and_validate()` fills missing times/durations and rejects overlaps.


In [None]:
from datetime import timedelta, time

from fateforger.agents.schedular.models.calendar import CalendarEvent, EventType

# A valid, non-overlapping minimal schedule
tb_ok = Timebox(
    date=date(2026, 1, 27),
    timezone="Europe/Amsterdam",
    events=[
        CalendarEvent(
            summary="Deep Work",
            event_type=EventType.DEEP_WORK,
            start_time=time(9, 0),
            duration=timedelta(minutes=90),
            calendarId="primary",
            timeZone="Europe/Amsterdam",
        ),
        CalendarEvent(
            summary="Shallow",
            event_type=EventType.SHALLOW_WORK,
            anchor_prev=True,
            duration=timedelta(minutes=30),
            calendarId="primary",
            timeZone="Europe/Amsterdam",
        ),
        CalendarEvent(
            summary="Meeting",
            event_type=EventType.MEETING,
            start_time=time(15, 0),
            end_time=time(16, 0),
            calendarId="primary",
            timeZone="Europe/Amsterdam",
        ),
    ],
)

print("OK timebox events:")
for e in tb_ok.events:
    print(" -", e.summary, e.start_time, "‚Üí", e.end_time, "(DT=", e.duration, ")")


# Overlap example (should raise)
try:
    Timebox(
        date=date(2026, 1, 27),
        timezone="Europe/Amsterdam",
        events=[
            CalendarEvent(
                summary="A",
                event_type=EventType.DEEP_WORK,
                start_time=time(10, 0),
                end_time=time(11, 0),
                calendarId="primary",
                timeZone="Europe/Amsterdam",
            ),
            CalendarEvent(
                summary="B",
                event_type=EventType.MEETING,
                start_time=time(10, 30),
                end_time=time(11, 30),
                calendarId="primary",
                timeZone="Europe/Amsterdam",
            ),
        ],
    )
except Exception as e:
    print("Overlap rejected:", e)


## E) Calendar ‚Üí Timebox baseline (MCP `list-events`)

This is the minimal ‚Äústart from calendar‚Äù primitive: pull a day from one or more calendars and convert it into a `Timebox` (time-of-day events).

Requirements to run:
- `MCP_CALENDAR_SERVER_URL` pointing at your calendar MCP server
- your MCP server already authenticated to Google Calendar

Notes:
- We map any existing calendar events to `EventType.MEETING` by default (good enough for v1).
- All-day events are currently skipped (can be added later).


In [None]:
from __future__ import annotations

import os

from dataclasses import dataclass
from datetime import datetime, date as date_type, time as time_type, timedelta, timezone
from typing import Any, Iterable
from zoneinfo import ZoneInfo

from dateutil import parser as date_parser

from fateforger.agents.schedular.models.calendar import CalendarEvent, EventType
from fateforger.agents.timeboxing.timebox import Timebox


from autogen_ext.tools.mcp import McpWorkbench, StreamableHttpServerParams

from fateforger.core.config import settings


def get_calendar_mcp_server_url() -> str:
    """Return the configured Google Calendar MCP server URL."""
    return settings.mcp_calendar_server_url


def _parse_event_dt(raw: dict[str, Any] | None, *, tz: ZoneInfo) -> datetime | None:
    # Parse a calendar event datetime payload into a timezone-aware datetime.
    if not raw:
        return None
    if raw.get("dateTime"):
        return date_parser.isoparse(raw["dateTime"]).astimezone(tz)
    # all-day events use date; skip for v1
    return None


def _hhmm(t: time_type) -> str:
    return t.strftime("%H:%M")


@dataclass(frozen=True)
class CalendarDayQuery:
    day: date_type
    tz_name: str
    calendar_ids: tuple[str, ...] = ("primary",)


class CalendarDayLoader:
    # Minimal loader: MCP list-events -> Timebox baseline.

    def __init__(self, *, server_url: str, timeout_s: float = 10.0) -> None:
        self._workbench = McpWorkbench(
            StreamableHttpServerParams(url=server_url, timeout=timeout_s)
        )

    async def list_day_events(self, *, query: CalendarDayQuery) -> list[dict[str, Any]]:
        tz = ZoneInfo(query.tz_name)
        start = (
            datetime.combine(query.day, datetime.min.time(), tz)
            .astimezone(timezone.utc)
            .isoformat()
        )
        end = (
            (datetime.combine(query.day, datetime.min.time(), tz) + timedelta(days=1))
            .astimezone(timezone.utc)
            .isoformat()
        )

        out: list[dict[str, Any]] = []
        for calendar_id in query.calendar_ids:
            args = {
                "calendarId": calendar_id,
                "timeMin": start,
                "timeMax": end,
                "singleEvents": True,
                "orderBy": "startTime",
            }
            result = await self._workbench.call_tool("list-events", arguments=args) # TODO: this is the important bit, but we must check with which args to call it so we get the info we want. the info we want is:
            payload = result if isinstance(result, dict) else getattr(result, "content", None)

            items: list[dict[str, Any]] = []
            if isinstance(payload, dict) and isinstance(payload.get("items"), list):
                items = [x for x in payload["items"] if isinstance(x, dict)]
            elif isinstance(payload, list):
                items = [x for x in payload if isinstance(x, dict)]

            for ev in items:
                ev.setdefault("_calendarId", calendar_id)
            out.extend(items)

        return out

    @staticmethod
    def to_timebox(*, query: CalendarDayQuery, events: Iterable[dict[str, Any]]) -> Timebox:
        tz = ZoneInfo(query.tz_name)
        day_events: list[CalendarEvent] = []

        for raw in events:
            if (raw.get("status") or "").lower() == "cancelled":
                continue

            start_dt = _parse_event_dt(raw.get("start"), tz=tz)
            end_dt = _parse_event_dt(raw.get("end"), tz=tz)
            if not start_dt or not end_dt or end_dt <= start_dt:
                continue

            summary = (raw.get("summary") or "Busy").strip() or "Busy"
            calendar_id = raw.get("_calendarId") or raw.get("calendarId") or "primary"
            event_id = raw.get("id") or raw.get("eventId")

            day_events.append(
                CalendarEvent(
                    calendarId=str(calendar_id),
                    eventId=str(event_id) if event_id else None,
                    summary=summary,
                    description=raw.get("description"),
                    location=raw.get("location"),
                    event_type=EventType.MEETING,
                    start_time=_hhmm(start_dt.timetz().replace(tzinfo=None)),
                    end_time=_hhmm(end_dt.timetz().replace(tzinfo=None)),
                    timeZone=query.tz_name,
                )
            )

        day_events.sort(key=lambda e: (e.start_time or time_type.max))
        return Timebox(events=day_events, date=query.day, timezone=query.tz_name)


async def fetch_day_timebox(*, day: str, tz_name: str, calendar_ids: list[str] | None = None) -> Timebox:
    # Fetch the day from MCP and return a baseline Timebox.
    server_url = get_calendar_mcp_server_url()
    if not server_url:
        raise RuntimeError("Set MCP_CALENDAR_SERVER_URL to your calendar MCP server.")

    query = CalendarDayQuery(
        day=date_type.fromisoformat(day),
        tz_name=tz_name,
        calendar_ids=tuple(calendar_ids or ["primary"]),
    )
    loader = CalendarDayLoader(server_url=server_url)
    raw = await loader.list_day_events(query=query)
    return loader.to_timebox(query=query, events=raw)


# Example (uncomment to run):
tb = await fetch_day_timebox(day="2026-01-27", tz_name="Europe/Amsterdam")
print(len(tb.events), "events")
print(tb.model_dump(mode="json"))


0 events
{'events': [], 'date': '2026-01-27', 'timezone': 'Europe/Amsterdam'}


In [9]:
# TODO: do a raw call to the mcp server, noting the args as well (so looking inside of the implemnetation to understand which properties of the calendar event we can fetch)
# TODO: set up the simplest use of the mcp calendar server as possible
from dataclasses import dataclass
from datetime import datetime, date as date_type, time as time_type, timedelta, timezone
from typing import Any, Iterable
from zoneinfo import ZoneInfo

from dateutil import parser as date_parser

from fateforger.agents.schedular.models.calendar import CalendarEvent, EventType
from fateforger.agents.timeboxing.timebox import Timebox


from autogen_ext.tools.mcp import McpWorkbench, StreamableHttpServerParams

from fateforger.core.config import settings


# set up calendar mcp tool

def calendar_workbench() -> McpWorkbench:
    server_url = get_calendar_mcp_server_url()

    return McpWorkbench(
        StreamableHttpServerParams(url=server_url, timeout=10.0)
    )
workbench = calendar_workbench()

# query the workbench
result = await workbench.call_tool(
    "list-events", # TODO: what are all the args we can pass here?
    arguments={
        "calendarId": "primary",
        "timeMin": "2026-01-27T00:00:00Z",
        "timeMax": "2026-01-28T00:00:00Z",
        "singleEvents": True,
        "orderBy": "startTime",
    },
)

raw_json_string = result.result[0].content


In [10]:
from __future__ import annotations

from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, Field


class GCalEventDateTime(BaseModel):
    """Google Calendar event start/end payload (timed or all-day)."""

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    date_time: Optional[str] = Field(default=None, alias="dateTime")  # RFC3339
    date: Optional[str] = None  # YYYY-MM-DD (all-day)
    time_zone: Optional[str] = Field(default=None, alias="timeZone")


class GCalPerson(BaseModel):
    """Google Calendar creator/organizer payload."""

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    email: Optional[str] = None
    self_: Optional[bool] = Field(default=None, alias="self")


class GCalReminders(BaseModel):
    """Google Calendar reminders payload."""

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    use_default: Optional[bool] = Field(default=None, alias="useDefault")
    overrides: Optional[list[dict[str, Any]]] = None


class GCalEvent(BaseModel):
    """Google Calendar event resource (subset + extra passthrough)."""

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    id: str
    summary: Optional[str] = None
    start: GCalEventDateTime
    end: GCalEventDateTime
    status: Optional[str] = None
    html_link: Optional[str] = Field(default=None, alias="htmlLink")
    created: Optional[str] = None
    updated: Optional[str] = None
    creator: Optional[GCalPerson] = None
    organizer: Optional[GCalPerson] = None
    ical_uid: Optional[str] = Field(default=None, alias="iCalUID")
    sequence: Optional[int] = None
    reminders: Optional[GCalReminders] = None
    event_type: Optional[str] = Field(default=None, alias="eventType")
    guests_can_modify: Optional[bool] = Field(default=None, alias="guestsCanModify")
    calendar_id: Optional[str] = Field(default=None, alias="calendarId")
    account_id: Optional[str] = Field(default=None, alias="accountId")


class GCalEventsResponse(BaseModel):
    """List events response shape you pasted."""

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    events: list[GCalEvent]
    total_count: int = Field(alias="totalCount")


# Usage:
resp = GCalEventsResponse.model_validate_json(raw_json_string)
first = resp.events[0]
first.start.date_time, first.start.time_zone


('2026-01-27T09:00:00+01:00', 'Europe/Amsterdam')

In [11]:
resp

GCalEventsResponse(events=[GCalEvent(id='30287b8c333b44c18b3bfbd52f357d4f', summary='morning review', start=GCalEventDateTime(date_time='2026-01-27T09:00:00+01:00', date=None, time_zone='Europe/Amsterdam'), end=GCalEventDateTime(date_time='2026-01-27T09:15:00+01:00', date=None, time_zone='Europe/Amsterdam'), status='confirmed', html_link='https://www.google.com/calendar/event?eid=MzAyODdiOGMzMzNiNDRjMThiM2JmYmQ1MmYzNTdkNGYgaHVnby5ldmVyc0Bt', created='2026-01-26T21:07:30.000Z', updated='2026-01-26T21:07:31.897Z', creator=GCalPerson(email='hugo.evers@gmail.com', self_=True), organizer=GCalPerson(email='hugo.evers@gmail.com', self_=True), ical_uid='30287b8c333b44c18b3bfbd52f357d4f@google.com', sequence=0, reminders=GCalReminders(use_default=True, overrides=None), event_type='default', guests_can_modify=True, calendar_id='primary', account_id='normal')], total_count=1)

In [None]:
# turn that into a Timebox object, assuming those events are now fixed (ie start adn end time determined)

In [12]:
import json
from autogen_ext.tools.mcp import StreamableHttpServerParams, mcp_server_tools

params = StreamableHttpServerParams(url=get_calendar_mcp_server_url(), timeout=10.0)
tools = await mcp_server_tools(params)

tool = next(t for t in tools if t.name == "list-events")
print(json.dumps(tool.schema["parameters"], indent=2))


{
  "type": "object",
  "properties": {
    "account": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "maxItems": 10,
          "minItems": 1,
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Account nickname(s) to query (e.g., 'work' or ['work', 'personal']) - the friendly names you gave when connecting accounts. Optional - if omitted, queries all connected accounts and merges results. Use 'list-calendars' to see available accounts.",
      "title": "Account"
    },
    "calendarId": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "maxItems": 50,
          "minItems": 1,
          "type": "array"
        }
      ],
      "title": "Calendarid"
    },
    "timeMin": {
      "anyOf": [
        {
     

In [15]:
for tool in tools:
    print(tool.name)

list-calendars
list-events
search-events
get-event
list-colors
create-event
update-event
delete-event
get-freebusy
get-current-time
respond-to-event
manage-accounts


## tools:

list-calendars to get available calendars
list-events to get events for a day
search-events to find events by query
get-event to find events by id
create-event for creating events
update-event for managing events
delete-event for managing events
get-freebusy: for suggesting available slots

In [13]:
# create a single event with an id, then store it to the calendar via MCP, and then retrieve it by id.
create_tool = next(t for t in tools if t.name == "create-event")
print(json.dumps(create_tool.schema["parameters"], indent=2))

{
  "type": "object",
  "properties": {
    "account": {
      "anyOf": [
        {
          "pattern": "^[a-z0-9_-]{1,64}$",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Account nickname to use for this operation (e.g., 'work', 'personal') - the friendly name you gave when connecting the account. Optional when only one account is connected - will auto-select the account with appropriate permissions. Use 'list-calendars' to see available accounts.",
      "title": "Account"
    },
    "calendarId": {
      "description": "ID of the calendar (use 'primary' for the main calendar)",
      "title": "Calendarid",
      "type": "string"
    },
    "eventId": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional custom event ID (5-1024 characters, base32hex encoding: lowercase lette

In [None]:
from __future__ import annotations

import os

from dataclasses import dataclass
from datetime import datetime, date as date_type, time as time_type, timedelta, timezone
from typing import Any, Iterable
from zoneinfo import ZoneInfo

from dateutil import parser as date_parser

from fateforger.agents.schedular.models.calendar import CalendarEvent, EventType
from fateforger.agents.timeboxing.timebox import Timebox


from autogen_ext.tools.mcp import McpWorkbench, StreamableHttpServerParams

from fateforger.core.config import settings


def get_calendar_mcp_server_url() -> str:
    """Return the configured Google Calendar MCP server URL."""
    return settings.mcp_calendar_server_url


def _parse_event_dt(raw: dict[str, Any] | None, *, tz: ZoneInfo) -> datetime | None:
    # Parse a calendar event datetime payload into a timezone-aware datetime.
    if not raw:
        return None
    if raw.get("dateTime"):
        return date_parser.isoparse(raw["dateTime"]).astimezone(tz)
    # all-day events use date; skip for v1
    return None


def _hhmm(t: time_type) -> str:
    return t.strftime("%H:%M")


@dataclass(frozen=True)
class CalendarDayQuery:
    day: date_type
    tz_name: str
    calendar_ids: tuple[str, ...] = ("primary",)


class CalendarDayLoader:
    # Minimal loader: MCP list-events -> Timebox baseline.

    def __init__(self, *, server_url: str, timeout_s: float = 10.0) -> None:
        self._workbench = McpWorkbench(
            StreamableHttpServerParams(url=server_url, timeout=timeout_s)
        )

    async def list_day_events(self, *, query: CalendarDayQuery) -> list[dict[str, Any]]:
        tz = ZoneInfo(query.tz_name)
        start = (
            datetime.combine(query.day, datetime.min.time(), tz)
            .astimezone(timezone.utc)
            .isoformat()
        )
        end = (
            (datetime.combine(query.day, datetime.min.time(), tz) + timedelta(days=1))
            .astimezone(timezone.utc)
            .isoformat()
        )

        out: list[dict[str, Any]] = []
        for calendar_id in query.calendar_ids:
            args = {
                "calendarId": calendar_id,
                "timeMin": start,
                "timeMax": end,
                "singleEvents": True,
                "orderBy": "startTime",
            }
            result = await self._workbench.call_tool("list-events", arguments=args) # TODO: this is the important bit, but we must check with which args to call it so we get the info we want. the info we want is:
            payload = result if isinstance(result, dict) else getattr(result, "content", None)

            items: list[dict[str, Any]] = []
            if isinstance(payload, dict) and isinstance(payload.get("items"), list):
                items = [x for x in payload["items"] if isinstance(x, dict)]
            elif isinstance(payload, list):
                items = [x for x in payload if isinstance(x, dict)]

            for ev in items:
                ev.setdefault("_calendarId", calendar_id)
            out.extend(items)

        return out

    @staticmethod
    def to_timebox(*, query: CalendarDayQuery, events: Iterable[dict[str, Any]]) -> Timebox:
        tz = ZoneInfo(query.tz_name)
        day_events: list[CalendarEvent] = []

        for raw in events:
            if (raw.get("status") or "").lower() == "cancelled":
                continue

            start_dt = _parse_event_dt(raw.get("start"), tz=tz)
            end_dt = _parse_event_dt(raw.get("end"), tz=tz)
            if not start_dt or not end_dt or end_dt <= start_dt:
                continue

            summary = (raw.get("summary") or "Busy").strip() or "Busy"
            calendar_id = raw.get("_calendarId") or raw.get("calendarId") or "primary"
            event_id = raw.get("id") or raw.get("eventId")

            day_events.append(
                CalendarEvent(
                    calendarId=str(calendar_id),
                    eventId=str(event_id) if event_id else None,
                    summary=summary,
                    description=raw.get("description"),
                    location=raw.get("location"),
                    event_type=EventType.MEETING,
                    start_time=_hhmm(start_dt.timetz().replace(tzinfo=None)),
                    end_time=_hhmm(end_dt.timetz().replace(tzinfo=None)),
                    timeZone=query.tz_name,
                )
            )

        day_events.sort(key=lambda e: (e.start_time or time_type.max))
        return Timebox(events=day_events, date=query.day, timezone=query.tz_name)


async def fetch_day_timebox(*, day: str, tz_name: str, calendar_ids: list[str] | None = None) -> Timebox:
    # Fetch the day from MCP and return a baseline Timebox.
    server_url = get_calendar_mcp_server_url()
    if not server_url:
        raise RuntimeError("Set MCP_CALENDAR_SERVER_URL to your calendar MCP server.")

    query = CalendarDayQuery(
        day=date_type.fromisoformat(day),
        tz_name=tz_name,
        calendar_ids=tuple(calendar_ids or ["primary"]),
    )
    loader = CalendarDayLoader(server_url=server_url)
    raw = await loader.list_day_events(query=query)
    return loader.to_timebox(query=query, events=raw)


# Example (uncomment to run):
tb = await fetch_day_timebox(day="2026-01-27", tz_name="Europe/Amsterdam")
print(len(tb.events), "events")
print(tb.model_dump(mode="json"))


0 events
{'events': [], 'date': '2026-01-27', 'timezone': 'Europe/Amsterdam'}


In [16]:
search_tool = next(t for t in tools if t.name == "search-events")
print(json.dumps(search_tool.schema["parameters"], indent=2))

{
  "type": "object",
  "properties": {
    "account": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "maxItems": 10,
          "minItems": 1,
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Account nickname(s) to query (e.g., 'work' or ['work', 'personal']) - the friendly names you gave when connecting accounts. Optional - if omitted, queries all connected accounts and merges results. Use 'list-calendars' to see available accounts.",
      "title": "Account"
    },
    "calendarId": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      ],
      "description": "Calendar identifier(s) to search. Accepts calendar IDs or names. Single or multiple calendars supported.",
      

In [17]:
import json

r = await workbench.call_tool(
    "list-events",
    arguments={"calendarId": "primary", "fields": ["__invalid__"]},
)

assert r.is_error
text = r.result[0].content  # string
details = json.loads(text.split("list-events: ", 1)[1])  # JSON list
allowed_fields = details[0]["options"]
allowed_fields


['id',
 'summary',
 'description',
 'start',
 'end',
 'location',
 'attendees',
 'colorId',
 'transparency',
 'extendedProperties',
 'reminders',
 'conferenceData',
 'attachments',
 'status',
 'htmlLink',
 'created',
 'updated',
 'creator',
 'organizer',
 'recurrence',
 'recurringEventId',
 'originalStartTime',
 'visibility',
 'iCalUID',
 'sequence',
 'hangoutLink',
 'anyoneCanAddSelf',
 'guestsCanInviteOthers',
 'guestsCanModify',
 'guestsCanSeeOtherGuests',
 'privateCopy',
 'locked',
 'source',
 'eventType']

## F) Edit the Timebox with `TimeboxPatcher` (LLM)

This uses the repo‚Äôs existing patcher (`src/fateforger/agents/timeboxing/patching.py`).
You can feed free-form instructions and get back a new `Timebox`.

Requires an LLM API key (OpenAI or OpenRouter).


In [None]:
import os

from fateforger.agents.timeboxing.patching import TimeboxPatcher
from fateforger.agents.timeboxing.timebox import Timebox


async def patch_timebox(tb: Timebox, *, instruction: str) -> Timebox:
    if not (os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")):
        raise RuntimeError("Set OPENAI_API_KEY or OPENROUTER_API_KEY to use the patcher.")

    patcher = TimeboxPatcher()
    return await patcher.apply_patch(
        current=tb,
        user_message=instruction,
        constraints=[],
        actions=[],
    )


# Example (uncomment to run):
# tb0 = await fetch_day_timebox(day="2026-01-27", tz_name="Europe/Amsterdam")
# tb1 = await patch_timebox(tb0, instruction="Add two 90min Deep Work blocks before my first meeting, and a 15min break after each.")
# print(tb1.model_dump(mode="json"))


## G) Batch submit: diff old/new Timebox and apply MCP ops

This is a minimal ‚Äúpowertool‚Äù submitter. For v1 safety it only writes **owned** events:
- create/update/delete for events whose `eventId` starts with `fftimebo0`
- it refuses to update/delete existing calendar meetings (foreign events)

You can relax this later once the permission model + undo history are implemented.


In [None]:
import os

import base64
import hashlib
from dataclasses import dataclass
from datetime import datetime, time as time_type, timezone
from zoneinfo import ZoneInfo

from fateforger.agents.timeboxing.timebox import Timebox


def _base32hex_id(seed: str, *, prefix: str, max_len: int = 64) -> str:
    digest = hashlib.sha1(seed.encode("utf-8")).digest()
    token = base64.b32hexencode(digest).decode("ascii").lower().rstrip("=")
    return (prefix + token)[:max_len]


def ensure_owned_event_ids(tb: Timebox, *, prefix: str = "fftimebo0") -> Timebox:
    # Assign deterministic MCP-compatible eventIds for events missing one.
    payload = tb.model_dump(mode="json")
    events = payload.get("events") or []

    for i, ev in enumerate(events):
        if ev.get("eventId"):
            continue
        seed = f"{payload.get('date')}|{ev.get('summary','event')}|{ev.get('ST') or ev.get('start_time') or ''}|{i}"
        ev["eventId"] = _base32hex_id(seed, prefix=prefix)

    return Timebox.model_validate(payload)


@dataclass(frozen=True)
class CalendarOp:
    op: str  # create|update|delete
    calendar_id: str
    event_id: str
    summary: str | None = None
    start: str | None = None
    end: str | None = None
    time_zone: str | None = None
    color_id: str | None = None
    description: str | None = None


def _event_iso(tb: Timebox, *, t: time_type) -> str:
    tz = ZoneInfo(tb.timezone or "UTC")
    dt_val = datetime.combine(tb.date, t).replace(tzinfo=tz)
    return dt_val.astimezone(timezone.utc).isoformat()


def diff_timeboxes(before: Timebox, after: Timebox, *, owned_prefix: str = "fftimebo0") -> list[CalendarOp]:
    before_by_id = {e.eventId: e for e in before.events if e.eventId}
    after_by_id = {e.eventId: e for e in after.events if e.eventId}
    ops: list[CalendarOp] = []

    # creates (owned only)
    for eid, e in after_by_id.items():
        if eid not in before_by_id and eid.startswith(owned_prefix):
            if not e.start_time or not e.end_time:
                continue
            ops.append(
                CalendarOp(
                    op="create",
                    calendar_id=e.calendarId or "primary",
                    event_id=eid,
                    summary=e.summary,
                    start=_event_iso(after, t=e.start_time),
                    end=_event_iso(after, t=e.end_time),
                    time_zone=after.timezone,
                    color_id=getattr(e, "colorId", None),
                    description=e.description,
                )
            )

    # updates (owned only)
    for eid, e_new in after_by_id.items():
        e_old = before_by_id.get(eid)
        if not e_old or not eid.startswith(owned_prefix):
            continue

        changed = False
        if (e_new.summary or "") != (e_old.summary or ""):
            changed = True
        if e_new.start_time != e_old.start_time or e_new.end_time != e_old.end_time:
            changed = True
        if (e_new.description or "") != (e_old.description or ""):
            changed = True

        if not changed:
            continue
        if not e_new.start_time or not e_new.end_time:
            continue

        ops.append(
            CalendarOp(
                op="update",
                calendar_id=e_new.calendarId or "primary",
                event_id=eid,
                summary=e_new.summary,
                start=_event_iso(after, t=e_new.start_time),
                end=_event_iso(after, t=e_new.end_time),
                time_zone=after.timezone,
                color_id=getattr(e_new, "colorId", None),
                description=e_new.description,
            )
        )

    # deletes (owned only)
    for eid, e_old in before_by_id.items():
        if eid not in after_by_id and eid.startswith(owned_prefix):
            ops.append(
                CalendarOp(
                    op="delete",
                    calendar_id=e_old.calendarId or "primary",
                    event_id=eid,
                )
            )

    return ops


class BatchCalendarSubmitter:
    # Apply calendar ops using MCP create/update/delete in a single batch.

    def __init__(self, *, server_url: str, timeout_s: float = 10.0) -> None:
        self._workbench = McpWorkbench(
            StreamableHttpServerParams(url=server_url, timeout=timeout_s)
        )

    async def apply(self, ops: list[CalendarOp]) -> list[Any]:
        results: list[Any] = []
        for op in ops:
            if op.op == "create":
                results.append(
                    await self._workbench.call_tool(
                        "create-event",
                        arguments={
                            "calendarId": op.calendar_id,
                            "eventId": op.event_id,
                            "summary": op.summary,
                            "start": op.start,
                            "end": op.end,
                            "timeZone": op.time_zone,
                            "colorId": op.color_id,
                            "description": op.description,
                        },
                    )
                )
            elif op.op == "update":
                results.append(
                    await self._workbench.call_tool(
                        "update-event",
                        arguments={
                            "calendarId": op.calendar_id,
                            "eventId": op.event_id,
                            "summary": op.summary,
                            "start": op.start,
                            "end": op.end,
                            "timeZone": op.time_zone,
                            "colorId": op.color_id,
                            "description": op.description,
                        },
                    )
                )
            elif op.op == "delete":
                results.append(
                    await self._workbench.call_tool(
                        "delete-event",
                        arguments={
                            "calendarId": op.calendar_id,
                            "eventId": op.event_id,
                        },
                    )
                )
            else:
                raise ValueError(f"Unknown op: {op.op}")

        return results


# Example end-to-end (uncomment to run):
# tb0 = await fetch_day_timebox(day="2026-01-27", tz_name="Europe/Amsterdam")
# tb1 = await patch_timebox(tb0, instruction="Add a new 90min Deep Work block at 09:00. Leave existing meetings unchanged.")
# tb1 = ensure_owned_event_ids(tb1)
# ops = diff_timeboxes(tb0, tb1)
# print("Planned ops:")
# for op in ops:
#     print(op)
# submitter = BatchCalendarSubmitter(server_url=os.getenv("MCP_CALENDAR_SERVER_URL"))
# res = await submitter.apply(ops)
# print(res)


## H) (Optional) Run the real stage machine with LLM calls

This calls the real `TimeboxingFlowAgent` methods, so you need:
- API key configured (`OPENAI_API_KEY` or `OPENROUTER_API_KEY`)
- optionally an MCP calendar server (otherwise it will just skip/timeout calendar prefetch)

Tip: start by disabling calendar prefetch via a no-op patch, then re-enable later.


In [None]:
# Toggle helper: choose whether to call the MCP calendar server.
# - If False, we stub calendar prefetch to keep runs deterministic.
# - If True, you need MCP_CALENDAR_SERVER_URL configured + calendar auth.
USE_LIVE_CALENDAR = False
print("USE_LIVE_CALENDAR =", USE_LIVE_CALENDAR)


In [None]:
async def run_real_agent_turns(*, planned_date: str = "2026-01-27", tz_name: str = "Europe/Amsterdam") -> None:
    agent = TimeboxingFlowAgent("timeboxing_agent")

    if not USE_LIVE_CALENDAR:
        # Disable calendar reads while testing prompt flow.
        async def _noop_calendar(_self, _session: Session, *, timeout_s: float = 0.0) -> None:
            return None

        agent._ensure_calendar_immovables = types.MethodType(_noop_calendar, agent)

    session = Session(
        thread_ts="local",
        channel_id="local",
        user_id="u1",
        committed=True,
        planned_date=planned_date,
        tz_name=tz_name,
    )
    session.frame_facts.setdefault("date", planned_date)
    session.frame_facts.setdefault("timezone", tz_name)

    for text in [
        "My workday is 9 to 5:30",
        "Daily one thing: write spec. Tasks: email cleanup, design review",
        "go",
    ]:
        reply = await agent._run_graph_turn(session=session, user_text=text)
        print("\nUSER:", text)
        print("AGENT:\n", reply.content)


await run_real_agent_turns()


In [1]:

import json
from slack_sdk.models.blocks import (
    SectionBlock, ContextBlock, ActionsBlock, 
    ButtonElement, MarkdownTextObject, PlainTextObject
)

class SlackEventRenderer:
    def __init__(self, event: "CalendarEvent"):
        self.event = event
        # Ensure we can handle 'draft' or integer IDs
        self.db_id = str(self.event.id) if self.event.id else "draft"

    def _get_action_value(self, **kwargs):
        """Helper to create a stable JSON transport string."""
        data = {"id": self.db_id}
        data.update(kwargs)
        return json.dumps(data)

    def render_full_view(self) -> list:
        st = self.event.start_time.strftime("%H:%M") if self.event.start_time else "--:--"
        et = self.event.end_time.strftime("%H:%M") if self.event.end_time else "--:--"
        
        blocks = [
            SectionBlock(
                text=MarkdownTextObject(text=f"üóìÔ∏è *{self.event.summary}* | `{st}‚Äì{et}`"),
                accessory=ButtonElement(
                    text=PlainTextObject(text="Edit"),
                    action_id="event_open_modal",
                    value=self._get_action_value()
                )
            ),
            # Granular Controls Row
            ActionsBlock(elements=[
                ButtonElement(text=PlainTextObject(text="-15m"), action_id="event_shift", value=self._get_action_value(delta=-15)),
                ButtonElement(text=PlainTextObject(text="+15m"), action_id="event_shift", value=self._get_action_value(delta=15)),
                ButtonElement(
                    text=PlainTextObject(text="Confirm & Sync"),
                    style="primary",
                    action_id="event_sync_google",
                    value=self._get_action_value()
                )
            ])
        ]
        return blocks

    def render_sync_success(self, calendar_url: str):
        return [
            SectionBlock(
                text=MarkdownTextObject(text=f":white_check_mark: *Successfully Synced!*\n{self.event.summary} is now on your calendar."),
                accessory=ButtonElement(
                    text=PlainTextObject(text="View Event"),
                    url=calendar_url,
                    action_id="nav_to_google" # No server-side handler needed for URLs
                )
            )
        ]

    def render_sync_error(self, error_msg: str):
        return [
            SectionBlock(text=MarkdownTextObject(text=f":x: *Sync Failed*\nError: `{error_msg}`")),
            ActionsBlock(elements=[
                ButtonElement(
                    text=PlainTextObject(text="Retry Sync"),
                    style="danger",
                    action_id="event_sync_google",
                    value=self._get_action_value()
                )
            ])
        ]



In [7]:
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import timedelta, time
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Sequence

from slack_sdk.models.blocks import ActionsBlock, ContextBlock, SectionBlock
from slack_sdk.models.blocks.block_elements import ButtonElement
from slack_sdk.models.blocks.basic_components import MarkdownTextObject, PlainTextObject


# ----------------------------
# Your event shape (compatible)
# ----------------------------

class UIState(Enum):
    PROPOSED = auto()      # option suggested from free/busy; tweak + confirm
    CONFIRMED = auto()     # event committed; show links + lightweight actions
    STARTING_NOW = auto()  # reminder state; open thread / postpone
    SYNCING = auto()       # transient
    ERROR = auto()         # transient/terminal with retry


class DetailLevel(Enum):
    MIN = auto()
    STANDARD = auto()
    RICH = auto()


@dataclass(frozen=True)
class CardDetailSpec:
    level: DetailLevel = DetailLevel.MIN
    show_location: bool = False
    show_attendees_count: bool = False
    show_attendees_names: bool = False  # careful: can get tall/long
    show_notes_snippet: bool = False
    notes_max_chars: int = 180
    extra_detail_blocks: Optional[Sequence[Callable[[], Any]]] = None  # open-ended hook


# Minimal type hint only; use your real models.
class CalendarEvent:  # pragma: no cover
    id: Any
    summary: str
    event_type: Any  # enum with .name
    start_time: Optional[time]
    end_time: Optional[time]
    duration: Optional[timedelta]
    location: Optional[str]
    attendees: Optional[list]
    notes: Optional[str]


class SlackEventRenderer:
    """
    Renderer = (Header) + (Details per CardDetailSpec) + (State-driven Actions/Status).
    - action_id stays stable for routing
    - per-card context travels in ButtonElement.value as compact JSON
    """

    def __init__(self, event: CalendarEvent, *, session_id: Optional[str] = None):
        self.event = event
        # For demos you can fall back to event.id/"draft".
        # In production, prefer a stable uuid session_id from the moment you propose options.
        self.sid = session_id or (str(getattr(event, "id", None)) if getattr(event, "id", None) is not None else "draft")

    # ---------- transport helpers ----------

    def _val(self, **kwargs: Any) -> str:
        payload = {"sid": self.sid, **kwargs}
        s = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
        if len(s) > 2000:
            raise ValueError("Slack button value exceeded 2000 chars.")
        return s

    # ---------- formatting helpers ----------

    @property
    def type_emoji(self) -> str:
        emoji_map = {
            "MEETING": "üë•",
            "COMMUTE": "üöó",
            "DEEP_WORK": "üß†",
            "SHALLOW_WORK": "üìù",
            "PLAN_REVIEW": "üîç",
            "HABIT": "‚ú®",
            "REGENERATION": "üîã",
            "BUFFER": "‚è≥",
            "BACKGROUND": "‚òÅÔ∏è",
        }
        name = getattr(getattr(self.event, "event_type", None), "name", None)
        return emoji_map.get(name, "üìÖ")

    def _fmt_time(self, t: Optional[time]) -> str:
        return t.strftime("%H:%M") if t else "--:--"

    def _fmt_duration(self) -> str:
        d = getattr(self.event, "duration", None)
        if not d:
            return "??m"
        mins = max(0, int(d.total_seconds() // 60))
        h, m = divmod(mins, 60)
        return f"{h}h {m}m" if h else f"{m}m"

    def _header_text(self) -> str:
        st = self._fmt_time(getattr(self.event, "start_time", None))
        et = self._fmt_time(getattr(self.event, "end_time", None))
        summary = getattr(self.event, "summary", "(no title)")
        return f"{self.type_emoji} *{summary}* | `{st}‚Äì{et}` ({self._fmt_duration()})"

    # ---------- public API ----------

    def render(
        self,
        state: UIState,
        detail: CardDetailSpec,
        *,
        option_hint: Optional[str] = None,   # e.g. "Option 2/3"
        event_url: Optional[str] = None,     # Google Calendar link after confirm
        thread_url: Optional[str] = None,    # Slack thread permalink
        error_msg: Optional[str] = None,
    ) -> List[Any]:
        blocks: List[Any] = []
        blocks.extend(self._render_header())

        details = self._render_details(detail, event_url=event_url)
        blocks.extend(details)

        status = self._render_status(state, option_hint=option_hint, error_msg=error_msg)
        if status:
            blocks.append(status)

        blocks.append(self._render_actions(state, event_url=event_url, thread_url=thread_url))
        return blocks

    # ---------- sections ----------

    def _render_header(self) -> List[Any]:
        return [
            SectionBlock(
                text=MarkdownTextObject(text=self._header_text()),
                accessory=ButtonElement(
                    text=PlainTextObject(text="Edit"),
                    action_id="event_edit_open",
                    value=self._val(),
                ),
            )
        ]

    def _render_details(self, detail: CardDetailSpec, *, event_url: Optional[str]) -> List[Any]:
        blocks: List[Any] = []

        # Context row: compact metadata
        meta_parts: List[str] = []
        if detail.show_location and getattr(self.event, "location", None):
            meta_parts.append(f"üìç {self.event.location}")

        attendees = getattr(self.event, "attendees", None) or []
        if detail.show_attendees_count and attendees:
            meta_parts.append(f"üë• {len(attendees)} attendee{'s' if len(attendees) != 1 else ''}")

        if detail.show_attendees_names and attendees:
            # Keep it short-ish: Slack context blocks can get cramped.
            shown = attendees[:6]
            suffix = "‚Ä¶" if len(attendees) > 6 else ""
            meta_parts.append("üë• " + ", ".join(map(str, shown)) + suffix)

        if meta_parts and detail.level in (DetailLevel.STANDARD, DetailLevel.RICH):
            blocks.append(ContextBlock(elements=[MarkdownTextObject(text="  ‚Ä¢  ".join(meta_parts))]))

        # Notes snippet: only in RICH (or explicitly enabled)
        notes = getattr(self.event, "notes", None)
        if detail.show_notes_snippet and notes:
            snippet = notes.strip().replace("\n", " ")
            if len(snippet) > detail.notes_max_chars:
                snippet = snippet[: detail.notes_max_chars - 1] + "‚Ä¶"
            blocks.append(SectionBlock(text=MarkdownTextObject(text=f"> {snippet}")))

        # Optional hook for future detail blocks (conference links, organizer, etc.)
        if detail.extra_detail_blocks:
            for fn in detail.extra_detail_blocks:
                b = fn()
                if b is not None:
                    blocks.append(b)

        # Light-touch: if we have an event_url and we‚Äôre in a richer card, show it as text once
        if event_url and detail.level == DetailLevel.RICH:
            blocks.append(ContextBlock(elements=[MarkdownTextObject(text=f"üîó <{event_url}|Open in Google Calendar>")]))

        return blocks

    def _render_status(
        self,
        state: UIState,
        *,
        option_hint: Optional[str],
        error_msg: Optional[str],
    ) -> Optional[Any]:
        if state == UIState.PROPOSED:
            txt = f"üß≠ Suggested slot{f' ‚Äî {option_hint}' if option_hint else ''}"
            return ContextBlock(elements=[MarkdownTextObject(text=txt)])

        if state == UIState.SYNCING:
            return ContextBlock(elements=[MarkdownTextObject(text="‚è≥ Syncing‚Ä¶")])

        if state == UIState.CONFIRMED:
            return ContextBlock(elements=[MarkdownTextObject(text="‚úÖ Confirmed")])

        if state == UIState.STARTING_NOW:
            return ContextBlock(elements=[MarkdownTextObject(text="‚è±Ô∏è Starting now")])

        if state == UIState.ERROR:
            msg = (error_msg or "Unknown error").strip()
            if len(msg) > 160:
                msg = msg[:159] + "‚Ä¶"
            return ContextBlock(elements=[MarkdownTextObject(text=f"‚ö†Ô∏è {msg}")])

        return None

    def _render_actions(self, state: UIState, *, event_url: Optional[str], thread_url: Optional[str]) -> Any:
        elements: List[Any] = []

        if state == UIState.PROPOSED:
            elements.extend(
                [
                    ButtonElement(
                        text=PlainTextObject(text="-15m"),
                        action_id="event_shift",
                        value=self._val(delta_min=-15),
                    ),
                    ButtonElement(
                        text=PlainTextObject(text="+15m"),
                        action_id="event_shift",
                        value=self._val(delta_min=15),
                    ),
                    ButtonElement(
                        text=PlainTextObject(text="Confirm"),
                        style="primary",
                        action_id="event_confirm",
                        value=self._val(),
                    ),
                ]
            )

        elif state == UIState.SYNCING:
            # Keep actions minimal to prevent double submits
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Cancel"),
                    action_id="event_cancel_sync",
                    value=self._val(),
                )
            )

        elif state == UIState.CONFIRMED:
            if thread_url:
                elements.append(
                    ButtonElement(
                        text=PlainTextObject(text="Open Thread"),
                        url=thread_url,
                        action_id="event_open_thread",
                    )
                )
            if event_url:
                elements.append(
                    ButtonElement(
                        text=PlainTextObject(text="Go to Event"),
                        url=event_url,
                        action_id="event_open_gcal",
                    )
                )
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Postpone 15m"),
                    action_id="event_postpone",
                    value=self._val(delta_min=15),
                )
            )

        elif state == UIState.STARTING_NOW:
            if thread_url:
                elements.append(
                    ButtonElement(
                        text=PlainTextObject(text="Start Session"),
                        style="primary",
                        url=thread_url,
                        action_id="event_start_session",
                    )
                )
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Postpone 15m"),
                    action_id="event_postpone",
                    value=self._val(delta_min=15),
                )
            )
            if event_url:
                elements.append(
                    ButtonElement(
                        text=PlainTextObject(text="Go to Event"),
                        url=event_url,
                        action_id="event_open_gcal",
                    )
                )

        elif state == UIState.ERROR:
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Retry"),
                    style="danger",
                    action_id="event_confirm",
                    value=self._val(),
                )
            )
            # Keep edit always available via accessory; optionally add "Back"
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Back"),
                    action_id="event_back",
                    value=self._val(),
                )
            )

        else:
            elements.append(
                ButtonElement(
                    text=PlainTextObject(text="Edit"),
                    action_id="event_edit_open",
                    value=self._val(),
                )
            )

        return ActionsBlock(elements=elements)


# ----------------------------
# Example usage (runnable)
# ----------------------------

if __name__ == "__main__":
    from datetime import datetime
    from dataclasses import dataclass

    class DummyEventType(Enum):
        PLAN_REVIEW = auto()

    @dataclass
    class DummyCalendarEvent:
        id: Optional[int]
        summary: str
        event_type: DummyEventType
        start_time: Optional[time]
        end_time: Optional[time]
        duration: Optional[timedelta]
        location: Optional[str] = None
        attendees: Optional[list] = None
        notes: Optional[str] = None

    ev = DummyCalendarEvent(
        id=123,
        summary="Daily planning",
        event_type=DummyEventType.PLAN_REVIEW,
        start_time=time(18, 30),
        end_time=time(19, 0),
        duration=timedelta(minutes=30),
        location="Home office",
        attendees=["hugo@biolytics.ai", "madelon@example.com"],
        notes="Decide top 3 outcomes for tomorrow, identify blockers, schedule deep work blocks.",
    )

    renderer = SlackEventRenderer(ev)
    detail = CardDetailSpec(
        level=DetailLevel.STANDARD,
        show_location=True,
        show_attendees_count=True,
        show_notes_snippet=False,
    )

    blocks = renderer.render(
        state=UIState.PROPOSED,
        detail=detail,
        option_hint="Option 1/3",
        event_url=None,
        thread_url=None,
    )

    # Slack SDK block objects -> dicts you can send to chat.postMessage/respond/chat.update
    payload = {"blocks": [b.to_dict() for b in blocks]}
    print(json.dumps(payload, indent=2, ensure_ascii=False))


{
  "blocks": [
    {
      "accessory": {
        "action_id": "event_edit_open",
        "text": {
          "text": "Edit",
          "type": "plain_text"
        },
        "type": "button",
        "value": "{\"sid\":\"123\"}"
      },
      "text": {
        "text": "üîç *Daily planning* | `18:30‚Äì19:00` (30m)",
        "type": "mrkdwn"
      },
      "type": "section"
    },
    {
      "elements": [
        {
          "text": "üìç Home office  ‚Ä¢  üë• 2 attendees",
          "type": "mrkdwn"
        }
      ],
      "type": "context"
    },
    {
      "elements": [
        {
          "text": "üß≠ Suggested slot ‚Äî Option 1/3",
          "type": "mrkdwn"
        }
      ],
      "type": "context"
    },
    {
      "elements": [
        {
          "action_id": "event_shift",
          "text": {
            "text": "-15m",
            "type": "plain_text"
          },
          "type": "button",
          "value": "{\"sid\":\"123\",\"delta_min\":-15}"
        },
      

In [6]:
from datetime import time
from types import SimpleNamespace
import json

# Example event (lightweight stand-in for CalendarEvent)
event = SimpleNamespace(
    id= "evt_123",
    summary="Design Review",
    start_time=time(14, 0),
    end_time=time(15, 0),
    description="Discuss UI updates",
    location="Zoom",
)

renderer = SlackEventRenderer(event)

def _to_dict_safe(b):
    if hasattr(b, "to_dict"):
        return b.to_dict()
    if hasattr(b, "serialize"):
        return b.serialize()
    return str(b)

blocks = renderer.render()
print("Full view blocks JSON:")
print(json.dumps([_to_dict_safe(b) for b in blocks], indent=2, default=str))



AttributeError: 'types.SimpleNamespace' object has no attribute 'duration'

In [None]:

from datetime import datetime, timedelta

@app.action("event_shift")
def handle_shift(ack, payload, respond):
    ack()
    # 1. Parse the JSON 'payload' from the button value
    data = json.loads(payload["value"])
    event_id = data["id"]
    delta_minutes = data["delta"]

    # 2. Logic: Update the ORM model (Simplified example)
    # event = session.get(CalendarEvent, event_id) or draft_logic()
    # event.start_time = ... shift logic ...
    # session.commit()
    
    # 3. Transformation: Re-render the UI with the updated model
    # renderer = SlackEventRenderer(event)
    # respond(blocks=renderer.render_full_view(), replace_original=True)
    print(f"Shifting event {event_id} by {delta_minutes} mins")

@app.action("event_sync_google")
def handle_sync(ack, payload, respond):
    ack()
    data = json.loads(payload["value"])
    
    # Show an immediate 'Processing' state (The First Transformation)
    respond(text=":hourglass_flowing_sand: Syncing with Google Calendar...", replace_original=True)

    try:
        # Simulate API call to Google
        # result = google_api.create_event(...)
        success = True 
        
        if success:
            # Transformation: Success State
            # respond(blocks=renderer.render_sync_success("https://..."), replace_original=True)
            pass
    except Exception as e:
        # Transformation: Error State with Retry button
        # respond(blocks=renderer.render_sync_error(str(e)), replace_original=True)
        pass

