
feat: add cron scheduler reminder workflow#855

Merged
ZhuangCY merged 34 commits into main from feat/cron-scheduler-cli-workflow
Apr 13, 2026

Conversation

@wuman001
Collaborator

Summary

  • add the cron scheduler, store, executor, and cron tool so reminder-style requests run as scheduled jobs instead of blocking the main CLI session
  • add CLI cron management UX including /cron show, /cron inbox, batch enable/disable/remove actions, status-bar unread counts, and clearer task completion rendering
  • persist the latest cron execution summary and surface it in notifications and /cron show, with examples and end-to-end coverage for reminder flows

Test Plan

  • pytest tests/core/agent/test_reminder_cron_e2e.py tests/core/scheduler/test_notifications.py tests/test_slash_commands.py tests/tools/test_cron_tool.py -q

wuman001 added 30 commits April 8, 2026 22:12
Add comprehensive design for Aworld cron/scheduled task capabilities:
- Three scheduling modes (at/every/cron)
- Two execution modes (session continuation/isolated)
- Hybrid storage (project/user/memory)
- Three user interfaces (Agent tool, slash command, CLI subcommand)
- Session continuation with graceful degradation
- Services mode extensibility hooks

Design emphasizes:
- Definition-execution separation (CronJob vs Task)
- Reusing existing Task/TaskRunner infrastructure
- Minimal intrusion into core Agent logic
- CLI mode priority, Services mode prepared

Implementation planned in 3 phases (6-8 days):
1. Core functionality (MVP)
2. CLI integration
3. Session continuation mode

Related: brainstorming session on adding OpenClaw-like cron capabilities
Major revisions based on design review feedback:

**Scope Reduction (MVP Focus):**
- Remove main_session mode (requires heartbeat/system-event semantics)
- Remove delivery/wakeMode/failureAlert fields
- Remove multi-storage backends (file-only)
- Remove aworld-cli cron subcommand (requires CLI parser changes)
- Remove Services mode support (CLI-first approach)

**Critical Fixes:**
- Execution path: Use Runners.run() instead of direct TaskRunner instantiation
- Agent resolution: From aworld_cli.core.agent_registry
- User interface: Fix context.args → context.user_args
- Scheduler lifecycle: Bind to BaseCliRuntime.start()/stop()
- Data model: Simplified to isolated-mode only

**Enhanced Reliability (OpenClaw-inspired):**
- Startup recovery (cleanup stale running, recalculate next_run)
- Atomic file writes with fcntl locking
- Concurrent execution limits (semaphore)
- Timeout protection + exponential backoff retry
- Agent instance caching

**Implementation Plan:**
- Phase 1: Core infrastructure (3-4 days)
- Phase 2: Agent integration (2 days)
- Phase 3: Reliability testing (1-2 days)

Total: 6-8 days for MVP

Design aligns with actual codebase structure and real execution paths.
Add cron scheduling capabilities (isolated mode only, CLI-first):

**New Components:**
- types.py: Data models (CronJob, CronSchedule, CronPayload, CronJobState)
- store.py: FileBasedCronStore with atomic writes + fcntl locking
- executor.py: CronExecutor using Runners.run() for execution
- scheduler.py: CronScheduler with timer loop + startup recovery
- __init__.py: Singleton get_scheduler()

**Features:**
- Three scheduling modes: at (one-time), every (interval), cron (expression)
- Startup recovery: cleanup stale running, recalculate next_run
- Atomic file operations with exclusive locking
- Concurrent execution limits (semaphore)
- Exponential backoff retry on failures
- Timeout protection per job
- Agent instance caching

**Dependencies:**
- croniter>=1.4.0 (cron expression parsing)
- pytz>=2023.3 (timezone support)
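The three scheduling modes above each imply a next-run computation. A minimal sketch of that mapping (function and field names are assumptions, not the real scheduler API; the cron branch delegates to croniter, the dependency listed above):

```python
from datetime import datetime, timedelta

def compute_next_run(mode: str, value: str, now: datetime):
    """Sketch of next_run calculation for the three schedule modes.

    - "at":    one-time ISO timestamp; None once it has passed (one-shot)
    - "every": fixed interval in seconds, e.g. "300"
    - "cron":  cron expression, delegated to croniter
    """
    if mode == "at":
        target = datetime.fromisoformat(value)
        return target if target > now else None
    if mode == "every":
        return now + timedelta(seconds=int(value))
    if mode == "cron":
        # Real code uses the croniter dependency listed above:
        from croniter import croniter
        return croniter(value, now).get_next(datetime)
    raise ValueError(f"unknown schedule mode: {mode}")
```

One-shot ("at") jobs naturally return None once past, which matches the one-shot-deletion behavior tested later in this PR.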

**Architecture:**
- Definition-execution separation (CronJob vs Task)
- Reuses Runners.run() for execution (no direct TaskRunner instantiation)
- Agent resolution from aworld_cli.core.agent_registry
- Isolated mode only (session_id=None)

**Not Included (MVP scope):**
- Main session continuation mode
- Delivery semantics
- Multiple storage backends
- Services mode support

Related: #1 Phase 1 implementation
Design: docs/plans/2026-04-08-cron-scheduler-design.md
Add user interfaces for cron scheduler:

**New Files:**
- aworld/tools/builtin/cron_tool.py: Agent tool for task management
- aworld/tools/builtin/__init__.py: Export cron_tool
- aworld-cli/src/aworld_cli/commands/cron_cmd.py: /cron slash command

**Modified Files:**
- aworld-cli/src/aworld_cli/inner_plugins/smllc/agents/aworld_agent.py:
  Add 'cron' to default tool_names
- aworld-cli/src/aworld_cli/commands/__init__.py:
  Register cron_cmd
- aworld-cli/src/aworld_cli/runtime/base.py:
  Bind scheduler lifecycle to CLI runtime (start/stop)

**Features:**
- cron_tool actions: add, list, remove, run, status
- Duration parsing: 30s, 5m, 2h, 1d
- Schedule types: at (one-time), every (interval), cron (expression)
- /cron command: natural language task creation + management
- Silent scheduler startup (no user-facing messages)
- Graceful degradation (scheduler failure doesn't block CLI)
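The duration formats listed above (30s, 5m, 2h, 1d) can be parsed with a short helper; this is a hypothetical sketch, not the tool's actual parser:

```python
import re

# Unit suffixes matching the formats listed above: seconds, minutes, hours, days.
_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_duration(text: str) -> int:
    """Parse a compact duration like "30s", "5m", "2h", or "1d" into seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if not match:
        raise ValueError(f"invalid duration: {text!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS[unit]
```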

**Usage:**
```bash
aworld-cli
> Run tests every day at 9am
Agent: [calls cron_tool add] Task created...

> /cron list
[shows task list]

> /cron remove job-abc123
```

Related: #2 Phase 2 implementation
Design: docs/plans/2026-04-08-cron-scheduler-design.md
Align code with design doc updates (Implementable MVP):

**Critical Fixes:**
1. Tool location: Move cron_tool.py from aworld/tools/builtin/ to aworld/tools/
   - Reason: Current aworld.tools package scan expects flat structure
   - Removed aworld/tools/builtin/ directory entirely

2. Agent resolution: Use LocalAgentRegistry instead of get_agent_builder()
   - Reason: get_agent_builder() doesn't exist in current codebase
   - Updated executor.py to use LocalAgentRegistry().get() + local_agent.get_swarm()
   - Maintains agent caching for performance

**Design Doc Context (v3.0):**
- Emphasis on matching current codebase reality
- Not equivalent to OpenClaw's daemon-based cron
- Scheduler runs only while CLI is alive
- Missed offline runs are NOT replayed

**Files Changed:**
- aworld/tools/cron_tool.py: Moved from builtin/ subdirectory
- aworld/core/scheduler/executor.py: Use LocalAgentRegistry

Related: Design doc v2.0 → v3.0 updates
…xecution

Critical fix for race condition in scheduler's job triggering logic.

Problem:
- Previous implementation had a race window between checking if a job
  is due and marking it as running
- If two scheduler ticks overlapped, the same job could be triggered
  twice, causing duplicate execution (double emails, duplicate DB writes)

Solution:
- Added FileBasedCronStore.claim_due_job() for atomic claim operation
- Single read-modify-write cycle under fcntl lock ensures only one
  claim succeeds even in concurrent scenarios
- Refactored CronScheduler._schedule_loop to use claim before execution
- Renamed _trigger_job to _execute_claimed_job (semantically correct)

Changes:
- aworld/core/scheduler/store.py: +53 lines (claim_due_job method)
- aworld/core/scheduler/scheduler.py: Refactored scheduling loop
  - Use claim_due_job() instead of direct triggering
  - Remove duplicate mark-as-running logic (now in claim)
  - Reduce post-trigger sleep 1s -> 0.1s (better high-freq precision)
- tests/core/scheduler/test_store.py: +152 lines
  - 5 unit tests for claim semantics
  - Validates single-fire guarantee in concurrent scenarios

Test results: 5/5 passed
- Single-fire: Same job cannot be claimed twice
- Concurrent safety: Only 1 of 5 parallel claims succeeds
- Precondition checks: disabled/not-due/no-next-run jobs cannot claim

Aligns with design document Section 6.2 (claim-due-job requirement)
and Section 7.4 (atomic claim before execution).
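The claim semantics above can be sketched as a single read-modify-write under an exclusive flock, so only one of several concurrent claimers succeeds. Field names and store layout here are assumptions, not the real FileBasedCronStore schema:

```python
import fcntl
import json
from pathlib import Path

def claim_due_job(store_path: Path, job_id: str, now: float) -> bool:
    """Atomically claim a due job: preconditions are checked under the
    same lock as the state change, closing the race window described above."""
    with open(store_path, "r+") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            jobs = json.load(f)
            job = jobs.get(job_id)
            # Preconditions: exists, enabled, not already running, due now.
            if (job is None or not job.get("enabled")
                    or job.get("state") == "running"
                    or job.get("next_run_at") is None
                    or job["next_run_at"] > now):
                return False
            job["state"] = "running"   # the claim itself
            job["next_run_at"] = None  # recomputed after execution
            f.seek(0)
            json.dump(jobs, f, indent=2)
            f.truncate()
            return True
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
```

Because the check and the state flip happen in one locked cycle, a second claimer always observes `state == "running"` and backs off.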
…e summaries

Add proactive cron task completion notifications to aworld-cli TUI per design doc v3.1 Section 8.

**Implementation (Phase 1-5):**
1. Notification infrastructure (CronNotificationCenter with FIFO queue)
2. Scheduler integration (notification_sink parameter + _publish_notification())
3. Runtime wiring (notification center creation + drain method)
4. Console rendering (render_cron_notifications() + two drain points)
5. Runtime reference passing (executor._base_runtime for notification access)
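The FIFO notification center in step 1 can be sketched as a bounded in-memory queue drained by the TUI; class and field names are assumptions, and the cap value is illustrative:

```python
from collections import deque
from dataclasses import dataclass

@dataclass
class CronNotification:
    """Fixed-template notification (fields are assumptions)."""
    job_id: str
    summary: str  # e.g. "Cron task completed"

class CronNotificationCenter:
    """In-memory FIFO queue, matching the in-memory-only design (Section 8.8).
    Bounded so a long-idle session cannot accumulate without limit."""

    def __init__(self, max_pending: int = 100):
        self._queue: deque = deque(maxlen=max_pending)

    def publish(self, note: CronNotification) -> None:
        self._queue.append(note)

    def drain(self) -> list:
        """Return and clear all pending notifications, oldest first."""
        pending = list(self._queue)
        self._queue.clear()
        return pending
```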

**Design compliance:**
- Section 8.4: Fixed-template summaries (no raw error text in notifications)
  - Success: "Cron task completed"
  - Error: "Cron task failed" (directs to /cron list for details)
  - Timeout: "Cron task timed out after Xs"
- Section 8.7: TUI rendering with inline Rich markup, color coding, overflow cap at 3
- Section 8.8: In-memory only (no persistence across restarts)
- Section 8.6: Idle notification poller postponed (technical complexity)

**Key changes:**
- aworld-cli/src/aworld_cli/runtime/cron_notifications.py: CronNotification + CronNotificationCenter
- aworld/core/scheduler/scheduler.py: notification_sink + _publish_notification() + 6 notification calls
- aworld-cli/src/aworld_cli/runtime/base.py: _notification_center + _drain_notifications()
- aworld-cli/src/aworld_cli/console.py: render_cron_notifications() + 2 drain points
- aworld/core/scheduler/store.py: claim_due_job signature update (advance next_run_at atomically)
- tests/core/scheduler/test_notifications.py: 6 unit tests

**Test results:** 37/37 passed (6 new notification tests, 0 regressions)

Test: python -m pytest tests/core/scheduler/test_notifications.py -v
Critical fix for cron executor agent resolution to support TeamSwarm and other multi-agent topologies.

**Problem:**
- Previous implementation extracted a single agent from swarm via swarm.root or agents[0]
- This broke TeamSwarm configurations where the root agent coordinates sub-agents
- Cron jobs using TeamSwarm agents (e.g., Aworld agent) would lose their orchestration structure

**Solution:**
- Renamed _resolve_agent() to _resolve_swarm() (semantically accurate)
- Cache and return the entire swarm instead of extracting a single agent
- Use swarm directly in execute_with_retry() instead of wrapping in new Swarm(agent)
- Made get_swarm() call properly async (await local_agent.get_swarm())

**Impact:**
- Cron jobs now preserve full agent topology (TeamSwarm with sub-agents)
- Enables complex multi-agent cron tasks (e.g., Developer + Evaluator evolution loop)
- No breaking changes for single-agent scenarios

**Changes:**
- aworld/core/scheduler/executor.py:
  - _resolve_agent() -> _resolve_swarm() (async)
  - Cache swarm instead of extracting agent
  - Remove Swarm(agent) wrapper in execute_with_retry()
  - Update docstrings and comments
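The exponential-backoff retry mentioned throughout this PR can be sketched as follows; the function name and parameters are assumptions, not the real executor's signature:

```python
import asyncio

async def execute_with_retry(run, max_attempts: int = 3, base_delay: float = 0.01):
    """Retry an async callable with exponential backoff between attempts
    (delays of 1x, 2x, 4x the base). Sketch only."""
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await run()
        except Exception as exc:  # real code would narrow the exception types
            last_error = exc
            if attempt < max_attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error
```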
Add enable/disable actions to cron tool and slash command for more granular job control.

**New capabilities:**
- /cron enable <job_id> - Re-enable a disabled job
- /cron disable <job_id> - Disable job without deletion (preserves job data)

**Use cases:**
- Temporarily pause recurring jobs without losing schedule/payload
- Re-enable jobs after maintenance/testing
- Safer than remove (preserves job ID and history)

**Changes:**
- aworld/tools/cron_tool.py:
  - Add 'enable' and 'disable' to action enum
  - Implement enable/disable logic via scheduler.update_job()
  - Update docstring with new actions and examples

- aworld-cli/src/aworld_cli/commands/cron_cmd.py:
  - Add enable/disable prompt generation
  - Update help text with new commands
  - Add usage examples

**Behavior:**
- enable: Sets enabled=True, job will be scheduled on next tick
- disable: Sets enabled=False, job stops scheduling but remains in storage
- Both show job details before/after operation
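The enable/disable semantics above amount to flipping one flag while the job record stays in storage; a minimal sketch with assumed field names:

```python
def set_job_enabled(jobs: dict, job_id: str, enabled: bool) -> dict:
    """Toggle scheduling for a job without deleting it. On re-enable,
    next_run_at is left for the scheduler to recompute on its next tick."""
    job = jobs[job_id]  # KeyError surfaces unknown job ids to the caller
    job["enabled"] = enabled
    if not enabled:
        job["next_run_at"] = None  # stop the scheduler from picking it up
    return job
```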
Add 26 comprehensive unit tests covering critical scheduler functionality per design doc Section 15.1.

**Test coverage:**

**test_executor.py (10 tests):**
- Swarm resolution and caching
- TeamSwarm preservation (validates previous executor fix)
- Execution success/failure scenarios
- Retry logic with exponential backoff
- Cache isolation between agents
- Error handling for missing agents

**test_scheduler.py (16 tests):**
- Schedule parsing (at/every/cron expressions)
- next_run calculation for all schedule types
- Startup recovery (clears stale running state)
- Recalculate next_run on startup
- One-shot job deletion after run
- Recurring job cadence preservation
- Manual trigger does not corrupt schedule
- Semaphore respect for concurrent execution
- Overdue task selection (most overdue first)
- Disabled job handling

**Critical validations:**
- Overdue tasks are selected and can be claimed/executed
- Most overdue task is prioritized (FIFO with negative wait_seconds)
- Manual run preserves recurring cadence
- Startup recovery prevents zombie jobs

**Test results:** 26/26 passed (137 deprecation warnings from datetime.utcnow)

Test: python -m pytest tests/core/scheduler/ -v
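The "most overdue first" selection validated above can be sketched as picking, among due jobs, the one whose next_run_at lies furthest in the past. Field names are assumptions:

```python
def pick_most_overdue(jobs: dict, now: float):
    """Return the job id of the most overdue enabled job, or None."""
    due = {
        job_id: job for job_id, job in jobs.items()
        if job.get("enabled")
        and job.get("next_run_at") is not None
        and job["next_run_at"] <= now
    }
    if not due:
        return None
    # Smallest next_run_at == longest wait == most overdue.
    return min(due, key=lambda job_id: due[job_id]["next_run_at"])
```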

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive cron scheduling system to Aworld, allowing for the management of one-time and recurring tasks via a new /cron command and an agent-integrated cron tool. The system includes persistent file-based storage, a background scheduler, and a TUI notification center with status bar integration. Feedback focuses on enhancing reliability and UI stability: specifically, resolving a race condition in the storage layer by using unique temporary filenames, preventing terminal output corruption by properly synchronizing background printing with the interactive prompt, and ensuring data integrity by awaiting in-flight tasks during scheduler shutdown.

Comment on lines +130 to +141
```python
temp_file = self.file_path.with_suffix('.tmp')

try:
    with open(temp_file, "w") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)  # Exclusive lock
        try:
            json.dump(data, f, indent=2, ensure_ascii=False)
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    # Atomic replace
    temp_file.replace(self.file_path)
```

Severity: high

The use of a fixed temporary file name (.tmp) combined with open(temp_file, "w") creates a race condition for cross-process synchronization. If two processes attempt to write simultaneously, the second process will truncate the temporary file while the first process is still writing to it, even if flock is used later. Furthermore, the lock on the temporary file does not synchronize the read-modify-write cycle of the main file. To ensure cross-process safety, use a unique temporary file name (e.g., via tempfile.NamedTemporaryFile) and coordinate the entire read-modify-write operation using a dedicated lock file (e.g., cron.json.lock).
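One way to implement the reviewer's suggestion is to hold an exclusive lock on a dedicated lock file for the whole operation and write through a uniquely named temporary file, so concurrent writers never truncate each other. This is a sketch of the suggestion, not the project's actual fix:

```python
import fcntl
import json
import os
import tempfile
from pathlib import Path

def locked_atomic_write(path: Path, data: dict) -> None:
    """Serialize writers on a lock file, then atomically replace the target
    via a uniquely named temp file created with tempfile.mkstemp."""
    lock_path = path.with_suffix(path.suffix + ".lock")
    with open(lock_path, "w") as lock_file:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
        try:
            fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
            with os.fdopen(fd, "w") as tmp:
                json.dump(data, tmp, indent=2, ensure_ascii=False)
                tmp.flush()
                os.fsync(tmp.fileno())
            os.replace(tmp_name, path)  # atomic on POSIX
        finally:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
```

Readers that also take the lock before a read-modify-write cycle then see a consistent view of the main file.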

Comment on lines +1282 to +1288
```python
if not self._is_agent_executing:
    notifications = await self._drain_notifications_safe(runtime)
    if notifications:
        # Print above current prompt line
        self.console.print()  # Add newline before notifications
        self.render_cron_notifications(notifications)
```

Severity: medium

The _notification_poller background task prints notifications directly to the console using self.console.print() while the user may be interacting with the prompt. Since aworld-cli uses prompt_toolkit for the interactive session, printing directly to stdout without synchronization can garble the prompt display or cause layout issues. It is recommended to use prompt_toolkit.patch_stdout.patch_stdout or the application's print_above_prompt mechanism to ensure that background output is rendered correctly above the active input line.

Comment on lines +74 to +89
```python
async def stop(self):
    """Stop scheduler."""
    if not self.running:
        return

    logger.info("Stopping cron scheduler...")
    self.running = False

    if self._timer_task:
        self._timer_task.cancel()
        try:
            await self._timer_task
        except asyncio.CancelledError:
            pass

    logger.info("Cron scheduler stopped")
```

Severity: medium

The stop method cancels the main timer loop but does not wait for in-flight background job execution tasks started via asyncio.create_task in _schedule_loop. This can lead to jobs being abruptly terminated mid-execution when the CLI exits, potentially leaving the persistent store in an inconsistent state or missing terminal notifications. Consider maintaining a set of active tasks and awaiting them during the shutdown sequence.
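The reviewer's suggestion, maintaining a set of active tasks and awaiting them at shutdown, can be sketched as below; the class and method names are assumptions, not the real CronScheduler:

```python
import asyncio

class SchedulerShutdownSketch:
    """Track in-flight job tasks so stop() can wait for them to finish
    instead of abandoning them when the CLI exits."""

    def __init__(self):
        self._active = set()

    def spawn_job(self, coro) -> asyncio.Task:
        """Start a job task and register it for shutdown tracking."""
        task = asyncio.create_task(coro)
        self._active.add(task)
        task.add_done_callback(self._active.discard)
        return task

    async def stop(self, timeout: float = 5.0) -> None:
        """Wait up to a grace period for in-flight jobs to complete."""
        if self._active:
            await asyncio.wait(self._active, timeout=timeout)
```

The done-callback keeps the set from growing, and the timeout bounds how long a hung job can delay CLI exit.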

@ZhuangCY ZhuangCY merged commit fec96b8 into main Apr 13, 2026
1 check passed