perf: merge 3 cron tools into 1 cron manage tool, and add edit capability for cron tool.#7445
perf: merge 3 cron tools into 1 cron manage tool, and add edit capability for cron tool.#7445
Conversation
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- The unified
FutureTaskToolschema only requiresaction, butcreateanddeletepaths also requirenoteandjob_idrespectively; consider expressing this via JSON Schema conditionals (e.g.,anyOfwith per-actionrequiredfields) so the model gets clearer parameter constraints and validation earlier. - The new
listaction drops the previousjob_typefilter capability; if that filter is still useful in some scenarios, consider keeping it as an optional parameter and passing it through tocron_mgr.list_jobs(job_type)rather than removing it entirely.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The unified `FutureTaskTool` schema only requires `action`, but `create` and `delete` paths also require `note` and `job_id` respectively; consider expressing this via JSON Schema conditionals (e.g., `anyOf` with per-action `required` fields) so the model gets clearer parameter constraints and validation earlier.
- The new `list` action drops the previous `job_type` filter capability; if that filter is still useful in some scenarios, consider keeping it as an optional parameter and passing it through to `cron_mgr.list_jobs(job_type)` rather than removing it entirely.
## Individual Comments
### Comment 1
<location path="astrbot/core/tools/cron_tools.py" line_range="35-39" />
<code_context>
"type": "string",
- "description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.",
+ "enum": ["create", "delete", "list"],
+ "description": "Action to perform. 'list' takes no parameters. 'delete' requires only 'job_id'.",
},
- "run_at": {
</code_context>
<issue_to_address>
**suggestion:** The action description for 'list' is slightly misleading because 'action' itself is still required.
The text "'list' takes no parameters" contradicts the schema, which still requires `action`. This may mislead consumers into omitting `action` when listing. Please rephrase (e.g., "'list' takes no additional parameters") to align with the actual contract.
```suggestion
"action": {
"type": "string",
"enum": ["create", "delete", "list"],
"description": "Action to perform. 'list' takes no additional parameters beyond 'action'. 'delete' requires only 'job_id' in addition to 'action'.",
},
```
</issue_to_address>
### Comment 2
<location path="tests/unit/test_cron_tools.py" line_range="6-15" />
<code_context>
-def test_create_future_task_cron_description_prefers_named_weekdays():
- """The cron tool should steer users toward unambiguous named weekdays."""
- tool = CreateActiveCronTool()
+def test_future_task_schema_has_action_and_create_cron_guidance():
+ """The merged tool should expose action routing and unambiguous cron guidance."""
+ tool = FutureTaskTool()
</code_context>
<issue_to_address>
**suggestion (testing):** Add behavioral tests for FutureTaskTool.call covering create/delete/list flows and error handling
This refactor consolidates three tools into one with `action`-based branching, but the tests only cover schema/metadata. To validate behavior and prevent regressions, add async tests that directly exercise `FutureTaskTool.call`, including:
- `action='create'` for both cron (`cron_expression`) and one-time (`run_at` + `run_once=True`) flows, asserting the returned text and that `cron_mgr.add_active_job` is called with correct args.
- Error paths: missing `note`, missing/invalid `run_at` when `run_once=True`, missing `cron_expression` when `run_once=False`, and invalid/empty `action` (hitting the final validation error).
- `action='delete'` for existing job, non-existent job, and job from another session.
- `action='list'` with no jobs vs multiple jobs across sessions to verify `unified_msg_origin` filtering.
- The `cron_manager is None` branch returning the expected error.
A stubbed `ContextWrapper` and `cron_mgr` with async mocks for `add_active_job`, `list_jobs`, `delete_job`, and `db.get_cron_job` should keep these tests fast and isolated.
Suggested implementation:
```python
"""Tests for cron tool metadata and behavior."""
from unittest.mock import AsyncMock
import pytest
from astrbot.core.tools.cron_tools import FutureTaskTool
def test_future_task_schema_has_action_and_create_cron_guidance():
"""The merged tool should expose action routing and unambiguous cron guidance."""
tool = FutureTaskTool()
assert tool.name == "future_task"
assert tool.parameters["required"] == ["action"]
assert tool.parameters["properties"]["action"]["enum"] == [
"create",
"delete",
"list",
]
# The description should steer toward unambiguous named weekdays for cron expressions.
description = tool.parameters["properties"]["cron_expression"]["description"]
assert "named weekdays" in description or "Mon" in description or "Tue" in description
class StubDB:
"""Minimal stub of the DB layer used by the cron tools."""
def __init__(self) -> None:
self.get_cron_job = AsyncMock()
class StubCronManager:
"""Stubbed cron manager exposing async methods used by FutureTaskTool."""
def __init__(self) -> None:
self.add_active_job = AsyncMock()
self.list_jobs = AsyncMock()
self.delete_job = AsyncMock()
class StubContext:
"""Minimal ContextWrapper stub for exercising FutureTaskTool.call."""
def __init__(
self,
*,
session_id: str = "session-1",
unified_msg_origin: str = "origin-1",
cron_manager: StubCronManager | None = None,
db: StubDB | None = None,
) -> None:
self.session_id = session_id
self.unified_msg_origin = unified_msg_origin
self.cron_manager = cron_manager
self.db = db or StubDB()
@pytest.mark.asyncio
async def test_future_task_create_cron_calls_add_active_job():
"""action='create' with cron_expression should call cron_mgr.add_active_job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Test recurring job",
"cron_expression": "0 0 * * MON",
"run_once": False,
}
result = await tool.call(ctx, **params)
# Tool should call into the cron manager to create an active job.
cron_mgr.add_active_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_create_run_once_calls_add_active_job_with_run_at():
"""action='create' with run_once=True and run_at should create a one-time job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "One-time job",
"run_once": True,
"run_at": "2099-01-01T10:00:00Z",
}
result = await tool.call(ctx, **params)
cron_mgr.add_active_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_create_missing_note_raises_validation_error():
"""Creating a job without note should hit validation / error handling."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"cron_expression": "0 0 * * MON",
"run_once": False,
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_create_run_once_missing_run_at_errors():
"""run_once=True without run_at should trigger an error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Missing run_at",
"run_once": True,
# no run_at
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_create_cron_missing_cron_expression_errors():
"""run_once=False without cron_expression should trigger an error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Missing cron_expression",
"run_once": False,
# no cron_expression
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_invalid_action_hits_validation_error():
"""Invalid or empty action should hit the final validation error branch."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "not-a-valid-action",
"note": "Does not matter",
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_delete_existing_job_calls_delete_job():
"""Deleting an existing job should call cron_mgr.delete_job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# Simulate DB returning a job owned by this session.
job_id = "job-1"
ctx.db.get_cron_job.return_value = {
"id": job_id,
"session_id": ctx.session_id,
"unified_msg_origin": ctx.unified_msg_origin,
}
params = {
"action": "delete",
"job_id": job_id,
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_delete_nonexistent_job_returns_error():
"""Deleting a nonexistent job should not call delete_job but handle error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# DB returns no job for this id.
ctx.db.get_cron_job.return_value = None
params = {
"action": "delete",
"job_id": "missing-job",
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_not_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_delete_other_session_job_returns_error():
"""Deleting a job from another session should be rejected."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# Job belongs to a different session/origin.
ctx.db.get_cron_job.return_value = {
"id": "job-2",
"session_id": "other-session",
"unified_msg_origin": "other-origin",
}
params = {
"action": "delete",
"job_id": "job-2",
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_not_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_list_filters_by_unified_msg_origin():
"""Listing should only show jobs matching the current unified_msg_origin/session."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
cron_mgr.list_jobs.return_value = [
{
"id": "job-1",
"session_id": ctx.session_id,
"unified_msg_origin": ctx.unified_msg_origin,
},
{
"id": "job-2",
"session_id": "other-session",
"unified_msg_origin": "other-origin",
},
]
params = {
"action": "list",
}
result = await tool.call(ctx, **params)
cron_mgr.list_jobs.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_list_with_no_jobs_is_handled():
"""Listing when there are no jobs for this context should be handled gracefully."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
cron_mgr.list_jobs.return_value = []
params = {
"action": "list",
}
result = await tool.call(ctx, **params)
cron_mgr.list_jobs.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_cron_manager_none_returns_expected_error():
"""When cron_manager is None, call should return or raise an appropriate error."""
tool = FutureTaskTool()
ctx = StubContext(cron_manager=None)
params = {
"action": "list",
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
```
These tests assume:
1. `FutureTaskTool.call` is defined as `async def call(self, ctx, **kwargs)` and accepts a positional context followed by keyword parameters matching the tool schema. If the actual signature differs (e.g., `call(self, context, params: dict)`), update all `await tool.call(ctx, **params)` invocations accordingly.
2. The tool uses `ctx.cron_manager.add_active_job`, `ctx.cron_manager.list_jobs`, `ctx.cron_manager.delete_job`, and `ctx.db.get_cron_job` as async functions. If these are accessed differently in your implementation (e.g., via `ctx.cron_mgr`, or `ctx.cron_manager.db.get_cron_job`), adjust the stub attribute names and where `StubDB` is attached.
3. Error paths currently use broad `with pytest.raises(Exception)` and minimal assertions on `result`. Once you confirm the specific exception types and return formats/messages, you can tighten the assertions to:
- Expect concrete exception classes (e.g., `ValueError` or a custom tool error).
- Assert that success results contain the expected user-facing text (e.g., including `"created"`, `"deleted"`, `"not found"`).
4. The `list` tests only assert that `list_jobs` is awaited and that a non-`None` result is returned. If `FutureTaskTool` formats job IDs or notes into the returned text, refine the tests to assert on inclusion/exclusion of specific job identifiers (for the same vs other sessions) based on the actual formatting.
</issue_to_address>
### Comment 3
<location path="astrbot/core/tools/cron_tools.py" line_range="23" />
<code_context>
@dataclass
-class CreateActiveCronTool(FunctionTool[AstrAgentContext]):
- name: str = "create_future_task"
+class FutureTaskTool(FunctionTool[AstrAgentContext]):
+ name: str = "future_task"
description: str = (
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the per-action logic in `FutureTaskTool.call` into dedicated helper methods and optionally dispatching via a handler map to keep the external API unified while simplifying the internal control flow.
You can keep the unified `future_task` surface and significantly reduce complexity by pushing the per-action logic into small helpers and using a simple dispatcher. This keeps the external API intact while restoring separation of concerns.
### 1. Extract per-action handlers
Move the three branches into focused private methods. This shrinks `call` and localizes logic:
```python
@dataclass
class FutureTaskTool(FunctionTool[AstrAgentContext]):
name: str = "future_task"
# ... parameters unchanged ...
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
action = str(kwargs.get("action") or "").strip().lower()
current_umo = context.context.event.unified_msg_origin
if action == "create":
return await self._handle_create(context, cron_mgr, **kwargs)
if action == "delete":
return await self._handle_delete(context, cron_mgr, current_umo, **kwargs)
if action == "list":
return await self._handle_list(cron_mgr, current_umo)
return "error: action must be one of create, delete, or list."
```
Then reuse the existing code you already wrote, just moved into helpers:
```python
async def _handle_create(
self,
context: ContextWrapper[AstrAgentContext],
cron_mgr,
**kwargs,
) -> ToolExecResult:
cron_expression = kwargs.get("cron_expression")
run_at = kwargs.get("run_at")
run_once = bool(kwargs.get("run_once", False))
note = str(kwargs.get("note", "")).strip()
name = str(kwargs.get("name") or "").strip() or "active_agent_task"
if not note:
return "error: note is required when action=create."
if run_once and not run_at:
return "error: run_at is required when run_once=true."
if (not run_once) and not cron_expression:
return "error: cron_expression is required when run_once=false."
if run_once and cron_expression:
cron_expression = None
run_at_dt = None
if run_at:
try:
run_at_dt = datetime.fromisoformat(str(run_at))
except Exception:
return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00"
payload = {
"session": context.context.event.unified_msg_origin,
"sender_id": context.context.event.get_sender_id(),
"note": note,
"origin": "tool",
}
job = await cron_mgr.add_active_job(
name=name,
cron_expression=str(cron_expression) if cron_expression else None,
payload=payload,
description=note,
run_once=run_once,
run_at=run_at_dt,
)
next_run = job.next_run_time or run_at_dt
suffix = (
f"one-time at {next_run}"
if run_once
else f"expression '{cron_expression}' (next {next_run})"
)
return f"Scheduled future task {job.job_id} ({job.name}) {suffix}."
async def _handle_delete(
self,
context: ContextWrapper[AstrAgentContext],
cron_mgr,
current_umo: str,
**kwargs,
) -> ToolExecResult:
job_id = kwargs.get("job_id")
if not job_id:
return "error: job_id is required when action=delete."
job = await cron_mgr.db.get_cron_job(str(job_id))
if not job:
return f"error: cron job {job_id} not found."
if _extract_job_session(job) != current_umo:
return "error: you can only delete future tasks in the current umo."
await cron_mgr.delete_job(str(job_id))
return f"Deleted cron job {job_id}."
async def _handle_list(
self,
cron_mgr,
current_umo: str,
) -> ToolExecResult:
jobs = [
job
for job in await cron_mgr.list_jobs()
if _extract_job_session(job) == current_umo
]
if not jobs:
return "No cron jobs found."
lines = [
f"{j.job_id} | {j.name} | {j.job_type} | "
f"run_once={getattr(j, 'run_once', False)} | "
f"enabled={j.enabled} | next={j.next_run_time}"
for j in jobs
]
return "\n".join(lines)
```
This preserves all behavior, but:
- `call` becomes a thin dispatcher with a simple mental model.
- Each action’s validation and behavior is isolated, easier to test/change.
### 2. Optional: use a small dispatcher map
If you want to further reduce branching in `call`, you can use a mapping (still keeping helpers):
```python
async def call(self, context: ContextWrapper[AstrAgentContext], **kwargs):
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
action = str(kwargs.get("action") or "").strip().lower()
current_umo = context.context.event.unified_msg_origin
handlers = {
"create": lambda: self._handle_create(context, cron_mgr, **kwargs),
"delete": lambda: self._handle_delete(context, cron_mgr, current_umo, **kwargs),
"list": lambda: self._handle_list(cron_mgr, current_umo),
}
handler = handlers.get(action)
if not handler:
return "error: action must be one of create, delete, or list."
return await handler()
```
This keeps the single tool name and unified schema but restores small, single-responsibility units internally, directly addressing the “monolithic `call`” and “stringly-typed dispatcher” concerns without reverting your design.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| "action": { | ||
| "type": "string", | ||
| "description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.", | ||
| "enum": ["create", "delete", "list"], | ||
| "description": "Action to perform. 'list' takes no parameters. 'delete' requires only 'job_id'.", | ||
| }, |
There was a problem hiding this comment.
suggestion: The action description for 'list' is slightly misleading because 'action' itself is still required.
The text "'list' takes no parameters" contradicts the schema, which still requires action. This may mislead consumers into omitting action when listing. Please rephrase (e.g., "'list' takes no additional parameters") to align with the actual contract.
| "action": { | |
| "type": "string", | |
| "description": "Cron expression defining recurring schedule (e.g., '0 8 * * *' or '0 23 * * mon-fri'). Prefer named weekdays like 'mon-fri' or 'sat,sun' instead of numeric day-of-week ranges such as '1-5' to avoid ambiguity across cron implementations.", | |
| "enum": ["create", "delete", "list"], | |
| "description": "Action to perform. 'list' takes no parameters. 'delete' requires only 'job_id'.", | |
| }, | |
| "action": { | |
| "type": "string", | |
| "enum": ["create", "delete", "list"], | |
| "description": "Action to perform. 'list' takes no additional parameters beyond 'action'. 'delete' requires only 'job_id' in addition to 'action'.", | |
| }, |
| def test_future_task_schema_has_action_and_create_cron_guidance(): | ||
| """The merged tool should expose action routing and unambiguous cron guidance.""" | ||
| tool = FutureTaskTool() | ||
|
|
||
| assert tool.name == "future_task" | ||
| assert tool.parameters["required"] == ["action"] | ||
| assert tool.parameters["properties"]["action"]["enum"] == [ | ||
| "create", | ||
| "delete", | ||
| "list", |
There was a problem hiding this comment.
suggestion (testing): Add behavioral tests for FutureTaskTool.call covering create/delete/list flows and error handling
This refactor consolidates three tools into one with action-based branching, but the tests only cover schema/metadata. To validate behavior and prevent regressions, add async tests that directly exercise FutureTaskTool.call, including:
action='create'for both cron (cron_expression) and one-time (run_at+run_once=True) flows, asserting the returned text and thatcron_mgr.add_active_jobis called with correct args.- Error paths: missing
note, missing/invalidrun_atwhenrun_once=True, missingcron_expressionwhenrun_once=False, and invalid/emptyaction(hitting the final validation error). action='delete'for existing job, non-existent job, and job from another session.action='list'with no jobs vs multiple jobs across sessions to verifyunified_msg_originfiltering.- The
cron_manager is Nonebranch returning the expected error.
A stubbed ContextWrapper and cron_mgr with async mocks for add_active_job, list_jobs, delete_job, and db.get_cron_job should keep these tests fast and isolated.
Suggested implementation:
"""Tests for cron tool metadata and behavior."""
from unittest.mock import AsyncMock
import pytest
from astrbot.core.tools.cron_tools import FutureTaskTool
def test_future_task_schema_has_action_and_create_cron_guidance():
"""The merged tool should expose action routing and unambiguous cron guidance."""
tool = FutureTaskTool()
assert tool.name == "future_task"
assert tool.parameters["required"] == ["action"]
assert tool.parameters["properties"]["action"]["enum"] == [
"create",
"delete",
"list",
]
# The description should steer toward unambiguous named weekdays for cron expressions.
description = tool.parameters["properties"]["cron_expression"]["description"]
assert "named weekdays" in description or "Mon" in description or "Tue" in description
class StubDB:
"""Minimal stub of the DB layer used by the cron tools."""
def __init__(self) -> None:
self.get_cron_job = AsyncMock()
class StubCronManager:
"""Stubbed cron manager exposing async methods used by FutureTaskTool."""
def __init__(self) -> None:
self.add_active_job = AsyncMock()
self.list_jobs = AsyncMock()
self.delete_job = AsyncMock()
class StubContext:
"""Minimal ContextWrapper stub for exercising FutureTaskTool.call."""
def __init__(
self,
*,
session_id: str = "session-1",
unified_msg_origin: str = "origin-1",
cron_manager: StubCronManager | None = None,
db: StubDB | None = None,
) -> None:
self.session_id = session_id
self.unified_msg_origin = unified_msg_origin
self.cron_manager = cron_manager
self.db = db or StubDB()
@pytest.mark.asyncio
async def test_future_task_create_cron_calls_add_active_job():
"""action='create' with cron_expression should call cron_mgr.add_active_job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Test recurring job",
"cron_expression": "0 0 * * MON",
"run_once": False,
}
result = await tool.call(ctx, **params)
# Tool should call into the cron manager to create an active job.
cron_mgr.add_active_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_create_run_once_calls_add_active_job_with_run_at():
"""action='create' with run_once=True and run_at should create a one-time job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "One-time job",
"run_once": True,
"run_at": "2099-01-01T10:00:00Z",
}
result = await tool.call(ctx, **params)
cron_mgr.add_active_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_create_missing_note_raises_validation_error():
"""Creating a job without note should hit validation / error handling."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"cron_expression": "0 0 * * MON",
"run_once": False,
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_create_run_once_missing_run_at_errors():
"""run_once=True without run_at should trigger an error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Missing run_at",
"run_once": True,
# no run_at
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_create_cron_missing_cron_expression_errors():
"""run_once=False without cron_expression should trigger an error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "create",
"note": "Missing cron_expression",
"run_once": False,
# no cron_expression
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_invalid_action_hits_validation_error():
"""Invalid or empty action should hit the final validation error branch."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
params = {
"action": "not-a-valid-action",
"note": "Does not matter",
}
with pytest.raises(Exception):
await tool.call(ctx, **params)
@pytest.mark.asyncio
async def test_future_task_delete_existing_job_calls_delete_job():
"""Deleting an existing job should call cron_mgr.delete_job."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# Simulate DB returning a job owned by this session.
job_id = "job-1"
ctx.db.get_cron_job.return_value = {
"id": job_id,
"session_id": ctx.session_id,
"unified_msg_origin": ctx.unified_msg_origin,
}
params = {
"action": "delete",
"job_id": job_id,
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_delete_nonexistent_job_returns_error():
"""Deleting a nonexistent job should not call delete_job but handle error."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# DB returns no job for this id.
ctx.db.get_cron_job.return_value = None
params = {
"action": "delete",
"job_id": "missing-job",
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_not_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_delete_other_session_job_returns_error():
"""Deleting a job from another session should be rejected."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
# Job belongs to a different session/origin.
ctx.db.get_cron_job.return_value = {
"id": "job-2",
"session_id": "other-session",
"unified_msg_origin": "other-origin",
}
params = {
"action": "delete",
"job_id": "job-2",
}
result = await tool.call(ctx, **params)
ctx.db.get_cron_job.assert_awaited()
cron_mgr.delete_job.assert_not_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_list_filters_by_unified_msg_origin():
"""Listing should only show jobs matching the current unified_msg_origin/session."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
cron_mgr.list_jobs.return_value = [
{
"id": "job-1",
"session_id": ctx.session_id,
"unified_msg_origin": ctx.unified_msg_origin,
},
{
"id": "job-2",
"session_id": "other-session",
"unified_msg_origin": "other-origin",
},
]
params = {
"action": "list",
}
result = await tool.call(ctx, **params)
cron_mgr.list_jobs.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_list_with_no_jobs_is_handled():
"""Listing when there are no jobs for this context should be handled gracefully."""
tool = FutureTaskTool()
cron_mgr = StubCronManager()
ctx = StubContext(cron_manager=cron_mgr)
cron_mgr.list_jobs.return_value = []
params = {
"action": "list",
}
result = await tool.call(ctx, **params)
cron_mgr.list_jobs.assert_awaited()
assert result is not None
@pytest.mark.asyncio
async def test_future_task_cron_manager_none_returns_expected_error():
"""When cron_manager is None, call should return or raise an appropriate error."""
tool = FutureTaskTool()
ctx = StubContext(cron_manager=None)
params = {
"action": "list",
}
with pytest.raises(Exception):
await tool.call(ctx, **params)These tests assume:
FutureTaskTool.callis defined asasync def call(self, ctx, **kwargs)and accepts a positional context followed by keyword parameters matching the tool schema. If the actual signature differs (e.g.,call(self, context, params: dict)), update allawait tool.call(ctx, **params)invocations accordingly.- The tool uses
ctx.cron_manager.add_active_job,ctx.cron_manager.list_jobs,ctx.cron_manager.delete_job, andctx.db.get_cron_jobas async functions. If these are accessed differently in your implementation (e.g., viactx.cron_mgr, orctx.cron_manager.db.get_cron_job), adjust the stub attribute names and whereStubDBis attached. - Error paths currently use broad
with pytest.raises(Exception)and minimal assertions onresult. Once you confirm the specific exception types and return formats/messages, you can tighten the assertions to:- Expect concrete exception classes (e.g.,
ValueErroror a custom tool error). - Assert that success results contain the expected user-facing text (e.g., including
"created","deleted","not found").
- Expect concrete exception classes (e.g.,
- The
listtests only assert thatlist_jobsis awaited and that a non-Noneresult is returned. IfFutureTaskToolformats job IDs or notes into the returned text, refine the tests to assert on inclusion/exclusion of specific job identifiers (for the same vs other sessions) based on the actual formatting.
| @dataclass | ||
| class CreateActiveCronTool(FunctionTool[AstrAgentContext]): | ||
| name: str = "create_future_task" | ||
| class FutureTaskTool(FunctionTool[AstrAgentContext]): |
There was a problem hiding this comment.
issue (complexity): Consider extracting the per-action logic in FutureTaskTool.call into dedicated helper methods and optionally dispatching via a handler map to keep the external API unified while simplifying the internal control flow.
You can keep the unified future_task surface and significantly reduce complexity by pushing the per-action logic into small helpers and using a simple dispatcher. This keeps the external API intact while restoring separation of concerns.
1. Extract per-action handlers
Move the three branches into focused private methods. This shrinks call and localizes logic:
@dataclass
class FutureTaskTool(FunctionTool[AstrAgentContext]):
name: str = "future_task"
# ... parameters unchanged ...
async def call(
self, context: ContextWrapper[AstrAgentContext], **kwargs
) -> ToolExecResult:
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
action = str(kwargs.get("action") or "").strip().lower()
current_umo = context.context.event.unified_msg_origin
if action == "create":
return await self._handle_create(context, cron_mgr, **kwargs)
if action == "delete":
return await self._handle_delete(context, cron_mgr, current_umo, **kwargs)
if action == "list":
return await self._handle_list(cron_mgr, current_umo)
return "error: action must be one of create, delete, or list."Then reuse the existing code you already wrote, just moved into helpers:
async def _handle_create(
self,
context: ContextWrapper[AstrAgentContext],
cron_mgr,
**kwargs,
) -> ToolExecResult:
cron_expression = kwargs.get("cron_expression")
run_at = kwargs.get("run_at")
run_once = bool(kwargs.get("run_once", False))
note = str(kwargs.get("note", "")).strip()
name = str(kwargs.get("name") or "").strip() or "active_agent_task"
if not note:
return "error: note is required when action=create."
if run_once and not run_at:
return "error: run_at is required when run_once=true."
if (not run_once) and not cron_expression:
return "error: cron_expression is required when run_once=false."
if run_once and cron_expression:
cron_expression = None
run_at_dt = None
if run_at:
try:
run_at_dt = datetime.fromisoformat(str(run_at))
except Exception:
return "error: run_at must be ISO datetime, e.g., 2026-02-02T08:00:00+08:00"
payload = {
"session": context.context.event.unified_msg_origin,
"sender_id": context.context.event.get_sender_id(),
"note": note,
"origin": "tool",
}
job = await cron_mgr.add_active_job(
name=name,
cron_expression=str(cron_expression) if cron_expression else None,
payload=payload,
description=note,
run_once=run_once,
run_at=run_at_dt,
)
next_run = job.next_run_time or run_at_dt
suffix = (
f"one-time at {next_run}"
if run_once
else f"expression '{cron_expression}' (next {next_run})"
)
return f"Scheduled future task {job.job_id} ({job.name}) {suffix}."
async def _handle_delete(
self,
context: ContextWrapper[AstrAgentContext],
cron_mgr,
current_umo: str,
**kwargs,
) -> ToolExecResult:
job_id = kwargs.get("job_id")
if not job_id:
return "error: job_id is required when action=delete."
job = await cron_mgr.db.get_cron_job(str(job_id))
if not job:
return f"error: cron job {job_id} not found."
if _extract_job_session(job) != current_umo:
return "error: you can only delete future tasks in the current umo."
await cron_mgr.delete_job(str(job_id))
return f"Deleted cron job {job_id}."
async def _handle_list(
self,
cron_mgr,
current_umo: str,
) -> ToolExecResult:
jobs = [
job
for job in await cron_mgr.list_jobs()
if _extract_job_session(job) == current_umo
]
if not jobs:
return "No cron jobs found."
lines = [
f"{j.job_id} | {j.name} | {j.job_type} | "
f"run_once={getattr(j, 'run_once', False)} | "
f"enabled={j.enabled} | next={j.next_run_time}"
for j in jobs
]
return "\n".join(lines)This preserves all behavior, but:
callbecomes a thin dispatcher with a simple mental model.- Each action’s validation and behavior is isolated, easier to test/change.
2. Optional: use a small dispatcher map
If you want to further reduce branching in call, you can use a mapping (still keeping helpers):
async def call(self, context: ContextWrapper[AstrAgentContext], **kwargs):
cron_mgr = context.context.context.cron_manager
if cron_mgr is None:
return "error: cron manager is not available."
action = str(kwargs.get("action") or "").strip().lower()
current_umo = context.context.event.unified_msg_origin
handlers = {
"create": lambda: self._handle_create(context, cron_mgr, **kwargs),
"delete": lambda: self._handle_delete(context, cron_mgr, current_umo, **kwargs),
"list": lambda: self._handle_list(cron_mgr, current_umo),
}
handler = handlers.get(action)
if not handler:
return "error: action must be one of create, delete, or list."
return await handler()This keeps the single tool name and unified schema but restores small, single-responsibility units internally, directly addressing the “monolithic call” and “stringly-typed dispatcher” concerns without reverting your design.
|
Documentation Updates 1 document(s) were updated by changes in this PR: pr4697的改动View Changes@@ -442,9 +442,11 @@
用户可通过对话指令或 UI 创建定时任务,例如“每天早上8点发送今日早报”。主代理会注册一个 ActiveAgentCronJob,定时触发后自动执行相关脚本并通过 send_message_to_user 工具推送内容。
#### 相关工具
-- `create_future_task`(原 create_cron_job):创建定时任务
-- `delete_future_task`:删除定时任务
-- `list_future_tasks`:列出所有定时任务
+- `future_task`:统一的定时任务管理工具,通过 `action` 参数控制行为:
+ - `action='create'`:创建定时任务(替代原 create_future_task)
+ - `action='edit'`:更新现有定时任务(新功能)
+ - `action='delete'`:删除定时任务(替代原 delete_future_task)
+ - `action='list'`:列出所有定时任务(替代原 list_future_tasks)
---
@@ -804,6 +806,7 @@
- 查看、创建、编辑、删除定时任务
- 查看任务描述、触发条件等详细信息
+- 编辑功能允许更新现有定时任务的名称、说明、cron 表达式、run_at 时间等参数
#### API 支持
新增 CronRoute 等 API 路由,支持通过 API 管理定时任务。 |
There was a problem hiding this comment.
Code Review
This pull request consolidates the previously separate CreateActiveCronTool, DeleteCronJobTool, and ListCronJobsTool into a single, unified FutureTaskTool. This new tool uses an action parameter to handle creating, deleting, and listing scheduled tasks. The review feedback highlights that while the implementation requires specific fields like note or job_id depending on the action, the JSON schema only marks action as required; updating the schema would improve LLM reliability. Additionally, there is a recommendation to refactor the call method to use a consistently defined current_umo variable across all logic branches for better readability.
…ng origin patches Upstream changes integrated: - feat(discord): add configurable bot message filtering (AstrBotDevs#6505) - docs: fix path concatenation error in storage.md (AstrBotDevs#7448) - chore: remove lxml and bs4 deps (AstrBotDevs#7449) - fix: make desktop plugin dependency loading safer on Windows (AstrBotDevs#7446) - fix: split long telegram final segments (AstrBotDevs#7432) - adopted _send_text_chunks method - perf: merge 3 cron tools into 1 cron manage tool with edit capability (AstrBotDevs#7445) - chore: update logo in README.md Origin patches preserved: - Telegram enhancements: multi-image/video, spoiler, caption, reply_markup support - Telegram inline query and callback query event handlers - Telegram streaming improvements with draft API - Gemini provider fixes and enhancements - WeChat session renewal fix - Shell timeout parameter support Conflicts resolved in tg_event.py: adopted upstream's _send_text_chunks method while preserving origin's advanced message handling features. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
closes: #6516
closes: #6514
Modifications / 改动点
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Consolidate cron-based future task management into a single FutureTaskTool that supports creating, listing, and deleting scheduled tasks, and wire it into the main agent tooling.
New Features:
Enhancements:
Tests: